From 8228cc62237957098ef4e144a6688c0a5c8c76e6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 29 Jan 2025 01:06:11 +0800 Subject: [PATCH] update node --- src/meta/src/barrier/command.rs | 28 +++++++++++---- src/meta/src/barrier/info.rs | 60 ++++++++++++++++++++++++++++----- 2 files changed, 74 insertions(+), 14 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 26bacd97eef0d..d34b227ab0d2e 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -145,14 +145,30 @@ impl ReplaceStreamJobPlan { .collect(), }, ); - assert!(fragment_changes - .insert(fragment.fragment_id, fragment_change) - .is_none()); + fragment_changes + .try_insert(fragment.fragment_id, fragment_change) + .expect("non-duplicate"); } for fragment in self.old_fragments.fragments.values() { - assert!(fragment_changes - .insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment) - .is_none()); + fragment_changes + .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment) + .expect("non-duplicate"); + } + for (fragment_id, merge_updates) in &self.merge_updates { + let replace_map = merge_updates + .iter() + .filter_map(|m| { + m.new_upstream_fragment_id.map(|new_upstream_fragment_id| { + (m.upstream_fragment_id, new_upstream_fragment_id) + }) + }) + .collect(); + fragment_changes + .try_insert( + *fragment_id, + CommandFragmentChanges::ReplaceNodeUpstream(replace_map), + ) + .expect("non-duplicate"); } fragment_changes } diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 0ab59ff48a98e..be6b33475ac4f 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -15,7 +15,9 @@ use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; +use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut; use risingwave_meta_model::WorkerId; +use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo; use tracing::warn; @@ -39,6 +41,10 @@ impl BarrierInfo { #[derive(Debug, Clone)] pub(crate) enum CommandFragmentChanges { NewFragment(TableId, InflightFragmentInfo), + ReplaceNodeUpstream( + /// old `fragment_id` -> new `fragment_id` + HashMap, + ), Reschedule { new_actors: HashMap, to_remove: HashSet, @@ -92,6 +98,16 @@ impl InflightDatabaseInfo { .get(&fragment_id) .expect("should exist") } + + fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo { + let job_id = self.fragment_location[&fragment_id]; + self.jobs + .get_mut(&job_id) + .expect("should exist") + .fragment_infos + .get_mut(&fragment_id) + .expect("should exist") + } } impl InflightDatabaseInfo { @@ -182,20 +198,47 @@ impl InflightDatabaseInfo { .expect("non duplicate"); } CommandFragmentChanges::Reschedule { new_actors, .. } => { - let job_id = self.fragment_location[&fragment_id]; - let info = self - .jobs - .get_mut(&job_id) - .expect("should exist") - .fragment_infos - .get_mut(&fragment_id) - .expect("should exist"); + let info = self.fragment_mut(fragment_id); let actors = &mut info.actors; for (actor_id, node_id) in &new_actors { assert!(actors.insert(*actor_id as _, *node_id as _).is_none()); } } CommandFragmentChanges::RemoveFragment => {} + CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => { + let mut remaining_fragment_ids: HashSet<_> = + replace_map.keys().cloned().collect(); + let info = self.fragment_mut(fragment_id); + visit_stream_node_mut(&mut info.nodes, |node| { + if let NodeBody::Merge(m) = node + && let Some(new_upstream_fragment_id) = + replace_map.get(&m.upstream_fragment_id) + { + if !remaining_fragment_ids.remove(&m.upstream_fragment_id) { + if cfg!(debug_assertions) { + panic!( + "duplicate upstream fragment: {:?} {:?}", + m, replace_map + ); + } else { + warn!(?m, ?replace_map, "duplicate upstream fragment"); + } + } + m.upstream_fragment_id = *new_upstream_fragment_id; + } + }); + if cfg!(debug_assertions) { + assert!( + remaining_fragment_ids.is_empty(), + "non-existing fragment to replace: {:?} {:?} {:?}", + remaining_fragment_ids, + info.nodes, + replace_map + ); + } else { + warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace"); + } + } } } } @@ -278,6 +321,7 @@ impl InflightDatabaseInfo { self.jobs.remove(&job_id).expect("should exist"); } } + CommandFragmentChanges::ReplaceNodeUpstream(_) => {} } } }