Skip to content

Commit

Permalink
refactor(meta): deprecate persisted actor upstreams and get upstreams from dispatcher info
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 6, 2025
1 parent cb388c1 commit 4b20ca1
Show file tree
Hide file tree
Showing 16 changed files with 394 additions and 535 deletions.
1 change: 1 addition & 0 deletions src/meta/model/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct Model {
pub status: ActorStatus,
pub splits: Option<ConnectorSplits>,
pub worker_id: WorkerId,
#[deprecated]
pub upstream_actor_ids: ActorUpstreamActors,
pub vnode_bitmap: Option<VnodeBitmap>,
pub expr_context: ExprContext,
Expand Down
37 changes: 36 additions & 1 deletion src/meta/src/barrier/checkpoint/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::MetaResult;

Expand Down Expand Up @@ -73,7 +74,41 @@ impl CreatingStreamingJobControl {
let table_id = info.stream_job_fragments.stream_job_id();
let table_id_str = format!("{}", table_id.table_id);

let actors_to_create = info.stream_job_fragments.actors_to_create();
let mut actor_upstreams = Command::collect_actor_upstreams(
info.stream_job_fragments
.actors_to_create()
.flat_map(|(fragment_id, _, actors)| {
actors.map(move |(actor, _)| {
(actor.actor_id, fragment_id, actor.dispatcher.as_slice())
})
})
.chain(
info.dispatchers
.iter()
.flat_map(|(fragment_id, dispatchers)| {
dispatchers.iter().map(|(actor_id, dispatchers)| {
(*actor_id, *fragment_id, dispatchers.as_slice())
})
}),
),
None,
);
let mut actors_to_create = StreamJobActorsToCreate::default();
for (fragment_id, node, actors) in info.stream_job_fragments.actors_to_create() {
for (actor, worker_id) in actors {
actors_to_create
.entry(worker_id)
.or_default()
.entry(fragment_id)
.or_insert_with(|| (node.clone(), vec![]))
.1
.push((
actor.clone(),
actor_upstreams.remove(&actor.actor_id).unwrap_or_default(),
))
}
}

let graph_info = InflightStreamingJobInfo {
job_id: table_id,
fragment_infos,
Expand Down
174 changes: 160 additions & 14 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::warn;
Expand All @@ -50,8 +50,7 @@ use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{
ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate,
StreamJobFragments,
ActorId, ActorUpstreams, DispatcherId, FragmentId, StreamJobActorsToCreate, StreamJobFragments,
};
use crate::stream::{
build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
Expand Down Expand Up @@ -86,7 +85,7 @@ pub struct Reschedule {
/// `Source` and `SourceBackfill` are handled together here.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

pub newly_created_actors: Vec<(StreamActorWithUpstreams, PbActorStatus)>,
pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

/// Replacing an old job with a new one. All actors in the job will be rebuilt.
Expand Down Expand Up @@ -983,22 +982,82 @@ impl Command {
) -> Option<StreamJobActorsToCreate> {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
CreateStreamingJobType::SinkIntoTable(replace_table) => {
replace_table.new_fragments.actors_to_create()
}
let sink_into_table_replace_plan = match job_type {
CreateStreamingJobType::Normal => None,
CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
CreateStreamingJobType::SnapshotBackfill(_) => {
// for snapshot backfill, the actors to create is measured separately
return None;
}
};
for (worker_id, new_actors) in info.stream_job_fragments.actors_to_create() {
map.entry(worker_id).or_default().extend(new_actors)
let get_actors_to_create = || {
sink_into_table_replace_plan
.map(|plan| plan.new_fragments.actors_to_create())
.into_iter()
.flatten()
.chain(info.stream_job_fragments.actors_to_create())
};
let mut actor_upstreams = Self::collect_actor_upstreams(
get_actors_to_create()
.flat_map(|(fragment_id, _, actors)| {
actors.map(move |(actor, _)| {
(actor.actor_id, fragment_id, actor.dispatcher.as_slice())
})
})
.chain(
sink_into_table_replace_plan
.map(|plan| {
plan.dispatchers.iter().flat_map(
|(fragment_id, dispatchers)| {
dispatchers.iter().map(|(actor_id, dispatchers)| {
(*actor_id, *fragment_id, dispatchers.as_slice())
})
},
)
})
.into_iter()
.flatten(),
)
.chain(
info.dispatchers
.iter()
.flat_map(|(fragment_id, dispatchers)| {
dispatchers.iter().map(|(actor_id, dispatchers)| {
(*actor_id, *fragment_id, dispatchers.as_slice())
})
}),
),
None,
);
let mut map = StreamJobActorsToCreate::default();
for (fragment_id, node, actors) in get_actors_to_create() {
for (actor, worker_id) in actors {
map.entry(worker_id)
.or_default()
.entry(fragment_id)
.or_insert_with(|| (node.clone(), vec![]))
.1
.push((
actor.clone(),
actor_upstreams.remove(&actor.actor_id).unwrap_or_default(),
))
}
}
Some(map)
}
Command::RescheduleFragment { reschedules, .. } => {
Command::RescheduleFragment {
reschedules,
fragment_actors,
..
} => {
let mut actor_upstreams = Self::collect_actor_upstreams(
reschedules.iter().flat_map(|(fragment_id, reschedule)| {
reschedule.newly_created_actors.iter().map(|(actor, _)| {
(actor.actor_id, *fragment_id, actor.dispatcher.as_slice())
})
}),
Some((reschedules, fragment_actors)),
);
let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
for (fragment_id, actor, status) in
reschedules.iter().flat_map(|(fragment_id, reschedule)| {
Expand All @@ -1009,6 +1068,7 @@ impl Command {
})
{
let worker_id = status.location.as_ref().unwrap().worker_node_id as _;
let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
map.entry(worker_id)
.or_default()
.entry(fragment_id)
Expand All @@ -1017,12 +1077,44 @@ impl Command {
(node, vec![])
})
.1
.push(actor.clone());
.push((actor.clone(), upstreams));
}
Some(map)
}
Command::ReplaceStreamJob(replace_table) => {
Some(replace_table.new_fragments.actors_to_create())
let mut actor_upstreams = Self::collect_actor_upstreams(
replace_table
.new_fragments
.actors_to_create()
.flat_map(|(fragment_id, _, actors)| {
actors.map(move |(actor, _)| {
(actor.actor_id, fragment_id, actor.dispatcher.as_slice())
})
})
.chain(replace_table.dispatchers.iter().flat_map(
|(fragment_id, dispatchers)| {
dispatchers.iter().map(|(actor_id, dispatchers)| {
(*actor_id, *fragment_id, dispatchers.as_slice())
})
},
)),
None,
);
let mut map = StreamJobActorsToCreate::default();
for (fragment_id, node, actors) in replace_table.new_fragments.actors_to_create() {
for (actor, worker_id) in actors {
map.entry(worker_id)
.or_default()
.entry(fragment_id)
.or_insert_with(|| (node.clone(), vec![]))
.1
.push((
actor.clone(),
actor_upstreams.remove(&actor.actor_id).unwrap_or_default(),
))
}
}
Some(map)
}
_ => None,
}
Expand Down Expand Up @@ -1104,3 +1196,57 @@ impl Command {
.flatten()
}
}

impl Command {
    /// Derive each downstream actor's upstream actor set from dispatcher info,
    /// keyed by downstream `ActorId` and then by the upstream `FragmentId`
    /// (see `ActorUpstreams`).
    ///
    /// - `actor_dispatchers`: `(upstream_actor_id, upstream_fragment_id,
    ///   dispatchers)` triples; every `downstream_actor_id` named in a
    ///   dispatcher gains `upstream_actor_id` as an upstream.
    /// - `reschedule_dispatcher_update`: optionally `(reschedules,
    ///   fragment_actors)`. When present, actors newly added by a reschedule
    ///   are additionally wired to the current actors of each of their
    ///   upstream fragments, skipping upstream actors that the upstream
    ///   fragment's own reschedule is removing.
    #[expect(clippy::type_complexity)]
    pub fn collect_actor_upstreams(
        actor_dispatchers: impl Iterator<Item = (ActorId, FragmentId, &[Dispatcher])>,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Direct edges: each dispatcher of an upstream actor lists the
        // downstream actors it delivers to, so invert that relation here.
        for (upstream_actor_id, upstream_fragment_id, dispatchers) in actor_dispatchers {
            for downstream_actor_id in dispatchers
                .iter()
                .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
            {
                actor_upstreams
                    .entry(*downstream_actor_id)
                    .or_default()
                    .entry(upstream_fragment_id)
                    .or_default()
                    .insert(upstream_actor_id);
            }
        }
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            // Reschedule edges: a fragment's newly added actors receive data
            // from every actor currently in each upstream fragment ...
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    // `fragment_actors` is expected to cover every upstream
                    // fragment referenced by a reschedule; a miss is a bug.
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        // ... except upstream actors that are themselves being
                        // removed by the upstream fragment's reschedule.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for downstream_actor_id in reschedule.added_actors.values().flatten() {
                            actor_upstreams
                                .entry(*downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(*upstream_actor_id);
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}
5 changes: 3 additions & 2 deletions src/meta/src/barrier/context/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::stream_plan::StreamActor;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};
Expand All @@ -33,7 +34,7 @@ use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments, TableParallelism};
use crate::model::{ActorId, StreamJobFragments, TableParallelism};
use crate::stream::{
JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
RescheduleOptions, SourceChange,
Expand Down Expand Up @@ -770,7 +771,7 @@ impl GlobalBarrierWorkerContextImpl {
}

/// Update all actors in compute nodes.
async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActorWithUpstreams>> {
async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
self.metadata_manager.all_active_actors().await
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use risingwave_connector::source::SplitImpl;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use risingwave_pb::stream_plan::StreamActor;
use tokio::sync::oneshot::Sender;

use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActorWithUpstreams, StreamJobFragments};
use crate::model::{ActorId, StreamJobFragments};
use crate::{MetaError, MetaResult};

mod checkpoint;
Expand Down Expand Up @@ -103,7 +104,7 @@ struct BarrierWorkerRuntimeInfoSnapshot {
database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
stream_actors: HashMap<ActorId, StreamActor>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
hummock_version_stats: HummockVersionStats,
Expand All @@ -114,7 +115,7 @@ impl BarrierWorkerRuntimeInfoSnapshot {
database_id: DatabaseId,
database_info: &InflightDatabaseInfo,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
stream_actors: &HashMap<ActorId, StreamActorWithUpstreams>,
stream_actors: &HashMap<ActorId, StreamActor>,
state_table_committed_epochs: &HashMap<TableId, u64>,
) -> MetaResult<()> {
{
Expand Down Expand Up @@ -189,7 +190,7 @@ struct DatabaseRuntimeInfoSnapshot {
database_fragment_info: InflightDatabaseInfo,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_info: InflightSubscriptionInfo,
stream_actors: HashMap<ActorId, StreamActorWithUpstreams>,
stream_actors: HashMap<ActorId, StreamActor>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
}
Expand Down
Loading

0 comments on commit 4b20ca1

Please sign in to comment.