Skip to content

Commit

Permalink
Add version variable, update its value, return in certain scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Nov 20, 2023
1 parent 4348f67 commit 667e898
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,13 +780,17 @@ impl DdlController {
tracing::debug!(id = job_id, "finished stream job");

if let Some(replace_table_job) = &replace_table_job {
self.finish_replace_table(
&replace_table_job.streaming_job,
replace_table_job.col_index_mapping.clone(),
sink_id,
)
.await?;
let version = self
.finish_replace_table(
&replace_table_job.streaming_job,
replace_table_job.col_index_mapping.clone(),
sink_id,
)
.await?;

return Ok(version);
}

Ok(version)
}

Expand All @@ -797,7 +801,7 @@ impl DdlController {
target_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<NotificationVersion> {
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
let (version, streaming_job_ids) = match job_id {
let (mut version, streaming_job_ids) = match job_id {
StreamingJobId::MaterializedView(table_id) => {
self.catalog_manager
.drop_relation(
Expand Down Expand Up @@ -842,7 +846,8 @@ impl DdlController {
col_index_mapping,
}) = target_replace_info
{
self.replace_table(streaming_job, fragment_graph, col_index_mapping)
version = self
.replace_table(streaming_job, fragment_graph, col_index_mapping)
.await?;
}

Expand Down

0 comments on commit 667e898

Please sign in to comment.