From b4745d4db051eb3ea7bfa1e93bfda55c03d4b0e0 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Wed, 29 Jan 2025 01:06:11 +0800
Subject: [PATCH] update node

---
 src/meta/src/barrier/command.rs    | 28 +++++++++++---
 src/meta/src/barrier/info.rs       | 60 ++++++++++++++++++++++++++----
 src/meta/src/rpc/ddl_controller.rs | 38 ++++++++-----------
 3 files changed, 90 insertions(+), 36 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 2d5bd1208db3d..417d5a42341b4 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -145,14 +145,30 @@ impl ReplaceStreamJobPlan {
                     .collect(),
                 },
             );
-            assert!(fragment_changes
-                .insert(fragment.fragment_id, fragment_change)
-                .is_none());
+            fragment_changes
+                .try_insert(fragment.fragment_id, fragment_change)
+                .expect("non-duplicate");
         }
         for fragment in self.old_fragments.fragments.values() {
-            assert!(fragment_changes
-                .insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
-                .is_none());
+            fragment_changes
+                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
+                .expect("non-duplicate");
+        }
+        for (fragment_id, merge_updates) in &self.merge_updates {
+            let replace_map = merge_updates
+                .iter()
+                .filter_map(|m| {
+                    m.new_upstream_fragment_id.map(|new_upstream_fragment_id| {
+                        (m.upstream_fragment_id, new_upstream_fragment_id)
+                    })
+                })
+                .collect();
+            fragment_changes
+                .try_insert(
+                    *fragment_id,
+                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map),
+                )
+                .expect("non-duplicate");
         }
         fragment_changes
     }
diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs
index 0ab59ff48a98e..be6b33475ac4f 100644
--- a/src/meta/src/barrier/info.rs
+++ b/src/meta/src/barrier/info.rs
@@ -15,7 +15,9 @@
 use std::collections::{HashMap, HashSet};
 
 use risingwave_common::catalog::TableId;
+use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
 use risingwave_meta_model::WorkerId;
+use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
 use tracing::warn;
 
@@ -39,6 +41,10 @@ impl BarrierInfo {
 #[derive(Debug, Clone)]
 pub(crate) enum CommandFragmentChanges {
     NewFragment(TableId, InflightFragmentInfo),
+    ReplaceNodeUpstream(
+        /// old `fragment_id` -> new `fragment_id`
+        HashMap<FragmentId, FragmentId>,
+    ),
     Reschedule {
         new_actors: HashMap<ActorId, WorkerId>,
         to_remove: HashSet<ActorId>,
@@ -92,6 +98,16 @@ impl InflightDatabaseInfo {
             .get(&fragment_id)
             .expect("should exist")
     }
+
+    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
+        let job_id = self.fragment_location[&fragment_id];
+        self.jobs
+            .get_mut(&job_id)
+            .expect("should exist")
+            .fragment_infos
+            .get_mut(&fragment_id)
+            .expect("should exist")
+    }
 }
 
 impl InflightDatabaseInfo {
@@ -182,20 +198,47 @@
                     .expect("non duplicate");
             }
             CommandFragmentChanges::Reschedule { new_actors, .. } => {
-                let job_id = self.fragment_location[&fragment_id];
-                let info = self
-                    .jobs
-                    .get_mut(&job_id)
-                    .expect("should exist")
-                    .fragment_infos
-                    .get_mut(&fragment_id)
-                    .expect("should exist");
+                let info = self.fragment_mut(fragment_id);
                 let actors = &mut info.actors;
                 for (actor_id, node_id) in &new_actors {
                     assert!(actors.insert(*actor_id as _, *node_id as _).is_none());
                 }
             }
             CommandFragmentChanges::RemoveFragment => {}
+            CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
+                let mut remaining_fragment_ids: HashSet<_> =
+                    replace_map.keys().cloned().collect();
+                let info = self.fragment_mut(fragment_id);
+                visit_stream_node_mut(&mut info.nodes, |node| {
+                    if let NodeBody::Merge(m) = node
+                        && let Some(new_upstream_fragment_id) =
+                            replace_map.get(&m.upstream_fragment_id)
+                    {
+                        if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
+                            if cfg!(debug_assertions) {
+                                panic!(
+                                    "duplicate upstream fragment: {:?} {:?}",
+                                    m, replace_map
+                                );
+                            } else {
+                                warn!(?m, ?replace_map, "duplicate upstream fragment");
+                            }
+                        }
+                        m.upstream_fragment_id = *new_upstream_fragment_id;
+                    }
+                });
+                if cfg!(debug_assertions) {
+                    assert!(
+                        remaining_fragment_ids.is_empty(),
+                        "non-existing fragment to replace: {:?} {:?} {:?}",
+                        remaining_fragment_ids,
+                        info.nodes,
+                        replace_map
+                    );
+                } else {
+                    warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
+                }
+            }
         }
     }
 }
@@ -278,6 +321,7 @@ impl InflightDatabaseInfo {
                     self.jobs.remove(&job_id).expect("should exist");
                 }
             }
+            CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
         }
     }
 }
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 7904b7b31ad9b..c72aa6d068cc5 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -692,7 +692,7 @@ impl DdlController {
                 let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
                 let sink = sink.expect("sink not found");
                 Self::inject_replace_table_plan_for_sink(
-                    Some(sink.id),
+                    sink.id,
                     &sink_fragment,
                     target_table,
                     &mut replace_table_ctx,
@@ -730,7 +730,7 @@ impl DdlController {
                     let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
 
                     Self::inject_replace_table_plan_for_sink(
-                        Some(sink_id),
+                        sink_id,
                         &sink_fragment,
                         target_table,
                         &mut replace_table_ctx,
@@ -767,7 +767,7 @@ impl DdlController {
     }
 
     pub(crate) fn inject_replace_table_plan_for_sink(
-        sink_id: Option<u32>,
+        sink_id: u32,
        sink_fragment: &PbFragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
@@ -862,30 +862,24 @@
                         if let Some(NodeBody::Merge(merge_node)) =
                             &mut merge_stream_node.node_body
-                            && union_fragment.actors.iter().any(|actor| {
+                        {
+                            assert!(union_fragment.actors.iter().all(|actor| {
                                 union_fragment_actor_upstreams
                                     .get(&actor.actor_id)
                                     .and_then(|actor_upstream| {
                                         actor_upstream.get(&merge_node.upstream_fragment_id)
                                     })
-                                    .map(|upstream_actor_ids| upstream_actor_ids.is_empty())
+                                    .map(|upstream_actor_ids| {
+                                        upstream_actor_ids.is_empty()
+                                    })
                                     .unwrap_or(true)
-                            })
-                        {
-                            if cfg!(debug_assertions) {
-                                union_fragment.actors.iter().for_each(|actor| {
-                                    assert!(union_fragment_actor_upstreams
-                                        .get(&actor.actor_id)
-                                        .and_then(|actor_upstream| {
-                                            actor_upstream
-                                                .get(&merge_node.upstream_fragment_id)
-                                        })
-                                        .map(|upstream_actor_ids| upstream_actor_ids
-                                            .is_empty())
-                                        .unwrap_or(true), "inconsistent replace table plan for sink. upstreams: {:?} actor_id: {} upstream_fragment_id: {}", union_fragment_actor_upstreams, actor.actor_id, merge_node.upstream_fragment_id)
-                                });
-                            }
-                            if let Some(sink_id) = sink_id {
+                                }),
+                                "replace table plan for sink has set upstream. upstreams: {:?} actors: {:?}",
+                                union_fragment_actor_upstreams,
+                                union_fragment.actors
+                            );
+
+                            {
                                 merge_stream_node.identity =
                                     format!("MergeExecutor(from sink {})", sink_id);
@@ -1399,7 +1393,7 @@ impl DdlController {
                 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                 Self::inject_replace_table_plan_for_sink(
-                    Some(*sink_id),
+                    *sink_id,
                     &sink_fragment,
                     table,
                     &mut ctx,
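
Note for readers: the `CommandFragmentChanges::ReplaceNodeUpstream` handling added in `info.rs` boils down to rewriting each Merge node's upstream fragment id through an old -> new map, while checking that every map entry is applied exactly once. Below is a minimal standalone sketch of that idea with hypothetical, simplified types (`MergeNode`, `replace_upstreams`); it is not the actual RisingWave structures or visitor API.

```rust
use std::collections::{HashMap, HashSet};

type FragmentId = u32;

// Hypothetical stand-in for a Merge stream node.
#[derive(Debug)]
struct MergeNode {
    upstream_fragment_id: FragmentId,
}

// Remaps upstream ids in place and returns the map keys that matched no node.
fn replace_upstreams(
    merge_nodes: &mut [MergeNode],
    replace_map: &HashMap<FragmentId, FragmentId>,
) -> HashSet<FragmentId> {
    let mut remaining: HashSet<FragmentId> = replace_map.keys().cloned().collect();
    for node in merge_nodes.iter_mut() {
        if let Some(new_id) = replace_map.get(&node.upstream_fragment_id) {
            // Each old upstream id is expected to be replaced exactly once.
            assert!(
                remaining.remove(&node.upstream_fragment_id),
                "duplicate upstream fragment {}",
                node.upstream_fragment_id
            );
            node.upstream_fragment_id = *new_id;
        }
    }
    remaining // non-empty means some requested replacements were never applied
}

fn main() {
    let mut nodes = vec![
        MergeNode { upstream_fragment_id: 1 },
        MergeNode { upstream_fragment_id: 3 },
    ];
    let replace_map = HashMap::from([(1, 10)]);
    let unused = replace_upstreams(&mut nodes, &replace_map);
    assert!(unused.is_empty());
    assert_eq!(nodes[0].upstream_fragment_id, 10);
    assert_eq!(nodes[1].upstream_fragment_id, 3);
}
```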