From 4b20ca11a285cabb587168ed5c009ab24bf87ff0 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Fri, 31 Jan 2025 02:31:46 +0800
Subject: [PATCH] refactor(meta): deprecate persisted actor upstreams and get
 upstreams from dispatcher info

---
 src/meta/model/src/actor.rs                    |   1 +
 .../barrier/checkpoint/creating_job/mod.rs     |  37 +++-
 src/meta/src/barrier/command.rs                | 174 +++++++++++++--
 src/meta/src/barrier/context/recovery.rs       |   5 +-
 src/meta/src/barrier/mod.rs                    |   9 +-
 src/meta/src/barrier/rpc.rs                    |  32 ++-
 src/meta/src/controller/catalog/mod.rs         |  10 +-
 src/meta/src/controller/catalog/util.rs        |  38 ----
 src/meta/src/controller/fragment.rs            | 198 ++++++------------
 src/meta/src/controller/scale.rs               |  43 +---
 src/meta/src/controller/streaming_job.rs       | 113 ++--------
 src/meta/src/manager/metadata.rs               |  20 +-
 src/meta/src/model/stream.rs                   |  75 +++----
 src/meta/src/rpc/ddl_controller.rs             |  86 +++-----
 src/meta/src/stream/scale.rs                   |  86 +-------
 .../stream/source_manager/split_assignment.rs  |   2 +-
 16 files changed, 394 insertions(+), 535 deletions(-)

diff --git a/src/meta/model/src/actor.rs b/src/meta/model/src/actor.rs
index eda48d47c5049..2164ea43c4031 100644
--- a/src/meta/model/src/actor.rs
+++ b/src/meta/model/src/actor.rs
@@ -57,6 +57,7 @@ pub struct Model {
     pub status: ActorStatus,
     pub splits: Option<ConnectorSplits>,
     pub worker_id: WorkerId,
+    #[deprecated]
     pub upstream_actor_ids: ActorUpstreamActors,
     pub vnode_bitmap: Option<VnodeBitmap>,
     pub expr_context: ExprContext,
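Note on the hunk above: `#[deprecated]` keeps the persisted column compiling and deserializing for existing rows while turning every remaining use into a lint that must be acknowledged, which is how the rest of this patch stages the removal. A minimal, self-contained sketch of the mechanism, with illustrative names rather than the real SeaORM model:

```rust
/// Sketch: a persisted column we want to stop reading but cannot drop yet.
pub struct ActorRow {
    pub actor_id: i32,
    #[deprecated = "derive upstreams from dispatcher info instead"]
    pub upstream_actor_ids: Vec<i32>,
}

// Every remaining construction site must acknowledge the deprecated field,
// making it easy to audit what is left before the column is dropped.
#[expect(deprecated)]
fn new_row(actor_id: i32) -> ActorRow {
    ActorRow {
        actor_id,
        upstream_actor_ids: Vec::new(), // always written empty from now on
    }
}

fn main() {
    let row = new_row(1);
    println!("actor {}", row.actor_id);
}
```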
diff --git a/src/meta/src/barrier/checkpoint/creating_job/mod.rs b/src/meta/src/barrier/checkpoint/creating_job/mod.rs
index 514691e4097f6..0faad58357f89 100644
--- a/src/meta/src/barrier/checkpoint/creating_job/mod.rs
+++ b/src/meta/src/barrier/checkpoint/creating_job/mod.rs
@@ -35,6 +35,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::barrier::rpc::ControlStreamManager;
 use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
 use crate::controller::fragment::InflightFragmentInfo;
+use crate::model::StreamJobActorsToCreate;
 use crate::rpc::metrics::GLOBAL_META_METRICS;
 use crate::MetaResult;
 
@@ -73,7 +74,41 @@ impl CreatingStreamingJobControl {
         let table_id = info.stream_job_fragments.stream_job_id();
         let table_id_str = format!("{}", table_id.table_id);
 
-        let actors_to_create = info.stream_job_fragments.actors_to_create();
+        let mut actor_upstreams = Command::collect_actor_upstreams(
+            info.stream_job_fragments
+                .actors_to_create()
+                .flat_map(|(fragment_id, _, actors)| {
+                    actors.map(move |(actor, _)| {
+                        (actor.actor_id, fragment_id, actor.dispatcher.as_slice())
+                    })
+                })
+                .chain(
+                    info.dispatchers
+                        .iter()
+                        .flat_map(|(fragment_id, dispatchers)| {
+                            dispatchers.iter().map(|(actor_id, dispatchers)| {
+                                (*actor_id, *fragment_id, dispatchers.as_slice())
+                            })
+                        }),
+                ),
+            None,
+        );
+        let mut actors_to_create = StreamJobActorsToCreate::default();
+        for (fragment_id, node, actors) in info.stream_job_fragments.actors_to_create() {
+            for (actor, worker_id) in actors {
+                actors_to_create
+                    .entry(worker_id)
+                    .or_default()
+                    .entry(fragment_id)
+                    .or_insert_with(|| (node.clone(), vec![]))
+                    .1
+                    .push((
+                        actor.clone(),
+                        actor_upstreams.remove(&actor.actor_id).unwrap_or_default(),
+                    ))
+            }
+        }
+
         let graph_info = InflightStreamingJobInfo {
             job_id: table_id,
             fragment_infos,
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 0dda385e005d0..29e1b86107191 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -37,7 +37,7 @@ use risingwave_pb::stream_plan::update_mutation::*;
 use risingwave_pb::stream_plan::{
     AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
     DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
-    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
+    StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
 };
 use risingwave_pb::stream_service::BarrierCompleteResponse;
 use tracing::warn;
@@ -50,8 +50,7 @@ use crate::controller::fragment::InflightFragmentInfo;
 use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use crate::manager::{StreamingJob, StreamingJobType};
 use crate::model::{
-    ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate,
-    StreamJobFragments,
+    ActorId, ActorUpstreams, DispatcherId, FragmentId, StreamJobActorsToCreate, StreamJobFragments,
 };
 use crate::stream::{
     build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
@@ -86,7 +85,7 @@ pub struct Reschedule {
     /// `Source` and `SourceBackfill` are handled together here.
     pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
 
-    pub newly_created_actors: Vec<(StreamActorWithUpstreams, PbActorStatus)>,
+    pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
 }
 
 /// Replacing an old job with a new one. All actors in the job will be rebuilt.
@@ -983,22 +982,82 @@ impl Command {
     ) -> Option<StreamJobActorsToCreate> {
         match self {
             Command::CreateStreamingJob { info, job_type } => {
-                let mut map = match job_type {
-                    CreateStreamingJobType::Normal => HashMap::new(),
-                    CreateStreamingJobType::SinkIntoTable(replace_table) => {
-                        replace_table.new_fragments.actors_to_create()
-                    }
+                let sink_into_table_replace_plan = match job_type {
+                    CreateStreamingJobType::Normal => None,
+                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
                     CreateStreamingJobType::SnapshotBackfill(_) => {
                         // for snapshot backfill, the actors to create is measured separately
                         return None;
                     }
                 };
-                for (worker_id, new_actors) in info.stream_job_fragments.actors_to_create() {
-                    map.entry(worker_id).or_default().extend(new_actors)
+                let get_actors_to_create = || {
+                    sink_into_table_replace_plan
+                        .map(|plan| plan.new_fragments.actors_to_create())
+                        .into_iter()
+                        .flatten()
+                        .chain(info.stream_job_fragments.actors_to_create())
+                };
+                let mut actor_upstreams = Self::collect_actor_upstreams(
+                    get_actors_to_create()
+                        .flat_map(|(fragment_id, _, actors)| {
+                            actors.map(move |(actor, _)| {
+                                (actor.actor_id, fragment_id, actor.dispatcher.as_slice())
+                            })
+                        })
+                        .chain(
+                            sink_into_table_replace_plan
+                                .map(|plan| {
+                                    plan.dispatchers.iter().flat_map(
+                                        |(fragment_id, dispatchers)| {
+                                            dispatchers.iter().map(|(actor_id, dispatchers)| {
+                                                (*actor_id, *fragment_id, dispatchers.as_slice())
+                                            })
+                                        },
+                                    )
+                                })
+                                .into_iter()
+                                .flatten(),
+                        )
+                        .chain(
+                            info.dispatchers
+                                .iter()
+                                .flat_map(|(fragment_id, dispatchers)| {
+                                    dispatchers.iter().map(|(actor_id, dispatchers)| {
+                                        (*actor_id, *fragment_id, dispatchers.as_slice())
+                                    })
+                                }),
+                        ),
+                    None,
+                );
+                let mut map = StreamJobActorsToCreate::default();
+                for (fragment_id, node, actors) in get_actors_to_create() {
+                    for (actor, worker_id) in actors {
+                        map.entry(worker_id)
+                            .or_default()
+                            .entry(fragment_id)
+                            .or_insert_with(|| (node.clone(), vec![]))
+                            .1
+                            .push((
+                                actor.clone(),
+                                actor_upstreams.remove(&actor.actor_id).unwrap_or_default(),
+                            ))
+                    }
                 }
                 Some(map)
             }
-            Command::RescheduleFragment { reschedules, .. } => {
+            Command::RescheduleFragment {
+                reschedules,
+                fragment_actors,
+                ..
+            } => {
+                let mut actor_upstreams = Self::collect_actor_upstreams(
+                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
+                        reschedule.newly_created_actors.iter().map(|(actor, _)| {
+                            (actor.actor_id, *fragment_id, actor.dispatcher.as_slice())
+                        })
+                    }),
+                    Some((reschedules, fragment_actors)),
+                );
                 let mut map: HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<_>)>> =
                     HashMap::new();
                 for (fragment_id, actor, status) in
                     reschedules.iter().flat_map(|(fragment_id, reschedule)| {
@@ -1009,6 +1068,7 @@ impl Command {
                     })
                 {
                     let worker_id = status.location.as_ref().unwrap().worker_node_id as _;
+                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
                     map.entry(worker_id)
                         .or_default()
                         .entry(fragment_id)
@@ -1017,12 +1077,44 @@ impl Command {
                             (node, vec![])
                         })
                        .1
-                        .push(actor.clone());
+                        .push((actor.clone(), upstreams));
                 }
                 Some(map)
             }
             Command::ReplaceStreamJob(replace_table) => {
-                Some(replace_table.new_fragments.actors_to_create())
+                let mut actor_upstreams = Self::collect_actor_upstreams(
+                    replace_table
+                        .new_fragments
+                        .actors_to_create()
+                        .flat_map(|(fragment_id, _, actors)| {
+                            actors.map(move |(actor, _)| {
+                                (actor.actor_id, fragment_id, actor.dispatcher.as_slice())
+                            })
+                        })
+                        .chain(replace_table.dispatchers.iter().flat_map(
+                            |(fragment_id, dispatchers)| {
+                                dispatchers.iter().map(|(actor_id, dispatchers)| {
+                                    (*actor_id, *fragment_id, dispatchers.as_slice())
+                                })
+                            },
+                        )),
+                    None,
+                );
+                let mut map = StreamJobActorsToCreate::default();
+                for (fragment_id, node, actors) in replace_table.new_fragments.actors_to_create() {
+                    for (actor, worker_id) in actors {
+                        map.entry(worker_id)
+                            .or_default()
+                            .entry(fragment_id)
+                            .or_insert_with(|| (node.clone(), vec![]))
+                            .1
+                            .push((
+                                actor.clone(),
+                                actor_upstreams.remove(&actor.actor_id).unwrap_or_default(),
+                            ))
+                    }
+                }
+                Some(map)
             }
             _ => None,
         }
     }
@@ -1104,3 +1196,57 @@ impl Command {
             .flatten()
     }
 }
+
+impl Command {
+    #[expect(clippy::type_complexity)]
+    pub fn collect_actor_upstreams(
+        actor_dispatchers: impl Iterator<Item = (ActorId, FragmentId, &[Dispatcher])>,
+        reschedule_dispatcher_update: Option<(
+            &HashMap<FragmentId, Reschedule>,
+            &HashMap<FragmentId, HashSet<ActorId>>,
+        )>,
+    ) -> HashMap<ActorId, ActorUpstreams> {
+        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
+        for (upstream_actor_id, upstream_fragment_id, dispatchers) in actor_dispatchers {
+            for downstream_actor_id in dispatchers
+                .iter()
+                .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
+            {
+                actor_upstreams
+                    .entry(*downstream_actor_id)
+                    .or_default()
+                    .entry(upstream_fragment_id)
+                    .or_default()
+                    .insert(upstream_actor_id);
+            }
+        }
+        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
+            for reschedule in reschedules.values() {
+                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
+                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
+                    for upstream_actor_id in fragment_actors
+                        .get(upstream_fragment_id)
+                        .expect("should exist")
+                    {
+                        if let Some(upstream_reschedule) = upstream_reschedule
+                            && upstream_reschedule
+                                .removed_actors
+                                .contains(upstream_actor_id)
+                        {
+                            continue;
+                        }
+                        for downstream_actor_id in reschedule.added_actors.values().flatten() {
+                            actor_upstreams
+                                .entry(*downstream_actor_id)
+                                .or_default()
+                                .entry(*upstream_fragment_id)
+                                .or_default()
+                                .insert(*upstream_actor_id);
+                        }
+                    }
+                }
+            }
+        }
+        actor_upstreams
+    }
+}
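`collect_actor_upstreams` above is the core of the refactor: instead of reading the persisted `upstream_actor_ids` column, upstream edges are recomputed by inverting dispatcher edges, since every dispatcher on an upstream actor already lists its downstream actor ids. A condensed, runnable sketch of that inversion, using plain collections in place of the real protobuf/model types:

```rust
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type FragmentId = u32;
/// actor -> (upstream fragment -> upstream actors), as in `ActorUpstreams`.
type ActorUpstreams = HashMap<FragmentId, HashSet<ActorId>>;

/// Invert "(upstream actor, its fragment) dispatches to these downstream actors"
/// into "downstream actor <- upstream actors, grouped by upstream fragment".
fn collect_actor_upstreams(
    edges: impl Iterator<Item = (ActorId, FragmentId, Vec<ActorId>)>,
) -> HashMap<ActorId, ActorUpstreams> {
    let mut upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
    for (upstream_actor, upstream_fragment, downstream_actors) in edges {
        for downstream in downstream_actors {
            upstreams
                .entry(downstream)
                .or_default()
                .entry(upstream_fragment)
                .or_default()
                .insert(upstream_actor);
        }
    }
    upstreams
}

fn main() {
    // Fragment 1 (actors 10, 11) dispatches to fragment 2 (actors 20, 21).
    let edges = vec![(10, 1, vec![20, 21]), (11, 1, vec![20, 21])];
    let upstreams = collect_actor_upstreams(edges.into_iter());
    assert_eq!(upstreams[&20][&1], HashSet::from([10, 11]));
}
```

The optional second argument in the real function covers rescheduling, where the dispatchers of surviving upstream actors have not been rewritten yet: edges toward `added_actors` are synthesized from the reschedule plan, while actors in `removed_actors` are skipped.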
diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs
index c999e361bfdb7..833ec30563b29 100644
--- a/src/meta/src/barrier/context/recovery.rs
+++ b/src/meta/src/barrier/context/recovery.rs
@@ -23,6 +23,7 @@ use risingwave_common::catalog::{DatabaseId, TableId};
 use risingwave_common::config::DefaultParallelism;
 use risingwave_common::hash::WorkerSlotId;
 use risingwave_meta_model::StreamingParallelism;
+use risingwave_pb::stream_plan::StreamActor;
 use thiserror_ext::AsReport;
 use tokio::time::Instant;
 use tracing::{debug, info, warn};
@@ -33,7 +34,7 @@ use crate::barrier::info::InflightDatabaseInfo;
 use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
 use crate::controller::fragment::InflightFragmentInfo;
 use crate::manager::ActiveStreamingWorkerNodes;
-use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments, TableParallelism};
+use crate::model::{ActorId, StreamJobFragments, TableParallelism};
 use crate::stream::{
     JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
     RescheduleOptions, SourceChange,
@@ -770,7 +771,7 @@ impl GlobalBarrierWorkerContextImpl {
     }
 
     /// Update all actors in compute nodes.
-    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
+    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
         self.metadata_manager.all_active_actors().await
     }
 }
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index da024591c229b..ac05a5fa9c12e 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -20,12 +20,13 @@ use risingwave_connector::source::SplitImpl;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PbRecoveryStatus;
+use risingwave_pb::stream_plan::StreamActor;
 use tokio::sync::oneshot::Sender;
 
 use self::notifier::Notifier;
 use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
 use crate::manager::ActiveStreamingWorkerNodes;
-use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
+use crate::model::{ActorId, StreamJobFragments};
 use crate::{MetaError, MetaResult};
 
 mod checkpoint;
@@ -103,7 +104,7 @@ struct BarrierWorkerRuntimeInfoSnapshot {
     database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
     state_table_committed_epochs: HashMap<TableId, u64>,
     subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
-    stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
+    stream_actors: HashMap<ActorId, StreamActor>,
     source_splits: HashMap<ActorId, Vec<SplitImpl>>,
     background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
     hummock_version_stats: HummockVersionStats,
@@ -114,7 +115,7 @@ impl BarrierWorkerRuntimeInfoSnapshot {
         database_id: DatabaseId,
         database_info: &InflightDatabaseInfo,
         active_streaming_nodes: &ActiveStreamingWorkerNodes,
-        stream_actors: &HashMap<ActorId, StreamActorWithUpstreams>,
+        stream_actors: &HashMap<ActorId, StreamActor>,
         state_table_committed_epochs: &HashMap<TableId, u64>,
     ) -> MetaResult<()> {
         {
@@ -189,7 +190,7 @@ struct DatabaseRuntimeInfoSnapshot {
     database_fragment_info: InflightDatabaseInfo,
     state_table_committed_epochs: HashMap<TableId, u64>,
     subscription_info: InflightSubscriptionInfo,
-    stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
+    stream_actors: HashMap<ActorId, StreamActor>,
     source_splits: HashMap<ActorId, Vec<SplitImpl>>,
     background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
 }
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index edab57f53659b..003a6596fb691 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -33,7 +33,7 @@ use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_plan::{
-    AddMutation, Barrier, BarrierMutation, StreamNode, SubscriptionUpstreamInfo,
+    AddMutation, Barrier, BarrierMutation, StreamActor, StreamNode, SubscriptionUpstreamInfo,
 };
 use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
 use risingwave_pb::stream_service::inject_barrier_request::{
@@ -326,7 +326,7 @@ impl ControlStreamManager {
         database_id: DatabaseId,
         info: InflightDatabaseInfo,
         state_table_committed_epochs: &mut HashMap<TableId, u64>,
-        stream_actors: &mut HashMap<ActorId, StreamActorWithUpstreams>,
+        stream_actors: &mut HashMap<ActorId, StreamActor>,
         source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
         background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
         subscription_info: InflightSubscriptionInfo,
@@ -377,19 +377,43 @@ impl ControlStreamManager {
             kind: BarrierKind::Initial,
         };
 
-        let mut node_actors: HashMap<_, HashMap<FragmentId, (StreamNode, Vec<_>)>> = HashMap::new();
+        let mut stream_actors: HashMap<_, _> = info
+            .fragment_infos()
+            .flat_map(|fragment_info| fragment_info.actors.keys())
+            .map(|actor_id| {
+                let stream_actor = stream_actors.remove(actor_id).expect("should exist");
+                (stream_actor.actor_id, stream_actor)
+            })
+            .collect();
+
+        let mut actor_upstreams = Command::collect_actor_upstreams(
+            stream_actors.values().map(|actor| {
+                (
+                    actor.actor_id,
+                    actor.fragment_id,
+                    actor.dispatcher.as_slice(),
+                )
+            }),
+            None,
+        );
+
+        let mut node_actors: HashMap<
+            _,
+            HashMap<FragmentId, (StreamNode, Vec<(StreamActor, ActorUpstreams)>)>,
+        > = HashMap::new();
         for fragment_info in info.fragment_infos() {
             for (actor_id, worker_id) in &fragment_info.actors {
                 let worker_id = *worker_id as WorkerId;
                 let actor_id = *actor_id as ActorId;
                 let stream_actor = stream_actors.remove(&actor_id).expect("should exist");
+                let upstream = actor_upstreams.remove(&actor_id).unwrap_or_default();
                 node_actors
                     .entry(worker_id)
                     .or_default()
                     .entry(fragment_info.fragment_id)
                     .or_insert_with(|| (fragment_info.nodes.clone(), vec![]))
                     .1
-                    .push(stream_actor);
+                    .push((stream_actor, upstream));
             }
         }
diff --git a/src/meta/src/controller/catalog/mod.rs b/src/meta/src/controller/catalog/mod.rs
index b7f12c4f3e94e..78da84a1393c7 100644
--- a/src/meta/src/controller/catalog/mod.rs
+++ b/src/meta/src/controller/catalog/mod.rs
@@ -37,11 +37,11 @@ use risingwave_meta_model::object::ObjectType;
 use risingwave_meta_model::prelude::*;
 use risingwave_meta_model::table::TableType;
 use risingwave_meta_model::{
-    actor, connection, database, fragment, function, index, object, object_dependency, schema,
-    secret, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId,
-    ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId,
-    I32Array, IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkId, SourceId,
-    StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
+    connection, database, fragment, function, index, object, object_dependency, schema, secret,
+    sink, source, streaming_job, subscription, table, user_privilege, view, ActorId,
+    ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, I32Array, IndexId,
+    JobStatus, ObjectId, Property, SchemaId, SecretId, SinkId, SourceId, StreamNode,
+    StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId,
 };
 use risingwave_pb::catalog::connection::Info as ConnectionInfo;
 use risingwave_pb::catalog::subscription::SubscriptionState;
diff --git a/src/meta/src/controller/catalog/util.rs b/src/meta/src/controller/catalog/util.rs
index 9d8c77c21274e..0954289beb6f2 100644
--- a/src/meta/src/controller/catalog/util.rs
+++ b/src/meta/src/controller/catalog/util.rs
@@ -452,44 +452,6 @@ impl CatalogController {
                     .filter(fragment::Column::FragmentId.eq(fragment_id))
                     .exec(txn)
                     .await?;
-
-                let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
-                    .select_only()
-                    .columns(vec![
-                        actor::Column::ActorId,
-                        actor::Column::UpstreamActorIds,
-                    ])
-                    .filter(actor::Column::FragmentId.eq(fragment_id))
-                    .into_tuple()
-                    .all(txn)
-                    .await?;
-
-                for (actor_id, upstream_actor_ids) in actors {
-                    let mut upstream_actor_ids = upstream_actor_ids.into_inner();
-
-                    let dirty_actor_upstreams = upstream_actor_ids
-                        .extract_if(|id, _| !all_fragment_ids.contains(id))
-                        .map(|(id, _)| id)
-                        .collect_vec();
-
-                    if !dirty_actor_upstreams.is_empty() {
-                        tracing::debug!(
-                            "cleaning dirty table sink fragment {:?} from downstream fragment {} actor {}",
-                            dirty_actor_upstreams,
-                            fragment_id,
-                            actor_id,
-                        );
-
-                        Actor::update_many()
-                            .col_expr(
-                                actor::Column::UpstreamActorIds,
-                                ActorUpstreamActors::from(upstream_actor_ids).into(),
-                            )
-                            .filter(actor::Column::ActorId.eq(actor_id))
-                            .exec(txn)
-                            .await?;
-                    }
-                }
             }
         }
     }
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 643bfe818ac0b..6f692efbe36dc 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -29,9 +29,8 @@ use risingwave_meta_model::object::ObjectType;
 use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob};
 use risingwave_meta_model::{
     actor, actor_dispatcher, database, fragment, object, sink, source, streaming_job, table,
-    ActorId, ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array,
-    JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
-    VnodeBitmap, WorkerId,
+    ActorId, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, ObjectId,
+    SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
 };
 use risingwave_meta_model_migration::{Alias, SelectStatement};
 use risingwave_pb::common::PbActorLocation;
@@ -48,7 +47,7 @@ use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamNode,
-    PbStreamScanType, StreamScanType,
+    PbStreamScanType, StreamActor, StreamScanType,
 };
 use sea_orm::sea_query::Expr;
 use sea_orm::ActiveValue::Set;
@@ -65,10 +64,7 @@ use crate::controller::utils::{
     FragmentDesc, PartialActorLocation, PartialFragmentStateTables,
 };
 use crate::manager::LocalNotification;
-use crate::model::{
-    ActorUpstreams, FragmentActorUpstreams, StreamActorWithUpstreams, StreamContext,
-    StreamJobFragments, TableParallelism,
-};
+use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
 use crate::stream::{build_actor_split_impls, SplitAssignment};
 use crate::{model, MetaError, MetaResult};
@@ -186,9 +182,6 @@ impl CatalogController {
             let (fragment, actors, dispatchers) = Self::extract_fragment_and_actors(
                 stream_job_fragments.stream_job_id.table_id as _,
                 fragment,
-                stream_job_fragments
-                    .actor_upstreams
-                    .get(&fragment.fragment_id),
                 &stream_job_fragments.actor_status,
                 &stream_job_fragments.actor_splits,
             )?;
@@ -203,7 +196,6 @@ impl CatalogController {
     pub fn extract_fragment_and_actors(
         job_id: ObjectId,
         fragment: &PbFragment,
-        fragment_actor_upstreams: Option<&FragmentActorUpstreams>,
         actor_status: &BTreeMap<u32, PbActorStatus>,
         actor_splits: &HashMap<u32, Vec<SplitImpl>>,
     ) -> MetaResult<(
@@ -243,21 +235,6 @@ impl CatalogController {
         let mut actor_dispatchers = HashMap::new();
 
         for actor in pb_actors {
-            let mut upstream_actors = BTreeMap::new();
-
-            if let Some(actor_upstreams) = fragment_actor_upstreams
-                .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id))
-            {
-                for (upstream_fragment_id, actor_upstream_actors) in actor_upstreams {
-                    upstream_actors
-                        .try_insert(
-                            *upstream_fragment_id,
-                            actor_upstream_actors.iter().cloned().collect_vec(),
-                        )
-                        .expect("There should only be one link between two fragments");
-                }
-            }
-
             let PbStreamActor {
                 actor_id,
                 fragment_id,
@@ -287,13 +264,14 @@ impl CatalogController {
                 .as_ref()
                 .expect("no expression context found");
 
+            #[expect(deprecated)]
             actors.push(actor::Model {
                 actor_id: *actor_id as _,
                 fragment_id: *fragment_id as _,
                 status: status.get_state().unwrap().into(),
                 splits,
                 worker_id,
-                upstream_actor_ids: upstream_actors.into(),
+                upstream_actor_ids: Default::default(),
                 vnode_bitmap: pb_vnode_bitmap.as_ref().map(VnodeBitmap::from),
                 expr_context: ExprContext::from(pb_expr_context),
             });
@@ -344,15 +322,11 @@ impl CatalogController {
         let mut pb_fragments = BTreeMap::new();
         let mut pb_actor_splits = HashMap::new();
         let mut pb_actor_status = BTreeMap::new();
-        let mut fragment_actor_upstreams = BTreeMap::new();
 
         for (fragment, actors, actor_dispatcher) in fragments {
-            let (fragment, actor_upstreams, fragment_actor_status, fragment_actor_splits) =
+            let (fragment, fragment_actor_status, fragment_actor_splits) =
                 Self::compose_fragment(fragment, actors, actor_dispatcher)?;
 
-            fragment_actor_upstreams
-                .try_insert(fragment.fragment_id, actor_upstreams)
-                .expect("non-duplicate");
             pb_fragments.insert(fragment.fragment_id, fragment);
 
             pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
@@ -363,7 +337,6 @@ impl CatalogController {
             stream_job_id: table_id.into(),
             state: state as _,
             fragments: pb_fragments,
-            actor_upstreams: fragment_actor_upstreams,
             actor_status: pb_actor_status,
             actor_splits: pb_actor_splits,
             ctx: ctx
@@ -388,7 +361,6 @@ impl CatalogController {
         mut actor_dispatcher: HashMap<ActorId, Vec<actor_dispatcher::Model>>,
     ) -> MetaResult<(
         PbFragment,
-        FragmentActorUpstreams,
         HashMap<ActorId, PbActorStatus>,
         HashMap<ActorId, PbConnectorSplits>,
     )> {
@@ -416,7 +388,6 @@ impl CatalogController {
 
         let mut pb_actors = vec![];
 
-        let mut actor_upstreams: HashMap<_, ActorUpstreams> = HashMap::new();
         let mut pb_actor_status = HashMap::new();
         let mut pb_actor_splits = HashMap::new();
 
@@ -436,30 +407,11 @@ impl CatalogController {
                 status,
                 worker_id,
                 splits,
-                upstream_actor_ids,
                 vnode_bitmap,
                 expr_context,
+                ..
             } = actor;
 
-            let upstream_fragment_actors = upstream_actor_ids.into_inner();
-
-            {
-                for upstream_fragment_id in &upstream_fragments {
-                    if let Some(upstream_actor_ids) =
-                        upstream_fragment_actors.get(&(*upstream_fragment_id as _))
-                    {
-                        actor_upstreams
-                            .entry(actor_id as _)
-                            .or_default()
-                            .try_insert(
-                                *upstream_fragment_id,
-                                upstream_actor_ids.iter().map(|id| *id as _).collect(),
-                            )
-                            .expect("non-duplicate");
-                    }
-                }
-            };
-
             let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.to_protobuf());
 
             let pb_expr_context = Some(expr_context.to_protobuf());
@@ -509,12 +461,7 @@ impl CatalogController {
             nodes: Some(stream_node),
         };
 
-        Ok((
-            pb_fragment,
-            actor_upstreams,
-            pb_actor_status,
-            pb_actor_splits,
-        ))
+        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
     }
 
     pub async fn running_fragment_parallelisms(
@@ -1279,7 +1226,7 @@ impl CatalogController {
     pub async fn all_node_actors(
         &self,
         include_inactive: bool,
-    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
+    ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
         let inner = self.inner.read().await;
         let fragment_actors = if include_inactive {
             Fragment::find()
@@ -1312,15 +1259,14 @@ impl CatalogController {
                 }
             }
 
-            let (table_fragments, mut actor_upstreams, actor_status, _) =
+            let (table_fragments, actor_status, _) =
                 Self::compose_fragment(fragment, actors, dispatcher_info)?;
             for actor in table_fragments.actors {
                 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
-                node_actors.entry(node_id).or_insert_with(Vec::new).push({
-                    let actor_upstreams =
-                        actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
-                    (actor, actor_upstreams)
-                });
+                node_actors
+                    .entry(node_id)
+                    .or_insert_with(Vec::new)
+                    .push(actor);
             }
         }
 
@@ -1429,40 +1375,52 @@ impl CatalogController {
     /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status.
     pub async fn get_running_actors_for_source_backfill(
         &self,
-        fragment_id: FragmentId,
+        source_backfill_fragment_id: FragmentId,
+        source_fragment_id: FragmentId,
     ) -> MetaResult<Vec<(ActorId, ActorId)>> {
         let inner = self.inner.read().await;
         let txn = inner.db.begin().await?;
-        let fragment = Fragment::find_by_id(fragment_id)
-            .one(&txn)
-            .await?
-            .context(format!("fragment {} not found", fragment_id))?;
-        let (_source_id, upstream_source_fragment_id) = fragment
-            .stream_node
-            .to_protobuf()
-            .find_source_backfill()
-            .unwrap();
-        let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
+
+        let source_backfill_actors: Vec<ActorId> = Actor::find()
             .select_only()
             .column(actor::Column::ActorId)
-            .column(actor::Column::UpstreamActorIds)
-            .filter(actor::Column::FragmentId.eq(fragment_id))
+            .filter(actor::Column::FragmentId.eq(source_backfill_fragment_id))
             .filter(actor::Column::Status.eq(ActorStatus::Running))
             .into_tuple()
             .all(&txn)
             .await?;
+
+        let source_actor_downstreams: Vec<(ActorId, I32Array)> = Actor::find()
+            .select_only()
+            .column(actor::Column::ActorId)
+            .column(actor_dispatcher::Column::DownstreamActorIds)
+            .filter(actor::Column::FragmentId.eq(source_fragment_id))
+            .join(JoinType::InnerJoin, actor::Relation::ActorDispatcher.def())
+            .into_tuple()
+            .all(&txn)
+            .await?;
+
+        let mut source_backfill_actor_upstreams: HashMap<ActorId, Vec<ActorId>> = HashMap::new();
+        for (source_actor_id, downstream_actors) in source_actor_downstreams {
+            for downstream_source_backfill_actor_id in downstream_actors.into_inner() {
+                source_backfill_actor_upstreams
+                    .entry(downstream_source_backfill_actor_id)
+                    .or_default()
+                    .push(source_actor_id);
+            }
+        }
+
-        Ok(actors
+        Ok(source_backfill_actors
             .into_iter()
-            .map(|(actor_id, upstream_actor_ids)| {
-                let upstream_source_actors =
-                    &upstream_actor_ids.0[&(upstream_source_fragment_id as i32)];
+            .map(|actor_id| {
+                let upstream_source_actors = &source_backfill_actor_upstreams[&actor_id];
                 assert_eq!(
                     upstream_source_actors.len(),
                     1,
                     "expect only one upstream source actor, but got {:?}, actor_id: {}, fragment_id: {}",
                     upstream_source_actors,
                     actor_id,
-                    fragment_id
+                    source_backfill_fragment_id,
                 );
                 (actor_id, upstream_source_actors[0])
             })
             .collect())
     }
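The rewritten query above derives each source-backfill actor's upstream source actor by joining the source fragment's actors with their dispatchers, then asserts the NoShuffle 1:1 invariant. A toy version of that derivation and its invariant check, with plain vectors standing in for the SeaORM result sets:

```rust
use std::collections::HashMap;

type ActorId = i32;

/// Map each source-backfill actor to its single upstream source actor, given
/// "source actor -> downstream actor ids" rows read from the dispatcher table.
fn backfill_upstreams(
    source_downstreams: &[(ActorId, Vec<ActorId>)],
    backfill_actors: &[ActorId],
) -> HashMap<ActorId, ActorId> {
    let mut upstreams: HashMap<ActorId, Vec<ActorId>> = HashMap::new();
    for (source_actor, downstreams) in source_downstreams {
        for downstream in downstreams {
            upstreams.entry(*downstream).or_default().push(*source_actor);
        }
    }
    backfill_actors
        .iter()
        .map(|actor| {
            let up = &upstreams[actor];
            // SourceBackfill is connected to Source by NoShuffle, hence exactly one upstream.
            assert_eq!(up.len(), 1, "expect only one upstream source actor");
            (*actor, up[0])
        })
        .collect()
}

fn main() {
    let got = backfill_upstreams(&[(1, vec![10]), (2, vec![11])], &[10, 11]);
    assert_eq!(got[&10], 1);
}
```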
@@ -1704,8 +1662,8 @@ mod tests {
     use risingwave_meta_model::actor::ActorStatus;
     use risingwave_meta_model::fragment::DistributionType;
     use risingwave_meta_model::{
-        actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
-        ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
+        actor, actor_dispatcher, fragment, ActorId, ConnectorSplits, ExprContext, FragmentId,
+        I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
     };
     use risingwave_pb::common::PbActorLocation;
     use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
@@ -1845,7 +1803,6 @@ mod tests {
         let (fragment, actors, actor_dispatchers) = CatalogController::extract_fragment_and_actors(
             TEST_JOB_ID,
             &pb_fragment,
-            Some(&upstream_actor_ids),
             &pb_actor_status,
             &pb_actor_splits,
         )?;
@@ -1891,29 +1848,14 @@ mod tests {
                     }],
                 }));
 
-                let actor_upstream_actor_ids =
-                    upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap();
-
+                #[expect(deprecated)]
                 actor::Model {
                     actor_id: actor_id as ActorId,
                     fragment_id: TEST_FRAGMENT_ID,
                     status: ActorStatus::Running,
                     splits: actor_splits,
                     worker_id: 0,
-                    upstream_actor_ids: ActorUpstreamActors(
-                        actor_upstream_actor_ids
-                            .into_iter()
-                            .map(|(fragment_id, actors_ids)| {
-                                (
-                                    fragment_id as _,
-                                    actors_ids
-                                        .into_iter()
-                                        .map(|actor_id| actor_id as _)
-                                        .collect(),
-                                )
-                            })
-                            .collect(),
-                    ),
+                    upstream_actor_ids: Default::default(),
                     vnode_bitmap: actor_bitmaps
                         .remove(&actor_id)
                         .map(|bitmap| bitmap.to_protobuf())
@@ -1943,14 +1885,11 @@ mod tests {
         let stream_node = {
             let template_actor = actors.first().cloned().unwrap();
 
-            let template_upstream_actor_ids = template_actor
-                .upstream_actor_ids
-                .into_inner()
-                .into_keys()
-                .map(|k| (k as _, Default::default()))
-                .collect();
+            let template_upstream_actor_ids = upstream_actor_ids
+                .get(&(template_actor.actor_id as _))
+                .unwrap();
 
-            generate_merger_stream_node(&template_upstream_actor_ids)
+            generate_merger_stream_node(template_upstream_actor_ids)
         };
 
         let fragment = fragment::Model {
@@ -1964,13 +1903,12 @@ mod tests {
             vnode_count: VirtualNode::COUNT_FOR_TEST as _,
         };
 
-        let (pb_fragment, actor_upstreams, pb_actor_status, pb_actor_splits) =
-            CatalogController::compose_fragment(
-                fragment.clone(),
-                actors.clone(),
-                actor_dispatchers.clone(),
-            )
-            .unwrap();
+        let (pb_fragment, pb_actor_status, pb_actor_splits) = CatalogController::compose_fragment(
+            fragment.clone(),
+            actors.clone(),
+            actor_dispatchers.clone(),
+        )
+        .unwrap();
 
         assert_eq!(pb_actor_status.len(), actor_count as usize);
         assert_eq!(pb_actor_splits.len(), actor_count as usize);
@@ -1980,7 +1918,7 @@ mod tests {
         check_fragment(fragment, pb_fragment);
         check_actors(
             actors,
-            &actor_upstreams,
+            &upstream_actor_ids,
             actor_dispatchers,
             pb_actors,
             pb_actor_splits,
@@ -2005,9 +1943,9 @@ mod tests {
                 status,
                 splits,
                 worker_id: _,
-                upstream_actor_ids,
                 vnode_bitmap,
                 expr_context,
+                ..
             },
             PbStreamActor {
                 actor_id: pb_actor_id,
@@ -2022,7 +1960,6 @@ mod tests {
         {
             assert_eq!(actor_id, pb_actor_id as ActorId);
             assert_eq!(fragment_id, pb_fragment_id as FragmentId);
-            let upstream_actor_ids = upstream_actor_ids.into_inner();
 
             let actor_dispatcher: Vec<PbDispatcher> = actor_dispatchers
                 .remove(&actor_id)
@@ -2041,19 +1978,10 @@ mod tests {
 
             visit_stream_node(stream_node, |body| {
                 if let PbNodeBody::Merge(m) = body {
-                    let expected_upstream_actor_ids = upstream_actor_ids
-                        .get(&(m.upstream_fragment_id as _))
-                        .map(|actors| actors.iter().map(|id| *id as u32).collect_vec())
-                        .unwrap();
-                    let upstream_actor_ids = actor_upstreams
+                    assert!(actor_upstreams
                         .get(&(actor_id as _))
                         .unwrap()
-                        .get(&m.upstream_fragment_id)
-                        .unwrap()
-                        .iter()
-                        .cloned()
-                        .collect_vec();
-                    assert_eq!(expected_upstream_actor_ids, upstream_actor_ids);
+                        .contains_key(&m.upstream_fragment_id));
                 }
             });
"ActorDispatcher {id} has downstream_actor_id {dispatcher_downstream_actor_id} which does not have fragment_id {actor_fragment_id} in upstream_actor_id", - ) - ); - - discovered_upstream_actors - .entry(*dispatcher_downstream_actor_id) - .or_insert(HashSet::new()) - .insert(actor.actor_id); } match dispatcher_type { @@ -886,10 +869,7 @@ impl CatalogController { } for PartialActor { - actor_id, - status, - upstream_actor_ids, - .. + actor_id, status, .. } in actor_map.values() { crit_check_in_loop!( @@ -897,25 +877,6 @@ impl CatalogController { *status == ActorStatus::Running, format!("Actor {actor_id} has status {status:?} which is not Running",) ); - - let discovered_upstream_actor_ids = discovered_upstream_actors - .get(actor_id) - .cloned() - .unwrap_or_default(); - - let upstream_actor_ids: HashSet<_> = upstream_actor_ids - .inner_ref() - .iter() - .flat_map(|(_, v)| v.iter().copied()) - .collect(); - - crit_check_in_loop!( - flag, - discovered_upstream_actor_ids == upstream_actor_ids, - format!( - "Actor {actor_id} has different upstream_actor_ids from discovered: {discovered_upstream_actor_ids:?} != actor upstream actor ids: {upstream_actor_ids:?}", - ) - ) } if flag { diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index ba638c31635a5..7fdabccc778d2 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::num::NonZeroUsize; use anyhow::anyhow; @@ -33,9 +33,9 @@ use risingwave_meta_model::prelude::{ use risingwave_meta_model::table::TableType; use risingwave_meta_model::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, - streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId, - ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SinkId, SourceId, - StreamNode, StreamingParallelism, UserId, + streaming_job, table, ActorId, ColumnCatalogArray, CreateType, DatabaseId, ExprNodeArray, + FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, + StreamingParallelism, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId; @@ -1116,43 +1116,7 @@ impl CatalogController { .await?; // 2. update merges. - // 2.1 update downstream actor's upstream_actor_ids - for merge_update in merge_updates.values().flatten() { - assert!(merge_update.removed_upstream_actor_id.is_empty()); - assert!(merge_update.new_upstream_fragment_id.is_some()); - let (actor_id, mut upstream_actors) = - Actor::find_by_id(merge_update.actor_id as ActorId) - .select_only() - .columns([actor::Column::ActorId, actor::Column::UpstreamActorIds]) - .into_tuple::<(ActorId, ActorUpstreamActors)>() - .one(txn) - .await? 
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index ba638c31635a5..7fdabccc778d2 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::num::NonZeroUsize;
 
 use anyhow::anyhow;
@@ -33,9 +33,9 @@ use risingwave_meta_model::prelude::{
 use risingwave_meta_model::table::TableType;
 use risingwave_meta_model::{
     actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source,
-    streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId,
-    ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SinkId, SourceId,
-    StreamNode, StreamingParallelism, UserId,
+    streaming_job, table, ActorId, ColumnCatalogArray, CreateType, DatabaseId, ExprNodeArray,
+    FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode,
+    StreamingParallelism, UserId,
 };
 use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
 use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
@@ -1116,43 +1116,7 @@ impl CatalogController {
             .await?;
 
         // 2. update merges.
-        // 2.1 update downstream actor's upstream_actor_ids
-        for merge_update in merge_updates.values().flatten() {
-            assert!(merge_update.removed_upstream_actor_id.is_empty());
-            assert!(merge_update.new_upstream_fragment_id.is_some());
-            let (actor_id, mut upstream_actors) =
-                Actor::find_by_id(merge_update.actor_id as ActorId)
-                    .select_only()
-                    .columns([actor::Column::ActorId, actor::Column::UpstreamActorIds])
-                    .into_tuple::<(ActorId, ActorUpstreamActors)>()
-                    .one(txn)
-                    .await?
-                    .ok_or_else(|| {
-                        MetaError::catalog_id_not_found("actor", merge_update.actor_id)
-                    })?;
-
-            assert!(upstream_actors
-                .0
-                .remove(&(merge_update.upstream_fragment_id as FragmentId))
-                .is_some());
-            upstream_actors.0.insert(
-                merge_update.new_upstream_fragment_id.unwrap() as _,
-                merge_update
-                    .added_upstream_actor_id
-                    .iter()
-                    .map(|id| *id as _)
-                    .collect(),
-            );
-            actor::ActiveModel {
-                actor_id: Set(actor_id),
-                upstream_actor_ids: Set(upstream_actors),
-                ..Default::default()
-            }
-            .update(txn)
-            .await?;
-        }
-
-        // 2.2 update downstream fragment's Merge node, and upstream_fragment_id
+        // update downstream fragment's Merge node, and upstream_fragment_id
         for (fragment_id, merge_updates) in merge_updates {
             let (fragment_id, mut stream_node, mut upstream_fragment_id) =
                 Fragment::find_by_id(fragment_id as FragmentId)
@@ -1609,7 +1573,7 @@ impl CatalogController {
                 newly_created_actors,
                 upstream_fragment_dispatcher_ids,
                 upstream_dispatcher_mapping,
-                downstream_fragment_ids,
+                ..
             },
         ) in reschedules
         {
@@ -1624,37 +1588,19 @@ impl CatalogController {
 
             // add new actors
             for (
-                (
-                    PbStreamActor {
-                        actor_id,
-                        fragment_id,
-                        dispatcher,
-                        vnode_bitmap,
-                        expr_context,
-                        ..
-                    },
-                    node_actor_upstreams,
-                ),
+                PbStreamActor {
+                    actor_id,
+                    fragment_id,
+                    dispatcher,
+                    vnode_bitmap,
+                    expr_context,
+                    ..
+                },
                 actor_status,
             ) in newly_created_actors
             {
-                let mut actor_upstreams = BTreeMap::<FragmentId, HashSet<ActorId>>::new();
                 let mut new_actor_dispatchers = vec![];
 
-                for (fragment_id, upstream_actor_ids) in node_actor_upstreams {
-                    actor_upstreams
-                        .entry(fragment_id as FragmentId)
-                        .or_default()
-                        .extend(upstream_actor_ids.iter().map(|id| *id as ActorId));
-                }
-
-                let actor_upstreams: BTreeMap<FragmentId, Vec<ActorId>> = actor_upstreams
-                    .into_iter()
-                    .map(|(k, v)| (k, v.into_iter().collect()))
-                    .collect();
-
-                let actor_upstreams = ActorUpstreamActors(actor_upstreams);
-
                 let splits = actor_splits
                     .get(&actor_id)
                     .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
 
                 Actor::insert(actor::ActiveModel {
                     actor_id: Set(actor_id as _),
                     fragment_id: Set(fragment_id as _),
                     status: Set(ActorStatus::Running),
                     splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                     worker_id: Set(actor_status.worker_id() as _),
-                    upstream_actor_ids: Set(actor_upstreams),
+                    upstream_actor_ids: Set(Default::default()),
                     vnode_bitmap: Set(vnode_bitmap.as_ref().map(|bitmap| bitmap.into())),
                     expr_context: Set(expr_context.as_ref().unwrap().into()),
                 })
@@ -1809,35 +1755,6 @@ impl CatalogController {
                     dispatcher.update(&txn).await?;
                 }
             }
-
-            // second step, downstream fragment
-            for downstream_fragment_id in downstream_fragment_ids {
-                let actors = Actor::find()
-                    .filter(actor::Column::FragmentId.eq(downstream_fragment_id as FragmentId))
-                    .all(&txn)
-                    .await?;
-
-                for actor in actors {
-                    if new_created_actors.contains(&actor.actor_id) {
-                        continue;
-                    }
-
-                    let mut actor = actor.into_active_model();
-
-                    let mut new_upstream_actor_ids =
-                        actor.upstream_actor_ids.as_ref().inner_ref().clone();
-
-                    update_actors(
-                        new_upstream_actor_ids.get_mut(&fragment_id).unwrap(),
-                        &removed_actor_ids,
-                        &added_actor_ids,
-                    );
-
-                    actor.upstream_actor_ids = Set(new_upstream_actor_ids.into());
-
-                    actor.update(&txn).await?;
-                }
-            }
         }
 
         let JobReschedulePostUpdates {
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 2c5b5da5d7795..796490c52a48b 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -26,7 +26,7 @@ use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty};
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
 use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
 use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
-use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType};
+use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamScanType, StreamActor};
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
 use tokio::sync::oneshot;
 use tokio::time::{sleep, Instant};
@@ -37,9 +37,7 @@ use crate::controller::catalog::CatalogControllerRef;
 use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
 use crate::controller::fragment::FragmentParallelismInfo;
 use crate::manager::{LocalNotification, NotificationVersion};
-use crate::model::{
-    ActorId, ClusterId, FragmentId, StreamActorWithUpstreams, StreamJobFragments, SubscriptionId,
-};
+use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
 use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
 use crate::telemetry::MetaTelemetryJobDesc;
 use crate::{MetaError, MetaResult};
@@ -561,11 +559,15 @@ impl MetadataManager {
 
     pub async fn get_running_actors_for_source_backfill(
         &self,
-        id: FragmentId,
+        source_backfill_fragment_id: FragmentId,
+        source_fragment_id: FragmentId,
     ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
         let actor_ids = self
             .catalog_controller
-            .get_running_actors_for_source_backfill(id as _)
+            .get_running_actors_for_source_backfill(
+                source_backfill_fragment_id as _,
+                source_fragment_id as _,
+            )
             .await?;
         Ok(actor_ids
             .into_iter()
@@ -588,15 +590,13 @@ impl MetadataManager {
         Ok(table_fragments)
     }
 
-    pub async fn all_active_actors(
-        &self,
-    ) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
+    pub async fn all_active_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
         let table_fragments = self.catalog_controller.table_fragments().await?;
         let mut actor_maps = HashMap::new();
         for (_, tf) in table_fragments {
             for actor in tf.active_actors() {
                 actor_maps
-                    .try_insert(actor.0.actor_id, actor)
+                    .try_insert(actor.actor_id, actor)
                     .expect("non duplicate");
             }
         }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index ffbb9fb4f3617..0da7f618d83be 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -124,7 +124,6 @@ pub struct StreamJobFragments {
     /// The table fragments.
     pub fragments: BTreeMap<FragmentId, Fragment>,
 
-    pub actor_upstreams: BTreeMap<FragmentId, FragmentActorUpstreams>,
     /// The status of actors
     pub actor_status: BTreeMap<ActorId, ActorStatus>,
@@ -207,11 +206,9 @@ pub type StreamJobActorsToCreate =
 impl StreamJobFragments {
     /// Create a new `TableFragments` with state of `Initial`, with other fields empty.
     pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
-        let actor_upstreams = BTreeMap::new();
         Self::new(
             table_id,
             fragments,
-            actor_upstreams,
             &BTreeMap::new(),
             StreamContext::default(),
             TableParallelism::Adaptive,
@@ -224,7 +221,6 @@ impl StreamJobFragments {
     pub fn new(
         stream_job_id: TableId,
         fragments: BTreeMap<FragmentId, Fragment>,
-        actor_upstreams: BTreeMap<FragmentId, FragmentActorUpstreams>,
         actor_locations: &BTreeMap<ActorId, PbActorStatus>,
         ctx: StreamContext,
         table_parallelism: TableParallelism,
@@ -247,7 +243,6 @@ impl StreamJobFragments {
             stream_job_id,
             state: State::Initial,
             fragments,
-            actor_upstreams,
             actor_status,
             actor_splits: HashMap::default(),
             ctx,
@@ -464,7 +459,7 @@ impl StreamJobFragments {
 
     /// Find the table job's `Union` fragment.
     /// Panics if not found.
-    pub fn union_fragment_for_table(&mut self) -> (&mut Fragment, &mut FragmentActorUpstreams) {
+    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
         let mut union_fragment_id = None;
         for (fragment_id, fragment) in &self.fragments {
             {
@@ -489,10 +484,8 @@ impl StreamJobFragments {
             .fragments
             .get_mut(&union_fragment_id)
             .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id));
-        (
-            union_fragment,
-            self.actor_upstreams.entry(union_fragment_id).or_default(),
-        )
+
+        union_fragment
     }
 
     /// Resolve dependent table
@@ -545,56 +538,42 @@ impl StreamJobFragments {
     }
 
     /// Returns the status of actors group by worker id.
-    pub fn active_actors(&self) -> Vec<StreamActorWithUpstreams> {
+    pub fn active_actors(&self) -> Vec<StreamActor> {
         let mut actors = vec![];
         for fragment in self.fragments.values() {
             for actor in &fragment.actors {
                 if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
                     continue;
                 }
-                actors.push((
-                    actor.clone(),
-                    self.actor_upstreams
-                        .get(&fragment.fragment_id)
-                        .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id))
-                        .cloned()
-                        .unwrap_or_default(),
-                ));
+                actors.push(actor.clone());
             }
         }
         actors
     }
 
-    pub fn actors_to_create(&self) -> StreamJobActorsToCreate {
-        let mut actor_map: HashMap<_, HashMap<_, (_, Vec<_>)>> = HashMap::new();
-        self.fragments
-            .values()
-            .flat_map(|fragment| {
-                let actor_upstreams = self.actor_upstreams.get(&fragment.fragment_id);
+    pub fn actors_to_create(
+        &self,
+    ) -> impl Iterator<
+        Item = (
+            FragmentId,
+            &StreamNode,
+            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
+        ),
+    > + '_ {
+        self.fragments.values().map(move |fragment| {
+            (
+                fragment.fragment_id,
+                fragment.nodes.as_ref().unwrap(),
                 fragment.actors.iter().map(move |actor| {
-                    (
-                        actor,
-                        actor_upstreams
-                            .and_then(|actor_upstreams| actor_upstreams.get(&actor.actor_id)),
-                        fragment,
-                    )
-                })
-            })
-            .for_each(|(actor, actor_upstream, fragment)| {
-                let worker_id = self
-                    .actor_status
-                    .get(&actor.actor_id)
-                    .expect("should exist")
-                    .worker_id() as WorkerId;
-                actor_map
-                    .entry(worker_id)
-                    .or_default()
-                    .entry(fragment.fragment_id)
-                    .or_insert_with(|| (fragment.nodes.clone().unwrap(), vec![]))
-                    .1
-                    .push((actor.clone(), actor_upstream.cloned().unwrap_or_default()));
-            });
-        actor_map
+                    let worker_id = self
+                        .actor_status
+                        .get(&actor.actor_id)
+                        .expect("should exist")
+                        .worker_id() as WorkerId;
+                    (actor, worker_id)
+                }),
+            )
+        })
     }
 
     pub fn mv_table_id(&self) -> Option<u32> {
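`actors_to_create` now returns a lazy iterator of `(fragment_id, stream_node, (actor, worker))` instead of a pre-grouped map, so each caller can group actors per worker and attach freshly computed upstreams itself. A sketch of the grouping pattern the callers in this patch share, with simplified stand-in types:

```rust
use std::collections::HashMap;

type WorkerId = u32;
type FragmentId = u32;

/// worker -> fragment -> (node, actors), mirroring `StreamJobActorsToCreate`.
fn group_by_worker<'a>(
    actors: impl Iterator<Item = (FragmentId, &'a str, Vec<(u32, WorkerId)>)>,
) -> HashMap<WorkerId, HashMap<FragmentId, (String, Vec<u32>)>> {
    let mut map: HashMap<WorkerId, HashMap<FragmentId, (String, Vec<u32>)>> = HashMap::new();
    for (fragment_id, node, fragment_actors) in actors {
        for (actor_id, worker_id) in fragment_actors {
            map.entry(worker_id)
                .or_default()
                .entry(fragment_id)
                // Clone the fragment's plan node once per (worker, fragment) pair.
                .or_insert_with(|| (node.to_owned(), vec![]))
                .1
                .push(actor_id);
        }
    }
    map
}

fn main() {
    let grouped = group_by_worker(vec![(1, "scan", vec![(10, 0), (11, 1)])].into_iter());
    assert_eq!(grouped[&0][&1].1, vec![10]);
}
```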
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 7a2affe59bcc9..5d7db509f29fa 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -48,7 +48,7 @@ use risingwave_pb::ddl_service::{
     TableJobType, WaitVersion,
 };
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
-use risingwave_pb::meta::table_fragments::PbFragment;
+use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
 use risingwave_pb::stream_plan::{
@@ -69,9 +69,7 @@ use crate::manager::{
     LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
     StreamingJobType, IGNORED_NOTIFICATION_VERSION,
 };
-use crate::model::{
-    FragmentActorUpstreams, FragmentId, StreamContext, StreamJobFragments, TableParallelism,
-};
+use crate::model::{FragmentId, StreamContext, StreamJobFragments, TableParallelism};
 use crate::stream::{
     create_source_worker, validate_sink, ActorGraphBuildResult, ActorGraphBuilder,
     CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
@@ -749,17 +747,31 @@ impl DdlController {
             for actor in &fragment.actors {
                 visit_stream_node(node, |node| {
                     if let NodeBody::Merge(merge_node) = node {
-                        let fragment_actor_upstreams = stream_job_fragments
-                            .actor_upstreams
-                            .get(&fragment.fragment_id)
-                            .expect("should exist");
-                        let actor_upstreams = fragment_actor_upstreams
-                            .get(&actor.actor_id)
-                            .expect("should exist");
-                        let upstreams = actor_upstreams
-                            .get(&merge_node.upstream_fragment_id)
-                            .expect("should exist");
-                        assert!(!upstreams.is_empty(), "All the mergers for the union should have been fully assigned beforehand.");
+                        let upstream_fragment_id = merge_node.upstream_fragment_id;
+                        if let Some(external_upstream_fragment_dispatchers) =
+                            replace_table_ctx.dispatchers.get(&upstream_fragment_id)
+                        {
+                            let mut upstream_dispatchers_to_actor =
+                                external_upstream_fragment_dispatchers
+                                    .values()
+                                    .flatten()
+                                    .filter(|dispatcher| {
+                                        dispatcher.downstream_actor_id.contains(&actor.actor_id)
+                                    });
+                            assert!(upstream_dispatchers_to_actor.next().is_some(), "All the mergers for the union should have been fully assigned beforehand.");
+                        } else {
+                            let mut upstream_dispatchers_to_actor = stream_job_fragments
+                                .fragments
+                                .get(&upstream_fragment_id)
+                                .expect("should exist")
+                                .actors
+                                .iter()
+                                .flat_map(|actor| actor.dispatcher.iter())
+                                .filter(|dispatcher| {
+                                    dispatcher.downstream_actor_id.contains(&actor.actor_id)
+                                });
+                            assert!(upstream_dispatchers_to_actor.next().is_some(), "All the mergers for the union should have been fully assigned beforehand.");
+                        }
                     }
                 });
             }
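The validation rewritten above no longer asserts on a pre-populated upstream map; it checks that at least one upstream dispatcher (from the replace plan for external unions, or from the job's own fragments otherwise) targets the merging actor. Reduced to its logical core, with hypothetical types:

```rust
struct Dispatcher {
    downstream_actor_id: Vec<u32>,
}

/// "Some merger upstream was assigned": at least one upstream dispatcher
/// lists this actor as a downstream target.
fn merger_assigned(upstream_dispatchers: &[Dispatcher], actor_id: u32) -> bool {
    upstream_dispatchers
        .iter()
        .any(|d| d.downstream_actor_id.contains(&actor_id))
}

fn main() {
    let dispatchers = [Dispatcher { downstream_actor_id: vec![7, 8] }];
    assert!(merger_assigned(&dispatchers, 7));
    assert!(!merger_assigned(&dispatchers, 9));
}
```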
@@ -774,18 +786,9 @@ impl DdlController {
         sink_fragment: &PbFragment,
         table: &Table,
         replace_table_ctx: &mut ReplaceStreamJobContext,
-        (union_fragment, union_fragment_actor_upstreams): (
-            &mut PbFragment,
-            &mut FragmentActorUpstreams,
-        ),
+        union_fragment: &mut Fragment,
         unique_identity: Option<&str>,
     ) {
-        let sink_actor_ids = sink_fragment
-            .actors
-            .iter()
-            .map(|a| a.actor_id)
-            .collect_vec();
-
         let downstream_actor_ids = union_fragment
             .actors
             .iter()
@@ -866,22 +869,6 @@ impl DdlController {
                     if let Some(NodeBody::Merge(merge_node)) =
                         &mut merge_stream_node.node_body
                     {
-                        assert!(union_fragment.actors.iter().all(|actor| {
-                            union_fragment_actor_upstreams
-                                .get(&actor.actor_id)
-                                .and_then(|actor_upstream| {
-                                    actor_upstream.get(&merge_node.upstream_fragment_id)
-                                })
-                                .map(|upstream_actor_ids| {
-                                    upstream_actor_ids.is_empty()
-                                })
-                                .unwrap_or(true)
-                        }),
-                        "replace table plan for sink has set upstream. upstreams: {:?} actors: {:?}",
-                        union_fragment_actor_upstreams,
-                        union_fragment.actors
-                        );
-
                         {
                             merge_stream_node.identity =
                                 format!("MergeExecutor(from sink {})", sink_id);
@@ -900,17 +887,6 @@ impl DdlController {
                             }
                         };
 
-                        for actor in &union_fragment.actors {
-                            union_fragment_actor_upstreams
-                                .entry(actor.actor_id)
-                                .or_default()
-                                .try_insert(
-                                    upstream_fragment_id,
-                                    HashSet::from_iter(sink_actor_ids.iter().cloned()),
-                                )
-                                .expect("checked non-exist");
-                        }
-
                         merge_stream_node.fields = sink_fields.to_vec();
 
                         return false;
@@ -1622,11 +1598,11 @@ impl DdlController {
 
         let ActorGraphBuildResult {
             graph,
-            actor_upstreams,
             building_locations,
             existing_locations,
             dispatchers,
             merge_updates,
+            ..
         } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
         assert!(merge_updates.is_empty());
@@ -1644,7 +1620,6 @@ impl DdlController {
         let stream_job_fragments = StreamJobFragments::new(
             id.into(),
             graph,
-            actor_upstreams,
             &building_locations.actor_locations,
             stream_ctx.clone(),
             table_parallelism,
@@ -1837,11 +1812,11 @@ impl DdlController {
 
         let ActorGraphBuildResult {
             graph,
-            actor_upstreams,
             building_locations,
             existing_locations,
             dispatchers,
             merge_updates,
+            ..
         } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
 
         // general table & source does not have upstream job, so the dispatchers should be empty
@@ -1858,7 +1833,6 @@ impl DdlController {
         let stream_job_fragments = StreamJobFragments::new(
             (tmp_job_id as u32).into(),
             graph,
-            actor_upstreams,
             &building_locations.actor_locations,
             stream_ctx,
             old_fragments.assigned_parallelism,
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 85e82288f564e..a5791310bf728 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -38,7 +38,6 @@ use risingwave_pb::meta::table_fragments::fragment::{
 };
 use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State};
 use risingwave_pb::meta::FragmentWorkerSlotMappings;
-use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
     Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamActor,
     StreamNode,
@@ -52,7 +51,7 @@ use tokio::time::{Instant, MissedTickBehavior};
 use crate::barrier::{Command, Reschedule};
 use crate::controller::scale::RescheduleWorkingSet;
 use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
-use crate::model::{ActorId, ActorUpstreams, DispatcherId, FragmentId, TableParallelism};
+use crate::model::{ActorId, DispatcherId, FragmentId, TableParallelism};
 use crate::serving::{
     to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping, ServingVnodeMapping,
 };
@@ -138,8 +137,8 @@ impl CustomFragmentInfo {
 }
 
 use educe::Educe;
-use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
-
+use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
+use risingwave_pb::stream_plan::stream_node::NodeBody;
 use super::SourceChange;
 use crate::controller::id::IdCategory;
 use crate::controller::utils::filter_workers_by_resource_group;
@@ -1215,7 +1214,6 @@ impl ScaleController {
             for actor_to_create in &actors_to_create {
                 let new_actor_id = actor_to_create.0;
                 let mut new_actor = fragment.actor_template.clone();
-                let mut new_actor_upstream = ActorUpstreams::new();
 
                 // This should be assigned before the `modify_actor_upstream_and_downstream` call,
                 // because we need to use the new actor id to find the upstream and
@@ -1227,10 +1225,8 @@ impl ScaleController {
                     &fragment_actors_to_remove,
                     &fragment_actors_to_create,
                     &fragment_actor_bitmap,
-                    &no_shuffle_upstream_actor_map,
                     &no_shuffle_downstream_actors_map,
-                    (&mut new_actor, &mut new_actor_upstream),
-                    &fragment.node,
+                    &mut new_actor,
                 )?;
 
                 if let Some(bitmap) = fragment_actor_bitmap
@@ -1240,7 +1236,7 @@ impl ScaleController {
                     new_actor.vnode_bitmap = Some(bitmap.to_protobuf());
                 }
 
-                new_created_actors.insert(*new_actor_id, (new_actor, new_actor_upstream));
+                new_created_actors.insert(*new_actor_id, new_actor);
             }
         }
@@ -1453,11 +1449,11 @@ impl ScaleController {
         for (fragment_id, actors_to_create) in &fragment_actors_to_create {
             let mut created_actors = HashMap::new();
             for (actor_id, worker_id) in actors_to_create {
-                let (actor, actor_upstreams) = new_created_actors.get(actor_id).cloned().unwrap();
+                let actor = new_created_actors.get(actor_id).cloned().unwrap();
                 created_actors.insert(
                     *actor_id,
                     (
-                        (actor, actor_upstreams),
+                        actor,
                         ActorStatus {
                             location: PbActorLocation::from_worker(*worker_id as _),
                             state: ActorState::Inactive as i32,
@@ -1583,75 +1579,9 @@ impl ScaleController {
         fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
         fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
         fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
-        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
         no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
-        (new_actor, actor_upstreams): (&mut StreamActor, &mut ActorUpstreams),
-        stream_node: &StreamNode,
+        new_actor: &mut StreamActor,
     ) -> MetaResult<()> {
-        let fragment = &ctx.fragment_map[&new_actor.fragment_id];
-        let mut applied_upstream_fragment_actor_ids = HashMap::new();
-
-        for upstream_fragment_id in &fragment.upstream_fragment_ids {
-            let upstream_dispatch_type = &ctx
-                .fragment_dispatcher_map
-                .get(upstream_fragment_id)
-                .and_then(|map| map.get(&fragment.fragment_id))
-                .unwrap();
-
-            match upstream_dispatch_type {
-                DispatcherType::Unspecified => unreachable!(),
-                DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
-                    let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
-                    let mut upstream_actor_ids: HashSet<_> = upstream_fragment
-                        .actors
-                        .iter()
-                        .map(|actor| actor.actor_id as ActorId)
-                        .collect();
-
-                    if let Some(upstream_actors_to_remove) =
-                        fragment_actors_to_remove.get(upstream_fragment_id)
-                    {
-                        upstream_actor_ids
-                            .retain(|actor_id| !upstream_actors_to_remove.contains_key(actor_id));
-                    }
-
-                    if let Some(upstream_actors_to_create) =
-                        fragment_actors_to_create.get(upstream_fragment_id)
-                    {
-                        upstream_actor_ids.extend(upstream_actors_to_create.keys().cloned());
-                    }
-
-                    applied_upstream_fragment_actor_ids
-                        .insert(*upstream_fragment_id as FragmentId, upstream_actor_ids);
-                }
-                DispatcherType::NoShuffle => {
-                    let no_shuffle_upstream_actor_id = *no_shuffle_upstream_actor_map
-                        .get(&new_actor.actor_id)
-                        .and_then(|map| map.get(upstream_fragment_id))
-                        .unwrap();
-
-                    applied_upstream_fragment_actor_ids.insert(
-                        *upstream_fragment_id as FragmentId,
-                        HashSet::from_iter([no_shuffle_upstream_actor_id as ActorId]),
-                    );
-                }
-            }
-        }
-
-        {
-            visit_stream_node(stream_node, |node_body| {
-                if let NodeBody::Merge(s) = node_body {
-                    let upstream_actor_ids = applied_upstream_fragment_actor_ids
-                        .get(&s.upstream_fragment_id)
-                        .cloned()
-                        .unwrap();
-                    actor_upstreams
-                        .try_insert(s.upstream_fragment_id, upstream_actor_ids)
-                        .expect("non-duplicate");
-                }
-            });
-        }
-
         // Update downstream actor ids
         for dispatcher in &mut new_actor.dispatcher {
             let downstream_fragment_id = dispatcher
diff --git a/src/meta/src/stream/source_manager/split_assignment.rs b/src/meta/src/stream/source_manager/split_assignment.rs
index f69f2b3c08359..53fd208596abd 100644
--- a/src/meta/src/stream/source_manager/split_assignment.rs
+++ b/src/meta/src/stream/source_manager/split_assignment.rs
@@ -372,7 +372,7 @@ impl SourceManagerCore {
             };
             let actors = match self
                 .metadata_manager
-                .get_running_actors_for_source_backfill(*fragment_id)
+                .get_running_actors_for_source_backfill(*fragment_id, *upstream_fragment_id)
                 .await
             {
                 Ok(actors) => {
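After this patch, rescheduling rewrites only the dispatcher (downstream) side of each new actor, as in the simplified `modify_actor_upstream_and_downstream` above; merge-side upstreams are reconstructed on the barrier path by `collect_actor_upstreams`. The remaining downstream rewrite is essentially the following sketch (the removed-to-replacement map shape is hypothetical, standing in for the real reschedule plan):

```rust
use std::collections::{HashMap, HashSet};

/// Drop removed downstream actors from a dispatcher's target list and
/// append their newly created replacements.
fn update_downstreams(
    downstream_actor_ids: &mut Vec<u32>,
    removed: &HashSet<u32>,
    added: &HashMap<u32, u32>, // removed actor -> replacement actor (hypothetical shape)
) {
    downstream_actor_ids.retain(|id| !removed.contains(id));
    downstream_actor_ids.extend(added.values().copied());
}

fn main() {
    let mut downstreams = vec![1, 2, 3];
    update_downstreams(
        &mut downstreams,
        &HashSet::from([2]),
        &HashMap::from([(2, 4)]),
    );
    assert_eq!(downstreams, vec![1, 3, 4]);
}
```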