Skip to content

Commit

Permalink
update node
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 28, 2025
1 parent 839cd38 commit 8228cc6
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 14 deletions.
28 changes: 22 additions & 6 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,30 @@ impl ReplaceStreamJobPlan {
.collect(),
},
);
assert!(fragment_changes
.insert(fragment.fragment_id, fragment_change)
.is_none());
fragment_changes
.try_insert(fragment.fragment_id, fragment_change)
.expect("non-duplicate");
}
for fragment in self.old_fragments.fragments.values() {
assert!(fragment_changes
.insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
.is_none());
fragment_changes
.try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
.expect("non-duplicate");
}
for (fragment_id, merge_updates) in &self.merge_updates {
let replace_map = merge_updates
.iter()
.filter_map(|m| {
m.new_upstream_fragment_id.map(|new_upstream_fragment_id| {
(m.upstream_fragment_id, new_upstream_fragment_id)
})
})
.collect();
fragment_changes
.try_insert(
*fragment_id,
CommandFragmentChanges::ReplaceNodeUpstream(replace_map),
)
.expect("non-duplicate");
}
fragment_changes
}
Expand Down
60 changes: 52 additions & 8 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_meta_model::WorkerId;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
use tracing::warn;

Expand All @@ -39,6 +41,10 @@ impl BarrierInfo {
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
NewFragment(TableId, InflightFragmentInfo),
ReplaceNodeUpstream(
/// old `fragment_id` -> new `fragment_id`
HashMap<FragmentId, FragmentId>,
),
Reschedule {
new_actors: HashMap<ActorId, WorkerId>,
to_remove: HashSet<ActorId>,
Expand Down Expand Up @@ -92,6 +98,16 @@ impl InflightDatabaseInfo {
.get(&fragment_id)
.expect("should exist")
}

fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
let job_id = self.fragment_location[&fragment_id];
self.jobs
.get_mut(&job_id)
.expect("should exist")
.fragment_infos
.get_mut(&fragment_id)
.expect("should exist")
}
}

impl InflightDatabaseInfo {
Expand Down Expand Up @@ -182,20 +198,47 @@ impl InflightDatabaseInfo {
.expect("non duplicate");
}
CommandFragmentChanges::Reschedule { new_actors, .. } => {
let job_id = self.fragment_location[&fragment_id];
let info = self
.jobs
.get_mut(&job_id)
.expect("should exist")
.fragment_infos
.get_mut(&fragment_id)
.expect("should exist");
let info = self.fragment_mut(fragment_id);
let actors = &mut info.actors;
for (actor_id, node_id) in &new_actors {
assert!(actors.insert(*actor_id as _, *node_id as _).is_none());
}
}
CommandFragmentChanges::RemoveFragment => {}
CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
let mut remaining_fragment_ids: HashSet<_> =
replace_map.keys().cloned().collect();
let info = self.fragment_mut(fragment_id);
visit_stream_node_mut(&mut info.nodes, |node| {
if let NodeBody::Merge(m) = node
&& let Some(new_upstream_fragment_id) =
replace_map.get(&m.upstream_fragment_id)
{
if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
if cfg!(debug_assertions) {
panic!(
"duplicate upstream fragment: {:?} {:?}",
m, replace_map
);
} else {
warn!(?m, ?replace_map, "duplicate upstream fragment");
}
}
m.upstream_fragment_id = *new_upstream_fragment_id;
}
});
if cfg!(debug_assertions) {
assert!(
remaining_fragment_ids.is_empty(),
"non-existing fragment to replace: {:?} {:?} {:?}",
remaining_fragment_ids,
info.nodes,
replace_map
);
} else {
warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
}
}
}
}
}
Expand Down Expand Up @@ -278,6 +321,7 @@ impl InflightDatabaseInfo {
self.jobs.remove(&job_id).expect("should exist");
}
}
CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
}
}
}
Expand Down

0 comments on commit 8228cc6

Please sign in to comment.