Skip to content

Commit

Permalink
update node
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 6, 2025
1 parent 32a778d commit b4745d4
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 36 deletions.
28 changes: 22 additions & 6 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,30 @@ impl ReplaceStreamJobPlan {
.collect(),
},
);
assert!(fragment_changes
.insert(fragment.fragment_id, fragment_change)
.is_none());
fragment_changes
.try_insert(fragment.fragment_id, fragment_change)
.expect("non-duplicate");
}
for fragment in self.old_fragments.fragments.values() {
assert!(fragment_changes
.insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
.is_none());
fragment_changes
.try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
.expect("non-duplicate");
}
for (fragment_id, merge_updates) in &self.merge_updates {
let replace_map = merge_updates
.iter()
.filter_map(|m| {
m.new_upstream_fragment_id.map(|new_upstream_fragment_id| {
(m.upstream_fragment_id, new_upstream_fragment_id)
})
})
.collect();
fragment_changes
.try_insert(
*fragment_id,
CommandFragmentChanges::ReplaceNodeUpstream(replace_map),
)
.expect("non-duplicate");
}
fragment_changes
}
Expand Down
60 changes: 52 additions & 8 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_meta_model::WorkerId;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
use tracing::warn;

Expand All @@ -39,6 +41,10 @@ impl BarrierInfo {
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
NewFragment(TableId, InflightFragmentInfo),
ReplaceNodeUpstream(
/// old `fragment_id` -> new `fragment_id`
HashMap<FragmentId, FragmentId>,
),
Reschedule {
new_actors: HashMap<ActorId, WorkerId>,
to_remove: HashSet<ActorId>,
Expand Down Expand Up @@ -92,6 +98,16 @@ impl InflightDatabaseInfo {
.get(&fragment_id)
.expect("should exist")
}

fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
let job_id = self.fragment_location[&fragment_id];
self.jobs
.get_mut(&job_id)
.expect("should exist")
.fragment_infos
.get_mut(&fragment_id)
.expect("should exist")
}
}

impl InflightDatabaseInfo {
Expand Down Expand Up @@ -182,20 +198,47 @@ impl InflightDatabaseInfo {
.expect("non duplicate");
}
CommandFragmentChanges::Reschedule { new_actors, .. } => {
let job_id = self.fragment_location[&fragment_id];
let info = self
.jobs
.get_mut(&job_id)
.expect("should exist")
.fragment_infos
.get_mut(&fragment_id)
.expect("should exist");
let info = self.fragment_mut(fragment_id);
let actors = &mut info.actors;
for (actor_id, node_id) in &new_actors {
assert!(actors.insert(*actor_id as _, *node_id as _).is_none());
}
}
CommandFragmentChanges::RemoveFragment => {}
CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
let mut remaining_fragment_ids: HashSet<_> =
replace_map.keys().cloned().collect();
let info = self.fragment_mut(fragment_id);
visit_stream_node_mut(&mut info.nodes, |node| {
if let NodeBody::Merge(m) = node
&& let Some(new_upstream_fragment_id) =
replace_map.get(&m.upstream_fragment_id)
{
if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
if cfg!(debug_assertions) {
panic!(
"duplicate upstream fragment: {:?} {:?}",
m, replace_map
);
} else {
warn!(?m, ?replace_map, "duplicate upstream fragment");
}
}
m.upstream_fragment_id = *new_upstream_fragment_id;
}
});
if cfg!(debug_assertions) {
assert!(
remaining_fragment_ids.is_empty(),
"non-existing fragment to replace: {:?} {:?} {:?}",
remaining_fragment_ids,
info.nodes,
replace_map
);
} else {
warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
}
}
}
}
}
Expand Down Expand Up @@ -278,6 +321,7 @@ impl InflightDatabaseInfo {
self.jobs.remove(&job_id).expect("should exist");
}
}
CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
}
}
}
Expand Down
38 changes: 16 additions & 22 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ impl DdlController {
let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
let sink = sink.expect("sink not found");
Self::inject_replace_table_plan_for_sink(
Some(sink.id),
sink.id,
&sink_fragment,
target_table,
&mut replace_table_ctx,
Expand Down Expand Up @@ -730,7 +730,7 @@ impl DdlController {
let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

Self::inject_replace_table_plan_for_sink(
Some(sink_id),
sink_id,
&sink_fragment,
target_table,
&mut replace_table_ctx,
Expand Down Expand Up @@ -767,7 +767,7 @@ impl DdlController {
}

pub(crate) fn inject_replace_table_plan_for_sink(
sink_id: Option<u32>,
sink_id: u32,
sink_fragment: &PbFragment,
table: &Table,
replace_table_ctx: &mut ReplaceStreamJobContext,
Expand Down Expand Up @@ -862,30 +862,24 @@ impl DdlController {

if let Some(NodeBody::Merge(merge_node)) =
&mut merge_stream_node.node_body
&& union_fragment.actors.iter().any(|actor| {
{
assert!(union_fragment.actors.iter().all(|actor| {
union_fragment_actor_upstreams
.get(&actor.actor_id)
.and_then(|actor_upstream| {
actor_upstream.get(&merge_node.upstream_fragment_id)
})
.map(|upstream_actor_ids| upstream_actor_ids.is_empty())
.map(|upstream_actor_ids| {
upstream_actor_ids.is_empty()
})
.unwrap_or(true)
})
{
if cfg!(debug_assertions) {
union_fragment.actors.iter().for_each(|actor| {
assert!(union_fragment_actor_upstreams
.get(&actor.actor_id)
.and_then(|actor_upstream| {
actor_upstream
.get(&merge_node.upstream_fragment_id)
})
.map(|upstream_actor_ids| upstream_actor_ids
.is_empty())
.unwrap_or(true), "inconsistent replace table plan for sink. upstreams: {:?} actor_id: {} upstream_fragment_id: {}", union_fragment_actor_upstreams, actor.actor_id, merge_node.upstream_fragment_id)
});
}
if let Some(sink_id) = sink_id {
}),
"replace table plan for sink has set upstream. upstreams: {:?} actors: {:?}",
union_fragment_actor_upstreams,
union_fragment.actors
);

{
merge_stream_node.identity =
format!("MergeExecutor(from sink {})", sink_id);

Expand Down Expand Up @@ -1399,7 +1393,7 @@ impl DdlController {
let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

Self::inject_replace_table_plan_for_sink(
Some(*sink_id),
*sink_id,
&sink_fragment,
table,
&mut ctx,
Expand Down

0 comments on commit b4745d4

Please sign in to comment.