From 839cd3888a79ee6d4d857081bb7d67b4b58e7894 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 24 Jan 2025 15:16:46 +0800 Subject: [PATCH] feat(meta): let stream actors in fragment share same stream node --- proto/meta.proto | 4 +- proto/stream_plan.proto | 2 +- proto/stream_service.proto | 8 +- .../rw_catalog/rw_actor_infos.rs | 2 - .../system_catalog/rw_catalog/rw_fragments.rs | 5 +- .../system_catalog/rw_catalog/rw_sinks.rs | 7 +- src/meta/service/src/stream_service.rs | 2 +- src/meta/src/barrier/checkpoint/control.rs | 9 +- .../barrier/checkpoint/creating_job/status.rs | 12 +- src/meta/src/barrier/command.rs | 36 ++++-- src/meta/src/barrier/info.rs | 10 ++ src/meta/src/barrier/rpc.rs | 98 +++++++++------ src/meta/src/controller/fragment.rs | 115 +++++++++++------- src/meta/src/controller/utils.rs | 1 + src/meta/src/model/stream.rs | 35 +++--- src/meta/src/rpc/ddl_controller.rs | 70 ++++++----- src/meta/src/stream/scale.rs | 31 ++--- src/meta/src/stream/stream_graph/actor.rs | 34 +++--- src/meta/src/stream/stream_graph/fragment.rs | 6 +- src/meta/src/stream/test_fragmenter.rs | 38 +++--- .../src/task/barrier_manager/managed_state.rs | 13 +- src/stream/src/task/stream_manager.rs | 6 +- src/tests/simulation/src/ctl_ext.rs | 2 +- .../integration_tests/scale/shared_source.rs | 13 +- 24 files changed, 334 insertions(+), 225 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 975798de90da9..1fd28d21ed369 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -97,6 +97,8 @@ message TableFragments { // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 8; + + stream_plan.StreamNode nodes = 9; } // The id of the streaming job. uint32 table_id = 1; @@ -218,7 +220,6 @@ message ListTableFragmentsRequest { message ListTableFragmentsResponse { message ActorInfo { uint32 id = 1; - stream_plan.StreamNode node = 2; repeated stream_plan.Dispatcher dispatcher = 3; } message FragmentInfo { @@ -258,6 +259,7 @@ message ListFragmentDistributionResponse { uint32 fragment_type_mask = 6; uint32 parallelism = 7; uint32 vnode_count = 8; + stream_plan.StreamNode node = 9; } repeated FragmentDistribution distributions = 1; } diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 91197c34fbb8d..e8eafe383d3ba 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -966,7 +966,7 @@ message StreamActor { uint32 actor_id = 1; uint32 fragment_id = 2; - StreamNode nodes = 3; + StreamNode nodes = 3 [deprecated = true]; repeated Dispatcher dispatcher = 4; // The actors that send messages to this actor. // Note that upstream actor ids are also stored in the proto of merge nodes. diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 949e0be84e23e..b88890877752c 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -17,6 +17,12 @@ message InjectBarrierRequest { repeated uint32 table_ids_to_sync = 5; uint32 partial_graph_id = 6; + message FragmentBuildActorInfo { + uint32 fragment_id = 1; + stream_plan.StreamNode node = 2; + repeated BuildActorInfo actors = 3; + } + message BuildActorInfo { message UpstreamActors { repeated uint32 actors = 1; @@ -27,7 +33,7 @@ message InjectBarrierRequest { } repeated common.ActorInfo broadcast_info = 8; - repeated BuildActorInfo actors_to_build = 9; + repeated FragmentBuildActorInfo actors_to_build = 9; repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10; repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11; } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs index a658539ead2b7..7fdc831dde948 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs @@ -25,7 +25,6 @@ struct RwActorInfo { #[primary_key] actor_id: i32, fragment_id: i32, - node: JsonbVal, dispatcher: JsonbVal, } @@ -47,7 +46,6 @@ async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result, parallelism: i32, max_parallelism: i32, + node: JsonbVal, } pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec { @@ -73,6 +75,7 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result Result> { "rw_catalog.rw_sink_decouple", "WITH decoupled_sink_internal_table_ids AS ( SELECT - distinct (node->'sink'->'table'->'id')::int as internal_table_id - FROM rw_catalog.rw_actor_infos actor - JOIN - rw_catalog.rw_fragments fragment - ON actor.fragment_id = fragment.fragment_id + (node->'sink'->'table'->'id')::int as internal_table_id + FROM rw_catalog.rw_fragments WHERE 'SINK' = any(flags) AND diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 648d7f12367da..200c68a069d00 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -226,7 +226,6 @@ impl StreamManagerService for StreamServiceImpl { .into_iter() .map(|actor| ActorInfo { id: actor.actor_id, - node: actor.nodes, dispatcher: actor.dispatcher, }) .collect_vec(), @@ -309,6 +308,7 @@ impl StreamManagerService for StreamServiceImpl { fragment_type_mask: fragment_desc.fragment_type_mask as _, parallelism: fragment_desc.parallelism as _, vnode_count: fragment_desc.vnode_count as _, + node: Some(fragment_desc.stream_node.to_protobuf()), }, ) .collect_vec(); diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index b0c7b9b6ee59a..14acd7e695ec7 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -1009,14 +1009,9 @@ impl DatabaseCheckpointControl { "must not set previously" ); } - for stream_actor in info - .stream_job_fragments - .fragments - .values_mut() - .flat_map(|fragment| fragment.actors.iter_mut()) - { + for fragment in info.stream_job_fragments.fragments.values_mut() { fill_snapshot_backfill_epoch( - stream_actor.nodes.as_mut().expect("should exist"), + fragment.nodes.as_mut().expect("should exist"), &snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch, )?; } diff --git a/src/meta/src/barrier/checkpoint/creating_job/status.rs b/src/meta/src/barrier/checkpoint/creating_job/status.rs index 18a74aee570cf..6a3e7d34a94f1 100644 --- a/src/meta/src/barrier/checkpoint/creating_job/status.rs +++ b/src/meta/src/barrier/checkpoint/creating_job/status.rs @@ -18,7 +18,6 @@ use std::mem::take; use risingwave_common::hash::ActorId; use risingwave_common::util::epoch::Epoch; -use risingwave_meta_model::WorkerId; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_service::barrier_complete_response::{ @@ -28,7 +27,7 @@ use tracing::warn; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch}; -use crate::model::StreamActorWithUpstreams; +use crate::model::StreamJobActorsToCreate; #[derive(Debug)] pub(super) struct CreateMviewLogStoreProgressTracker { @@ -110,7 +109,7 @@ pub(super) enum CreatingStreamingJobStatus { pending_non_checkpoint_barriers: Vec, /// Info of the first barrier: (`actors_to_create`, `mutation`) /// Take the mutation out when injecting the first barrier - initial_barrier_info: Option<(HashMap>, Mutation)>, + initial_barrier_info: Option<(StreamJobActorsToCreate, Mutation)>, }, /// The creating job is consuming log store. /// @@ -126,7 +125,7 @@ pub(super) enum CreatingStreamingJobStatus { pub(super) struct CreatingJobInjectBarrierInfo { pub barrier_info: BarrierInfo, - pub new_actors: Option>>, + pub new_actors: Option, pub mutation: Option, } @@ -252,10 +251,7 @@ impl CreatingStreamingJobStatus { pub(super) fn new_fake_barrier( prev_epoch_fake_physical_time: &mut u64, pending_non_checkpoint_barriers: &mut Vec, - initial_barrier_info: &mut Option<( - HashMap>, - Mutation, - )>, + initial_barrier_info: &mut Option<(StreamJobActorsToCreate, Mutation)>, is_checkpoint: bool, ) -> CreatingJobInjectBarrierInfo { { diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 09e5f811a4311..26bacd97eef0d 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -42,7 +42,7 @@ use risingwave_pb::stream_plan::{ use risingwave_pb::stream_service::BarrierCompleteResponse; use tracing::warn; -use super::info::{CommandFragmentChanges, InflightStreamingJobInfo}; +use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo}; use crate::barrier::info::BarrierInfo; use crate::barrier::utils::collect_resp_info; use crate::barrier::InflightSubscriptionInfo; @@ -50,7 +50,8 @@ use crate::controller::fragment::InflightFragmentInfo; use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; use crate::manager::{StreamingJob, StreamingJobType}; use crate::model::{ - ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments, + ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate, + StreamJobFragments, }; use crate::stream::{ build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig, @@ -121,6 +122,8 @@ impl ReplaceStreamJobPlan { let fragment_change = CommandFragmentChanges::NewFragment( self.streaming_job.id().into(), InflightFragmentInfo { + fragment_id: fragment.fragment_id, + nodes: fragment.nodes.clone().unwrap(), actors: fragment .actors .iter() @@ -208,6 +211,8 @@ impl CreateStreamingJobCommandInfo { ( fragment.fragment_id, InflightFragmentInfo { + fragment_id: fragment.fragment_id, + nodes: fragment.nodes.clone().unwrap(), actors: fragment .actors .iter() @@ -955,7 +960,10 @@ impl Command { mutation } - pub fn actors_to_create(&self) -> Option>> { + pub fn actors_to_create( + &self, + graph_info: &InflightDatabaseInfo, + ) -> Option { match self { Command::CreateStreamingJob { info, job_type } => { let mut map = match job_type { @@ -974,13 +982,25 @@ impl Command { Some(map) } Command::RescheduleFragment { reschedules, .. } => { - let mut map: HashMap> = HashMap::new(); - for (actor, status) in reschedules - .values() - .flat_map(|reschedule| reschedule.newly_created_actors.iter()) + let mut map: HashMap)>> = HashMap::new(); + for (fragment_id, actor, status) in + reschedules.iter().flat_map(|(fragment_id, reschedule)| { + reschedule + .newly_created_actors + .iter() + .map(|(actors, status)| (*fragment_id, actors, status)) + }) { let worker_id = status.location.as_ref().unwrap().worker_node_id as _; - map.entry(worker_id).or_default().push(actor.clone()); + map.entry(worker_id) + .or_default() + .entry(fragment_id) + .or_insert_with(|| { + let node = graph_info.fragment(fragment_id).nodes.clone(); + (node, vec![]) + }) + .1 + .push(actor.clone()); } Some(map) } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 488764bc2d77f..0ab59ff48a98e 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -82,6 +82,16 @@ impl InflightDatabaseInfo { pub fn contains_job(&self, job_id: TableId) -> bool { self.jobs.contains_key(&job_id) } + + pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo { + let job_id = self.fragment_location[&fragment_id]; + self.jobs + .get(&job_id) + .expect("should exist") + .fragment_infos + .get(&fragment_id) + .expect("should exist") + } } impl InflightDatabaseInfo { diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 94a9ffecc922f..ccca5f5bbf691 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -32,9 +32,13 @@ use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier_mutation::Mutation; -use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo}; +use risingwave_pb::stream_plan::{ + AddMutation, Barrier, BarrierMutation, StreamNode, SubscriptionUpstreamInfo, +}; use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors; -use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo; +use risingwave_pb::stream_service::inject_barrier_request::{ + BuildActorInfo, FragmentBuildActorInfo, +}; use risingwave_pb::stream_service::streaming_control_stream_request::{ CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph, RemovePartialGraphRequest, ResetDatabaseRequest, @@ -57,7 +61,9 @@ use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::controller::fragment::InflightFragmentInfo; use crate::manager::MetaSrvEnv; -use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments}; +use crate::model::{ + ActorId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate, StreamJobFragments, +}; use crate::stream::build_actor_connector_splits; use crate::{MetaError, MetaResult}; @@ -371,12 +377,20 @@ impl ControlStreamManager { kind: BarrierKind::Initial, }; - let mut node_actors: HashMap<_, Vec<_>> = HashMap::new(); - for (actor_id, worker_id) in info.fragment_infos().flat_map(|info| info.actors.iter()) { - let worker_id = *worker_id as WorkerId; - let actor_id = *actor_id as ActorId; - let stream_actor = stream_actors.remove(&actor_id).expect("should exist"); - node_actors.entry(worker_id).or_default().push(stream_actor); + let mut node_actors: HashMap<_, HashMap)>> = HashMap::new(); + for fragment_info in info.fragment_infos() { + for (actor_id, worker_id) in &fragment_info.actors { + let worker_id = *worker_id as WorkerId; + let actor_id = *actor_id as ActorId; + let stream_actor = stream_actors.remove(&actor_id).expect("should exist"); + node_actors + .entry(worker_id) + .or_default() + .entry(fragment_info.fragment_id) + .or_insert_with(|| (fragment_info.nodes.clone(), vec![])) + .1 + .push(stream_actor); + } } let background_mviews = info @@ -440,7 +454,7 @@ impl ControlStreamManager { applied_graph_info.fragment_infos(), command .as_ref() - .map(|command| command.actors_to_create()) + .map(|command| command.actors_to_create(pre_applied_graph_info)) .unwrap_or_default(), subscriptions_to_add, subscriptions_to_remove, @@ -455,7 +469,7 @@ impl ControlStreamManager { barrier_info: &BarrierInfo, pre_applied_graph_info: impl IntoIterator, applied_graph_info: impl IntoIterator + 'a, - mut new_actors: Option>>, + mut new_actors: Option, subscriptions_to_add: Vec, subscriptions_to_remove: Vec, ) -> MetaResult> { @@ -483,16 +497,19 @@ impl ControlStreamManager { .iter() .flatten() .flat_map(|(worker_id, actor_infos)| { - actor_infos.iter().map(|actor_info| ActorInfo { - actor_id: actor_info.0.actor_id, - host: self - .nodes - .get(worker_id) - .expect("have checked exist previously") - .worker - .host - .clone(), - }) + actor_infos + .iter() + .flat_map(|(_, (_, actors))| actors.iter()) + .map(|actor_info| ActorInfo { + actor_id: actor_info.0.actor_id, + host: self + .nodes + .get(worker_id) + .expect("have checked exist previously") + .worker + .host + .clone(), + }) }) .collect_vec(); @@ -541,21 +558,30 @@ impl ControlStreamManager { .into_iter() .flatten() .flatten() - .map(|(actor, upstreams)| BuildActorInfo { - actor: Some(actor), - upstreams: upstreams - .into_iter() - .map(|(fragment_id, upstreams)| { - ( - fragment_id, - UpstreamActors { - actors: upstreams - .into_iter() - .collect(), - }, - ) - }) - .collect(), + .map(|(fragment_id, (node, actors))| { + FragmentBuildActorInfo { + fragment_id, + node: Some(node), + actors: actors + .into_iter() + .map(|(actor, upstreams)| BuildActorInfo { + actor: Some(actor), + upstreams: upstreams + .into_iter() + .map(|(fragment_id, upstreams)| { + ( + fragment_id, + UpstreamActors { + actors: upstreams + .into_iter() + .collect(), + }, + ) + }) + .collect(), + }) + .collect(), + } }) .collect(), subscriptions_to_add: subscriptions_to_add.clone(), diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 5e032474a31db..1e33ccbc19f28 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -16,10 +16,12 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use anyhow::Context; +use futures::stream::BoxStream; +use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; -use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut; +use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; @@ -45,7 +47,7 @@ use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, + DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamNode, }; use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; @@ -71,6 +73,8 @@ use crate::{MetaError, MetaResult}; #[derive(Clone, Debug)] pub struct InflightFragmentInfo { + pub fragment_id: crate::model::FragmentId, + pub nodes: PbStreamNode, pub actors: HashMap, pub state_table_ids: HashSet, } @@ -214,6 +218,7 @@ impl CatalogController { actors: pb_actors, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, + nodes, .. } = fragment; @@ -222,8 +227,7 @@ impl CatalogController { assert!(!pb_actors.is_empty()); let stream_node = { - let actor_template = pb_actors.first().cloned().unwrap(); - let mut stream_node = actor_template.nodes.unwrap(); + let mut stream_node = nodes.as_ref().unwrap().clone(); visit_stream_node_mut(&mut stream_node, |body| { #[expect(deprecated)] if let NodeBody::Merge(m) = body { @@ -256,7 +260,6 @@ impl CatalogController { let PbStreamActor { actor_id, fragment_id, - nodes: _, dispatcher: pb_dispatcher, vnode_bitmap: pb_vnode_bitmap, mview_definition: _, @@ -399,7 +402,16 @@ impl CatalogController { vnode_count, } = fragment; - let stream_node_template = stream_node.to_protobuf(); + let stream_node = stream_node.to_protobuf(); + let mut upstream_fragments = HashSet::new(); + visit_stream_node(&stream_node, |body| { + if let NodeBody::Merge(m) = body { + assert!( + upstream_fragments.insert(m.upstream_fragment_id), + "non-duplicate upstream fragment" + ); + } + }); let mut pb_actors = vec![]; @@ -430,26 +442,21 @@ impl CatalogController { let upstream_fragment_actors = upstream_actor_ids.into_inner(); - let pb_nodes = { - let mut nodes = stream_node_template.clone(); - - visit_stream_node_mut(&mut nodes, |body| { - if let NodeBody::Merge(m) = body - && let Some(upstream_actor_ids) = - upstream_fragment_actors.get(&(m.upstream_fragment_id as _)) + { + for upstream_fragment_id in &upstream_fragments { + if let Some(upstream_actor_ids) = + upstream_fragment_actors.get(&(*upstream_fragment_id as _)) { actor_upstreams .entry(actor_id as _) .or_default() .try_insert( - m.upstream_fragment_id, + *upstream_fragment_id, upstream_actor_ids.iter().map(|id| *id as _).collect(), ) .expect("non-duplicate"); } - }); - - Some(nodes) + } }; let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.to_protobuf()); @@ -478,7 +485,7 @@ impl CatalogController { pb_actors.push(PbStreamActor { actor_id: actor_id as _, fragment_id: fragment_id as _, - nodes: pb_nodes, + nodes: None, dispatcher: pb_dispatcher, upstream_actor_id: vec![], vnode_bitmap: pb_vnode_bitmap, @@ -498,6 +505,7 @@ impl CatalogController { state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), + nodes: Some(stream_node), }; Ok(( @@ -945,6 +953,7 @@ impl CatalogController { fragment::Column::StateTableIds, fragment::Column::UpstreamFragmentId, fragment::Column::VnodeCount, + fragment::Column::StreamNode, ]) .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism") .join(JoinType::LeftJoin, fragment::Relation::Actor.def()) @@ -1021,18 +1030,27 @@ impl CatalogController { } else { filter_condition }; - let actor_info: Vec<( - ActorId, - WorkerId, - FragmentId, - I32Array, - DatabaseId, - ObjectId, - )> = Actor::find() + #[expect(clippy::type_complexity)] + let mut actor_info_stream: BoxStream< + '_, + Result< + ( + ActorId, + WorkerId, + FragmentId, + StreamNode, + I32Array, + DatabaseId, + ObjectId, + ), + _, + >, + > = Actor::find() .select_only() .column(actor::Column::ActorId) .column(actor::Column::WorkerId) .column(fragment::Column::FragmentId) + .column(fragment::Column::StreamNode) .column(fragment::Column::StateTableIds) .column(object::Column::DatabaseId) .column(object::Column::Oid) @@ -1040,13 +1058,22 @@ impl CatalogController { .join(JoinType::InnerJoin, fragment::Relation::Object.def()) .filter(filter_condition) .into_tuple() - .all(&inner.db) + .stream(&inner.db) .await?; let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> = HashMap::new(); - for (actor_id, worker_id, fragment_id, state_table_ids, database_id, job_id) in actor_info { + while let Some(( + actor_id, + worker_id, + fragment_id, + node, + state_table_ids, + database_id, + job_id, + )) = actor_info_stream.try_next().await? + { let fragment_infos = database_fragment_infos .entry(database_id) .or_default() @@ -1065,6 +1092,8 @@ impl CatalogController { } Entry::Vacant(entry) => { entry.insert(InflightFragmentInfo { + fragment_id: fragment_id as _, + nodes: node.to_protobuf(), actors: HashMap::from_iter([(actor_id as _, worker_id as _)]), state_table_ids, }); @@ -1720,16 +1749,15 @@ mod tests { ) .to_bitmaps(); + let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap()); + let pb_actors = (0..actor_count) .map(|actor_id| { - let actor_upstream_actor_ids = - upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap(); - let stream_node = generate_merger_stream_node(&actor_upstream_actor_ids); - + #[expect(deprecated)] PbStreamActor { actor_id: actor_id as _, fragment_id: TEST_FRAGMENT_ID as _, - nodes: Some(stream_node), + nodes: None, dispatcher: generate_dispatchers_for_actor(actor_id), vnode_bitmap: actor_bitmaps .get(&actor_id) @@ -1756,6 +1784,7 @@ mod tests { .flat_map(|m| m.keys().map(|x| *x as _)) .collect(), maybe_vnode_count: VnodeCount::for_test().to_protobuf(), + nodes: Some(stream_node.clone()), }; let pb_actor_status = (0..actor_count) @@ -1780,7 +1809,6 @@ mod tests { &pb_actor_splits, )?; - check_fragment_template(fragment.clone(), pb_actors.clone()); check_fragment(fragment, pb_fragment); check_actors( actors, @@ -1788,21 +1816,12 @@ mod tests { actor_dispatchers, pb_actors, Default::default(), + &stream_node, ); Ok(()) } - fn check_fragment_template(fragment: fragment::Model, actors: Vec) { - let stream_node_template = fragment.stream_node.to_protobuf(); - - for PbStreamActor { nodes, .. } in actors { - let template_node = stream_node_template.clone(); - let nodes = nodes.unwrap(); - assert_eq!(nodes, template_node); - } - } - #[tokio::test] async fn test_compose_fragment() -> MetaResult<()> { let actor_count = 3u32; @@ -1917,7 +1936,6 @@ mod tests { let pb_actors = pb_fragment.actors.clone(); - check_fragment_template(fragment.clone(), pb_actors.clone()); check_fragment(fragment, pb_fragment); check_actors( actors, @@ -1925,6 +1943,7 @@ mod tests { actor_dispatchers, pb_actors, pb_actor_splits, + &stream_node, ); Ok(()) @@ -1936,6 +1955,7 @@ mod tests { mut actor_dispatchers: HashMap>, pb_actors: Vec, pb_actor_splits: HashMap, + stream_node: &PbStreamNode, ) { for ( actor::Model { @@ -1951,7 +1971,6 @@ mod tests { PbStreamActor { actor_id: pb_actor_id, fragment_id: pb_fragment_id, - nodes: pb_nodes, dispatcher: pb_dispatcher, vnode_bitmap: pb_vnode_bitmap, mview_definition, @@ -1979,7 +1998,7 @@ mod tests { assert_eq!(mview_definition, ""); - visit_stream_node(pb_nodes.as_ref().unwrap(), |body| { + visit_stream_node(stream_node, |body| { if let PbNodeBody::Merge(m) = body { let expected_upstream_actor_ids = upstream_actor_ids .get(&(m.upstream_fragment_id as _)) @@ -2017,6 +2036,7 @@ mod tests { state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, maybe_vnode_count: _, + nodes, } = pb_fragment; assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32); @@ -2035,5 +2055,6 @@ mod tests { pb_state_table_ids, fragment.state_table_ids.into_u32_array() ); + assert_eq!(fragment.stream_node.to_protobuf(), nodes.unwrap()); } } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index ecf229176cb5f..d61de9018e956 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -298,6 +298,7 @@ pub struct FragmentDesc { pub upstream_fragment_id: I32Array, pub parallelism: i64, pub vnode_count: i32, + pub stream_node: StreamNode, } /// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies. diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index bacf07358f3ca..ffbb9fb4f3617 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -201,6 +201,9 @@ impl StreamJobFragments { } } +pub type StreamJobActorsToCreate = + HashMap)>>; + impl StreamJobFragments { /// Create a new `TableFragments` with state of `Initial`, with other fields empty. pub fn for_test(table_id: TableId, fragments: BTreeMap) -> Self { @@ -323,6 +326,7 @@ impl StreamJobFragments { } /// Returns actors associated with this table. + #[cfg(test)] pub fn actors(&self) -> Vec { self.fragments .values() @@ -422,14 +426,12 @@ impl StreamJobFragments { let mut source_fragments = HashMap::new(); for fragment in self.fragments() { - for actor in &fragment.actors { - if let Some(source_id) = actor.nodes.as_ref().unwrap().find_stream_source() { + { + if let Some(source_id) = fragment.nodes.as_ref().unwrap().find_stream_source() { source_fragments .entry(source_id as SourceId) .or_insert(BTreeSet::new()) .insert(fragment.fragment_id as FragmentId); - - break; } } } @@ -446,16 +448,14 @@ impl StreamJobFragments { let mut source_backfill_fragments = HashMap::new(); for fragment in self.fragments() { - for actor in &fragment.actors { + { if let Some((source_id, upstream_source_fragment_id)) = - actor.nodes.as_ref().unwrap().find_source_backfill() + fragment.nodes.as_ref().unwrap().find_source_backfill() { source_backfill_fragments .entry(source_id as SourceId) .or_insert(BTreeSet::new()) .insert((fragment.fragment_id, upstream_source_fragment_id)); - - break; } } } @@ -467,8 +467,8 @@ impl StreamJobFragments { pub fn union_fragment_for_table(&mut self) -> (&mut Fragment, &mut FragmentActorUpstreams) { let mut union_fragment_id = None; for (fragment_id, fragment) in &self.fragments { - for actor in &fragment.actors { - if let Some(node) = &actor.nodes { + { + if let Some(node) = &fragment.nodes { visit_stream_node(node, |body| { if let NodeBody::Union(_) = body { if let Some(union_fragment_id) = union_fragment_id.as_mut() { @@ -516,8 +516,7 @@ impl StreamJobFragments { pub fn dependent_table_ids(&self) -> HashMap { let mut table_ids = HashMap::new(); self.fragments.values().for_each(|fragment| { - let actor = &fragment.actors[0]; - Self::resolve_dependent_table(actor.nodes.as_ref().unwrap(), &mut table_ids); + Self::resolve_dependent_table(fragment.nodes.as_ref().unwrap(), &mut table_ids); }); table_ids @@ -566,8 +565,8 @@ impl StreamJobFragments { actors } - pub fn actors_to_create(&self) -> HashMap> { - let mut actor_map: HashMap<_, Vec<_>> = HashMap::new(); + pub fn actors_to_create(&self) -> StreamJobActorsToCreate { + let mut actor_map: HashMap<_, HashMap<_, (_, Vec<_>)>> = HashMap::new(); self.fragments .values() .flat_map(|fragment| { @@ -577,10 +576,11 @@ impl StreamJobFragments { actor, actor_upstreams .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id)), + fragment, ) }) }) - .for_each(|(actor, actor_upstream)| { + .for_each(|(actor, actor_upstream, fragment)| { let worker_id = self .actor_status .get(&actor.actor_id) @@ -589,6 +589,9 @@ impl StreamJobFragments { actor_map .entry(worker_id) .or_default() + .entry(fragment.fragment_id) + .or_insert_with(|| (fragment.nodes.clone().unwrap(), vec![])) + .1 .push((actor.clone(), actor_upstream.cloned().unwrap_or_default())); }); actor_map @@ -624,7 +627,7 @@ impl StreamJobFragments { let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { stream_graph_visitor::visit_stream_node_tables_inner( - &mut fragment.actors[0].nodes.clone().unwrap(), + &mut fragment.nodes.clone().unwrap(), internal_tables_only, true, |table, _| { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 73cecdd38f34a..5541679878520 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -659,9 +659,9 @@ impl DdlController { ) })?; - for actor in &stream_scan_fragment.actors { + { if let Some(NodeBody::StreamCdcScan(ref stream_cdc_scan)) = - actor.nodes.as_ref().unwrap().node_body + stream_scan_fragment.nodes.as_ref().unwrap().node_body && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc { let options_with_secret = WithOptionsSecResolved::new( @@ -755,8 +755,8 @@ impl DdlController { // check if the union fragment is fully assigned. for fragment in stream_job_fragments.fragments.values() { - for actor in &fragment.actors { - if let Some(node) = &actor.nodes { + if let Some(node) = &fragment.nodes { + for actor in &fragment.actors { visit_stream_node(node, |node| { if let NodeBody::Merge(merge_node) = node { let fragment_actor_upstreams = stream_job_fragments @@ -802,16 +802,7 @@ impl DdlController { .map(|actor| actor.actor_id) .collect_vec(); - let mut sink_fields = None; - - for actor in &sink_fragment.actors { - if let Some(node) = &actor.nodes { - sink_fields = Some(node.fields.clone()); - break; - } - } - - let sink_fields = sink_fields.expect("sink fields not found"); + let sink_fields = sink_fragment.nodes.as_ref().unwrap().fields.clone(); let output_indices = sink_fields .iter() @@ -859,8 +850,8 @@ impl DdlController { let upstream_fragment_id = sink_fragment.fragment_id; - for actor in &mut union_fragment.actors { - if let Some(node) = &mut actor.nodes { + if let Some(node) = &mut union_fragment.nodes { + { visit_stream_node_cont_mut(node, |node| { if let Some(NodeBody::Union(_)) = &mut node.node_body { for input_project_node in &mut node.input { @@ -878,14 +869,29 @@ impl DdlController { if let Some(NodeBody::Merge(merge_node)) = &mut merge_stream_node.node_body - && union_fragment_actor_upstreams - .get(&actor.actor_id) - .and_then(|actor_upstream| { - actor_upstream.get(&merge_node.upstream_fragment_id) - }) - .map(|upstream_actor_ids| upstream_actor_ids.is_empty()) - .unwrap_or(true) + && union_fragment.actors.iter().any(|actor| { + union_fragment_actor_upstreams + .get(&actor.actor_id) + .and_then(|actor_upstream| { + actor_upstream.get(&merge_node.upstream_fragment_id) + }) + .map(|upstream_actor_ids| upstream_actor_ids.is_empty()) + .unwrap_or(true) + }) { + if cfg!(debug_assertions) { + union_fragment.actors.iter().for_each(|actor| { + assert!(union_fragment_actor_upstreams + .get(&actor.actor_id) + .and_then(|actor_upstream| { + actor_upstream + .get(&merge_node.upstream_fragment_id) + }) + .map(|upstream_actor_ids| upstream_actor_ids + .is_empty()) + .unwrap_or(true), "inconsistent replace table plan for sink. upstreams: {:?} actor_id: {} upstream_fragment_id: {}", union_fragment_actor_upstreams, actor.actor_id, merge_node.upstream_fragment_id) + }); + } if let Some(sink_id) = sink_id { merge_stream_node.identity = format!("MergeExecutor(from sink {})", sink_id); @@ -904,14 +910,16 @@ impl DdlController { } }; - union_fragment_actor_upstreams - .entry(actor.actor_id) - .or_default() - .try_insert( - upstream_fragment_id, - HashSet::from_iter(sink_actor_ids.iter().cloned()), - ) - .expect("checked non-exist"); + for actor in &union_fragment.actors { + union_fragment_actor_upstreams + .entry(actor.actor_id) + .or_default() + .try_insert( + upstream_fragment_id, + HashSet::from_iter(sink_actor_ids.iter().cloned()), + ) + .expect("checked non-exist"); + } merge_stream_node.fields = sink_fields.to_vec(); diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index c9f82e3ede5af..247dc7ed83df3 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -16,7 +16,6 @@ use std::cmp::{min, Ordering}; use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::hash::{Hash, Hasher}; -use std::iter::repeat; use std::sync::Arc; use std::time::Duration; @@ -42,6 +41,7 @@ use risingwave_pb::meta::FragmentWorkerSlotMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamActor, + StreamNode, }; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; @@ -70,6 +70,7 @@ pub struct CustomFragmentInfo { pub distribution_type: PbFragmentDistributionType, pub state_table_ids: Vec, pub upstream_fragment_ids: Vec, + pub node: StreamNode, pub actor_template: PbStreamActor, pub actors: Vec, } @@ -110,6 +111,7 @@ impl From<&PbFragment> for CustomFragmentInfo { distribution_type: fragment.distribution_type(), state_table_ids: fragment.state_table_ids.clone(), upstream_fragment_ids: fragment.upstream_fragment_ids.clone(), + node: fragment.nodes.clone().unwrap(), actor_template: fragment .actors .first() @@ -594,8 +596,9 @@ impl ScaleController { distribution_type: distribution_type.into(), state_table_ids: state_table_ids.into_u32_array(), upstream_fragment_ids: upstream_fragment_id.into_u32_array(), + node: stream_node.to_protobuf(), actor_template: PbStreamActor { - nodes: Some(stream_node.to_protobuf()), + nodes: None, actor_id, fragment_id: fragment_id as _, dispatcher, @@ -752,11 +755,10 @@ impl ScaleController { } } - if (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0 { - let stream_node = fragment.actor_template.nodes.as_ref().unwrap(); - if stream_node.find_stream_source().is_some() { - stream_source_fragment_ids.insert(*fragment_id); - } + if (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0 + && fragment.node.find_stream_source().is_some() + { + stream_source_fragment_ids.insert(*fragment_id); } // Check if the reschedule plan is valid. @@ -815,7 +817,7 @@ impl ScaleController { let fragment = fragment_map.get(noshuffle_downstream).unwrap(); // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source. if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 { - let stream_node = fragment.actor_template.nodes.as_ref().unwrap(); + let stream_node = &fragment.node; if let Some((_source_id, upstream_source_fragment_id)) = stream_node.find_source_backfill() { @@ -1183,12 +1185,9 @@ impl ScaleController { assert!(!fragment.actors.is_empty()); - for (actor_to_create, sample_actor) in actors_to_create - .iter() - .zip_eq_debug(repeat(&fragment.actor_template).take(actors_to_create.len())) - { + for actor_to_create in &actors_to_create { let new_actor_id = actor_to_create.0; - let mut new_actor = sample_actor.clone(); + let mut new_actor = fragment.actor_template.clone(); let mut new_actor_upstream = ActorUpstreams::new(); // This should be assigned before the `modify_actor_upstream_and_downstream` call, @@ -1204,6 +1203,7 @@ impl ScaleController { &no_shuffle_upstream_actor_map, &no_shuffle_downstream_actors_map, (&mut new_actor, &mut new_actor_upstream), + &fragment.node, )?; if let Some(bitmap) = fragment_actor_bitmap @@ -1559,6 +1559,7 @@ impl ScaleController { no_shuffle_upstream_actor_map: &HashMap>, no_shuffle_downstream_actors_map: &HashMap>, (new_actor, actor_upstreams): (&mut StreamActor, &mut ActorUpstreams), + stream_node: &StreamNode, ) -> MetaResult<()> { let fragment = &ctx.fragment_map[&new_actor.fragment_id]; let mut applied_upstream_fragment_actor_ids = HashMap::new(); @@ -1610,8 +1611,8 @@ impl ScaleController { } } - if let Some(node) = new_actor.nodes.as_mut() { - visit_stream_node(node, |node_body| { + { + visit_stream_node(stream_node, |node_body| { if let NodeBody::Merge(s) = node_body { let upstream_actor_ids = applied_upstream_fragment_actor_ids .get(&s.upstream_fragment_id) diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index d954e340079a3..48bdb13393cc4 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -302,12 +302,7 @@ impl FragmentActorBuilder { impl ActorBuilder { /// Build an actor after all the upstreams and downstreams are processed. - fn build( - self, - job: &StreamingJob, - expr_context: ExprContext, - nodes: StreamNode, - ) -> MetaResult { + fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult { // Only fill the definition when debug assertions enabled, otherwise use name instead. #[cfg(not(debug_assertions))] let mview_definition = job.name(); @@ -319,7 +314,7 @@ impl ActorBuilder { StreamActor { actor_id: self.actor_id.as_global_id(), fragment_id: self.fragment_id.as_global_id(), - nodes: Some(nodes), + nodes: None, dispatcher: self.downstreams.into_values().collect(), upstream_actor_id: vec![], vnode_bitmap: self.vnode_bitmap.map(|b| b.to_protobuf()), @@ -873,7 +868,8 @@ impl ActorGraphBuilder { // Serialize the graph into a map of sealed fragments. let (graph, actor_upstreams) = { - let mut fragment_actors: HashMap> = HashMap::new(); + let mut fragment_actors: HashMap)> = + HashMap::new(); let mut fragment_actor_upstreams: BTreeMap<_, FragmentActorUpstreams> = BTreeMap::new(); // As all fragments are processed, we can now `build` the actors where the `Exchange` @@ -886,10 +882,13 @@ impl ActorGraphBuilder { fragment_actors .try_insert( fragment_id, - builders - .into_values() - .map(|builder| builder.build(job, expr_context.clone(), node.clone())) - .try_collect()?, + ( + node, + builders + .into_values() + .map(|builder| builder.build(job, expr_context.clone())) + .try_collect()?, + ), ) .expect("non-duplicate"); } @@ -897,11 +896,14 @@ impl ActorGraphBuilder { ( fragment_actors .into_iter() - .map(|(fragment_id, actors)| { + .map(|(fragment_id, (stream_node, actors))| { let distribution = self.distributions[&fragment_id].clone(); - let fragment = - self.fragment_graph - .seal_fragment(fragment_id, actors, distribution); + let fragment = self.fragment_graph.seal_fragment( + fragment_id, + actors, + distribution, + stream_node, + ); let fragment_id = fragment_id.as_global_id(); (fragment_id, fragment) }) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 0599e421c6f26..cf99fbaf46cd5 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -894,7 +894,7 @@ impl CompleteStreamFragmentGraph { { // Resolve the required output columns from the upstream materialized view. let (dist_key_indices, output_indices) = { - let nodes = upstream_fragment.actors[0].get_nodes().unwrap(); + let nodes = upstream_fragment.get_nodes().unwrap(); let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap(); let all_column_ids = mview_node.column_ids(); @@ -938,7 +938,7 @@ impl CompleteStreamFragmentGraph { GlobalFragmentId::new(source_fragment.fragment_id); let output_indices = { - let nodes = upstream_fragment.actors[0].get_nodes().unwrap(); + let nodes = upstream_fragment.get_nodes().unwrap(); let source_node = nodes.get_node_body().unwrap().as_source().unwrap(); @@ -1168,6 +1168,7 @@ impl CompleteStreamFragmentGraph { id: GlobalFragmentId, actors: Vec, distribution: Distribution, + stream_node: StreamNode, ) -> Fragment { let building_fragment = self.get_fragment(id).into_building().unwrap(); let internal_tables = building_fragment.extract_internal_tables(); @@ -1206,6 +1207,7 @@ impl CompleteStreamFragmentGraph { state_table_ids, upstream_fragment_ids, maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), + nodes: Some(stream_node), } } diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 3f281491b7d28..b7378c397ee02 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -525,28 +525,32 @@ async fn test_graph_builder() -> MetaResult<()> { .first() .map_or(&vec![], |d| d.get_downstream_actor_id()), ); - let mut node = actor.get_nodes().unwrap(); + } + for fragment in stream_job_fragments.fragments() { + let mut node = fragment.get_nodes().unwrap(); while !node.get_input().is_empty() { node = node.get_input().first().unwrap(); } match node.get_node_body().unwrap() { NodeBody::Merge(merge_node) => { - assert_eq!( - expected_upstream - .get(&actor.get_actor_id()) - .unwrap() - .iter() - .collect::>(), - actor_upstreams - .get(&actor.fragment_id) - .unwrap() - .get(&actor.actor_id) - .unwrap() - .get(&merge_node.upstream_fragment_id) - .unwrap() - .iter() - .collect::>(), - ); + for actor in &fragment.actors { + assert_eq!( + expected_upstream + .get(&actor.get_actor_id()) + .unwrap() + .iter() + .collect::>(), + actor_upstreams + .get(&actor.fragment_id) + .unwrap() + .get(&actor.actor_id) + .unwrap() + .get(&merge_node.upstream_fragment_id) + .unwrap() + .iter() + .collect::>(), + ); + } } NodeBody::Source(_) => { // check nothing. diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index e1c78dcc597e4..b1fa8aef8982e 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -761,7 +761,17 @@ impl DatabaseManagedBarrierState { let mut new_actors = HashSet::new(); let subscriptions = LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone())); - for actor in request.actors_to_build { + for (node, actor) in request + .actors_to_build + .into_iter() + .flat_map(|fragment_actors| { + let node = Arc::new(fragment_actors.node.unwrap()); + fragment_actors + .actors + .into_iter() + .map(move |actor| (node.clone(), actor)) + }) + { let upstream = actor.upstreams; let actor = actor.actor.unwrap(); let actor_id = actor.actor_id; @@ -770,6 +780,7 @@ impl DatabaseManagedBarrierState { assert!(request.actor_ids_to_collect.contains(&actor_id)); let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor( actor, + node, (*subscriptions).clone(), upstream, self.current_shared_context.clone(), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index f79d37e85949c..b9d8f3af48ad2 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -602,6 +602,7 @@ impl StreamActorManager { async fn create_actor( self: Arc, actor: StreamActor, + node: Arc, shared_context: Arc, related_subscriptions: Arc>>, upstreams: HashMap, @@ -626,7 +627,7 @@ impl StreamActorManager { let (executor, subtasks) = self .create_nodes( actor.fragment_id, - actor.get_nodes()?, + &node, self.env.clone(), &actor_context, vnode_bitmap, @@ -660,6 +661,7 @@ impl StreamActorManager { pub(super) fn spawn_actor( self: &Arc, actor: StreamActor, + node: Arc, related_subscriptions: Arc>>, upstreams: HashMap, current_shared_context: Arc, @@ -674,7 +676,7 @@ impl StreamActorManager { format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition); let barrier_manager = local_barrier_manager.clone(); // wrap the future of `create_actor` with `boxed` to avoid stack overflow - let actor = self.clone().create_actor(actor, current_shared_context, related_subscriptions, upstreams, barrier_manager.clone()).boxed().and_then(|actor| actor.run()).map(move |result| { + let actor = self.clone().create_actor(actor, node, current_shared_context, related_subscriptions, upstreams, barrier_manager.clone()).boxed().and_then(|actor| actor.run()).map(move |result| { if let Err(err) = result { // TODO: check error type and panic if it's unexpected. // Intentionally use `?` on the report to also include the backtrace. diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index 644ee97d7b1dc..3f9802db8fdb9 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -47,7 +47,7 @@ pub mod predicate { pub type BoxedPredicate = Box; fn root(fragment: &PbFragment) -> &StreamNode { - fragment.actors.first().unwrap().nodes.as_ref().unwrap() + fragment.nodes.as_ref().unwrap() } fn count(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> usize { diff --git a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs index 239a2ef4f068f..a91521cd04637 100644 --- a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs +++ b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs @@ -47,16 +47,17 @@ fn source_backfill_upstream( } } + let (_, source_fragment_id) = source_backfill_fragment + .get_nodes() + .unwrap() + .find_source_backfill() + .unwrap(); + assert_eq!(source_fragment.fragment_id, source_fragment_id); + source_backfill_fragment .actors .iter() .map(|backfill_actor| { - let (_, source_fragment_id) = backfill_actor - .get_nodes() - .unwrap() - .find_source_backfill() - .unwrap(); - assert_eq!(source_fragment.fragment_id, source_fragment_id); ( backfill_actor.actor_id, no_shuffle_downstream_to_upstream[&backfill_actor.actor_id],