From 667e8988301b8e81b117da00c0b75bea90b08f9b Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 21 Nov 2023 01:47:55 +0800 Subject: [PATCH] Add version variable, update its value, return in certain scenarios --- src/meta/src/rpc/ddl_controller.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 6ffc97c0c1abb..406b720beb870 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -780,13 +780,17 @@ impl DdlController { tracing::debug!(id = job_id, "finished stream job"); if let Some(replace_table_job) = &replace_table_job { - self.finish_replace_table( - &replace_table_job.streaming_job, - replace_table_job.col_index_mapping.clone(), - sink_id, - ) - .await?; + let version = self + .finish_replace_table( + &replace_table_job.streaming_job, + replace_table_job.col_index_mapping.clone(), + sink_id, + ) + .await?; + + return Ok(version); } + Ok(version) } @@ -797,7 +801,7 @@ impl DdlController { target_replace_info: Option, ) -> MetaResult { let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; - let (version, streaming_job_ids) = match job_id { + let (mut version, streaming_job_ids) = match job_id { StreamingJobId::MaterializedView(table_id) => { self.catalog_manager .drop_relation( @@ -842,7 +846,8 @@ impl DdlController { col_index_mapping, }) = target_replace_info { - self.replace_table(streaming_job, fragment_graph, col_index_mapping) + version = self + .replace_table(streaming_job, fragment_graph, col_index_mapping) .await?; }