From 16c1b46eaa04253b31c13513101f217d62419961 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Thu, 23 Jan 2025 23:04:32 +0800
Subject: [PATCH 1/3] refactor(risedev): parallel service startup

Signed-off-by: xxchan
---
 src/risedevtool/src/bin/risedev-dev.rs | 536 +++++++++++++------------
 src/risedevtool/src/service_config.rs  |  36 ++
 src/risedevtool/src/task.rs            |   3 +-
 src/risedevtool/src/util.rs            |  10 +-
 4 files changed, 335 insertions(+), 250 deletions(-)

diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs
index fbd8dde9a7c89..5e961051785a8 100644
--- a/src/risedevtool/src/bin/risedev-dev.rs
+++ b/src/risedevtool/src/bin/risedev-dev.rs
@@ -12,16 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![feature(trait_alias)]
+
+use std::collections::HashMap;
 use std::env;
 use std::fmt::Write;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 
 use anyhow::{anyhow, Context, Result};
 use console::style;
 use fs_err::OpenOptions;
-use indicatif::ProgressBar;
+use indicatif::{MultiProgress, ProgressBar};
 use risedev::util::{complete_spin, fail_spin};
 use risedev::{
     generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander,
@@ -36,32 +40,22 @@ use tracing::level_filters::LevelFilter;
 use tracing_subscriber::EnvFilter;
 use yaml_rust::YamlEmitter;
 
-#[derive(Default)]
 pub struct ProgressManager {
-    pa: Option<ProgressBar>,
+    pa: MultiProgress,
 }
 
 impl ProgressManager {
     pub fn new() -> Self {
-        Self::default()
+        let pa = MultiProgress::default();
+        pa.set_move_cursor(true);
+        Self { pa }
     }
 
     /// Create a new progress bar from task
     pub fn new_progress(&mut self) -> ProgressBar {
-        if let Some(ref pa) = self.pa {
-            pa.finish();
-        }
-        let pb = risedev::util::new_spinner();
+        let pb = risedev::util::new_spinner().with_finish(indicatif::ProgressFinish::AndLeave);
         pb.enable_steady_tick(Duration::from_millis(100));
-        self.pa = Some(pb.clone());
-        pb
-    }
-
-    /// Finish all progress bars.
- pub fn finish_all(&self) { - if let Some(ref pa) = self.pa { - pa.finish(); - } + self.pa.add(pb) } } @@ -114,267 +108,311 @@ fn task_main( // Then, start services one by one - let mut stat = vec![]; + let mut tasks = TaskScheduler::new(); for service in services { - let start_time = Instant::now(); - - match service { - ServiceConfig::Minio(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = MinioService::new(c.clone())?; - service.execute(&mut ctx)?; - - let mut task = risedev::ConfigureMinioTask::new(c.clone())?; - task.execute(&mut ctx)?; - } - ServiceConfig::Sqlite(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - - struct SqliteService(SqliteConfig); - impl Task for SqliteService { - fn execute( - &mut self, - _ctx: &mut ExecuteContext, - ) -> anyhow::Result<()> { - Ok(()) + let service_ = service.clone(); + let progress_bar = manager.new_progress(); + progress_bar.set_prefix(service.id().to_owned()); + progress_bar.set_message("waiting for previous service to start...".to_owned()); + let status_dir = status_dir.clone(); + let closure = move || { + let mut log = Vec::new(); + let start_time = Instant::now(); + let mut ctx = ExecuteContext::new(&mut log, progress_bar, status_dir); + let service = service_; + let id = service.id().to_owned(); + match service { + ServiceConfig::Minio(c) => { + let mut service = MinioService::new(c.clone())?; + service.execute(&mut ctx)?; + + let mut task = risedev::ConfigureMinioTask::new(c.clone())?; + task.execute(&mut ctx)?; + } + ServiceConfig::Sqlite(c) => { + struct SqliteService(SqliteConfig); + impl Task for SqliteService { + fn execute( + &mut self, + _ctx: &mut ExecuteContext, + ) -> anyhow::Result<()> { + Ok(()) + } + + fn id(&self) -> String { + self.0.id.clone() + } } - fn id(&self) -> String { - self.0.id.clone() - } - } + let prefix_data = env::var("PREFIX_DATA")?; + let file_dir = PathBuf::from(&prefix_data).join(&c.id); + std::fs::create_dir_all(&file_dir)?; + let file_path = file_dir.join(&c.file); - let prefix_data = env::var("PREFIX_DATA")?; - let file_dir = PathBuf::from(&prefix_data).join(&c.id); - std::fs::create_dir_all(&file_dir)?; - let file_path = file_dir.join(&c.file); + ctx.service(&SqliteService(c.clone())); + ctx.complete_spin(); + ctx.pb + .set_message(format!("using local sqlite: {:?}", file_path)); + } + ServiceConfig::Prometheus(c) => { + let mut service = PrometheusService::new(c.clone())?; + service.execute(&mut ctx)?; + let mut task = + risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?; + task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("api http://{}:{}/", c.address, c.port)); + } + ServiceConfig::ComputeNode(c) => { + let mut service = ComputeNodeService::new(c.clone())?; + service.execute(&mut ctx)?; - ctx.service(&SqliteService(c.clone())); - ctx.complete_spin(); - ctx.pb - .set_message(format!("using local sqlite: {:?}", file_path)); - } - ServiceConfig::Prometheus(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = PrometheusService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("api http://{}:{}/", c.address, c.port)); - } - ServiceConfig::ComputeNode(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), 
status_dir.clone()); - let mut service = ComputeNodeService::new(c.clone())?; - service.execute(&mut ctx)?; - - let mut task = - risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("api grpc://{}:{}/", c.address, c.port)); - } - ServiceConfig::MetaNode(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = MetaNodeService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = - risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; - task.execute(&mut ctx)?; - ctx.pb.set_message(format!( - "api grpc://{}:{}/, dashboard http://{}:{}/", - c.address, c.port, c.address, c.dashboard_port - )); - } - ServiceConfig::Frontend(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = FrontendService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = - risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("api postgres://{}:{}/", c.address, c.port)); - - writeln!( - log_buffer, - "* Run {} to start Postgres interactive shell.", - style(format_args!( - "psql -h localhost -p {} -d dev -U root", - c.port - )) - .blue() - .bold() - )?; - } - ServiceConfig::Compactor(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = CompactorService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = - risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("compactor {}:{}", c.address, c.port)); - } - ServiceConfig::Grafana(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = GrafanaService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("dashboard http://{}:{}/", c.address, c.port)); - } - ServiceConfig::Tempo(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = TempoService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = - risedev::TcpReadyCheckTask::new(c.listen_address.clone(), c.port, false)?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("api http://{}:{}/", c.listen_address, c.port)); - } - ServiceConfig::AwsS3(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - DummyService::new(&c.id).execute(&mut ctx)?; - ctx.pb - .set_message(format!("using AWS s3 bucket {}", c.bucket)); - } - ServiceConfig::Opendal(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - DummyService::new(&c.id).execute(&mut ctx)?; - ctx.pb - .set_message(format!("using Opendal, namenode = {}", c.namenode)); - } - ServiceConfig::Kafka(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = KafkaService::new(c.clone()); - service.execute(&mut ctx)?; - let mut task = risedev::KafkaReadyCheckTask::new(c.clone())?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("kafka {}:{}", c.address, c.port)); - } - ServiceConfig::SchemaRegistry(c) => { - let mut 
ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = SchemaRegistryService::new(c.clone()); - service.execute(&mut ctx)?; - if c.user_managed { let mut task = risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; - } else { + ctx.pb + .set_message(format!("api grpc://{}:{}/", c.address, c.port)); + } + ServiceConfig::MetaNode(c) => { + let mut service = MetaNodeService::new(c.clone())?; + service.execute(&mut ctx)?; let mut task = - risedev::LogReadyCheckTask::new("Server started, listening for requests")?; + risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; + ctx.pb.set_message(format!( + "api grpc://{}:{}/, dashboard http://{}:{}/", + c.address, c.port, c.address, c.dashboard_port + )); } - ctx.pb - .set_message(format!("schema registry http://{}:{}", c.address, c.port)); - } - - ServiceConfig::Pubsub(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = PubsubService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = risedev::PubsubReadyTaskCheck::new(c.clone())?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("pubsub {}:{}", c.address, c.port)); - } - ServiceConfig::RedPanda(_) => { - return Err(anyhow!("redpanda is only supported in RiseDev compose.")); - } - ServiceConfig::Redis(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - let mut service = RedisService::new(c.clone())?; - service.execute(&mut ctx)?; - let mut task = risedev::RedisReadyCheckTask::new(c.clone())?; - task.execute(&mut ctx)?; - ctx.pb - .set_message(format!("redis {}:{}", c.address, c.port)); - } - ServiceConfig::MySql(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - MySqlService::new(c.clone()).execute(&mut ctx)?; - if c.user_managed { + ServiceConfig::Frontend(c) => { + let mut service = FrontendService::new(c.clone())?; + service.execute(&mut ctx)?; let mut task = risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; - } else { - // When starting a MySQL container, the MySQL process is set as the main process. - // Since the first process in a container always gets PID 1, the MySQL log always shows - // "starting as process 1". 
- let mut task = risedev::LogReadyCheckTask::new("starting as process 1\n")?; - task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("api postgres://{}:{}/", c.address, c.port)); + + // writeln!( + // log_buffer, + // "* Run {} to start Postgres interactive shell.", + // style(format_args!( + // "psql -h localhost -p {} -d dev -U root", + // c.port + // )) + // .blue() + // .bold() + // )?; } - ctx.pb - .set_message(format!("mysql {}:{}", c.address, c.port)); - } - ServiceConfig::Postgres(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - PostgresService::new(c.clone()).execute(&mut ctx)?; - if c.user_managed { + ServiceConfig::Compactor(c) => { + let mut service = CompactorService::new(c.clone())?; + service.execute(&mut ctx)?; let mut task = risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; task.execute(&mut ctx)?; - } else { - let mut task = risedev::LogReadyCheckTask::new_all([ - "ready to accept connections", // also appears in init process - "listening on IPv4 address", // only appears when ready - ])?; + ctx.pb + .set_message(format!("compactor {}:{}", c.address, c.port)); + } + ServiceConfig::Grafana(c) => { + let mut service = GrafanaService::new(c.clone())?; + service.execute(&mut ctx)?; + let mut task = + risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, false)?; task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("dashboard http://{}:{}/", c.address, c.port)); } - ctx.pb - .set_message(format!("postgres {}:{}", c.address, c.port)); - } - ServiceConfig::SqlServer(c) => { - let mut ctx = - ExecuteContext::new(&mut logger, manager.new_progress(), status_dir.clone()); - // only `c.password` will be used in `SqlServerService` as the password for user `sa`. 
- SqlServerService::new(c.clone()).execute(&mut ctx)?; - if c.user_managed { + ServiceConfig::Tempo(c) => { + let mut service = TempoService::new(c.clone())?; + service.execute(&mut ctx)?; let mut task = - risedev::TcpReadyCheckTask::new(c.address.clone(), c.port, c.user_managed)?; + risedev::TcpReadyCheckTask::new(c.listen_address.clone(), c.port, false)?; task.execute(&mut ctx)?; - } else { - let mut task = risedev::LogReadyCheckTask::new( - "SQL Server is now ready for client connections.", - )?; + ctx.pb + .set_message(format!("api http://{}:{}/", c.listen_address, c.port)); + } + ServiceConfig::AwsS3(c) => { + DummyService::new(&c.id).execute(&mut ctx)?; + ctx.pb + .set_message(format!("using AWS s3 bucket {}", c.bucket)); + } + ServiceConfig::Opendal(c) => { + DummyService::new(&c.id).execute(&mut ctx)?; + ctx.pb + .set_message(format!("using Opendal, namenode = {}", c.namenode)); + } + ServiceConfig::Kafka(c) => { + let mut service = KafkaService::new(c.clone()); + service.execute(&mut ctx)?; + let mut task = risedev::KafkaReadyCheckTask::new(c.clone())?; task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("kafka {}:{}", c.address, c.port)); + } + ServiceConfig::SchemaRegistry(c) => { + let mut service = SchemaRegistryService::new(c.clone()); + service.execute(&mut ctx)?; + if c.user_managed { + let mut task = risedev::TcpReadyCheckTask::new( + c.address.clone(), + c.port, + c.user_managed, + )?; + task.execute(&mut ctx)?; + } else { + let mut task = risedev::LogReadyCheckTask::new( + "Server started, listening for requests", + )?; + task.execute(&mut ctx)?; + } + ctx.pb + .set_message(format!("schema registry http://{}:{}", c.address, c.port)); + } + + ServiceConfig::Pubsub(c) => { + let mut service = PubsubService::new(c.clone())?; + service.execute(&mut ctx)?; + let mut task = risedev::PubsubReadyTaskCheck::new(c.clone())?; + task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("pubsub {}:{}", c.address, c.port)); + } + ServiceConfig::RedPanda(_) => { + return Err(anyhow!("redpanda is only supported in RiseDev compose.")); + } + ServiceConfig::Redis(c) => { + let mut service = RedisService::new(c.clone())?; + service.execute(&mut ctx)?; + let mut task = risedev::RedisReadyCheckTask::new(c.clone())?; + task.execute(&mut ctx)?; + ctx.pb + .set_message(format!("redis {}:{}", c.address, c.port)); + } + ServiceConfig::MySql(c) => { + MySqlService::new(c.clone()).execute(&mut ctx)?; + if c.user_managed { + let mut task = risedev::TcpReadyCheckTask::new( + c.address.clone(), + c.port, + c.user_managed, + )?; + task.execute(&mut ctx)?; + } else { + // When starting a MySQL container, the MySQL process is set as the main process. + // Since the first process in a container always gets PID 1, the MySQL log always shows + // "starting as process 1". 
+                        let mut task = risedev::LogReadyCheckTask::new("starting as process 1\n")?;
+                        task.execute(&mut ctx)?;
+                    }
+                    ctx.pb
+                        .set_message(format!("mysql {}:{}", c.address, c.port));
+                }
+                ServiceConfig::Postgres(c) => {
+                    PostgresService::new(c.clone()).execute(&mut ctx)?;
+                    if c.user_managed {
+                        let mut task = risedev::TcpReadyCheckTask::new(
+                            c.address.clone(),
+                            c.port,
+                            c.user_managed,
+                        )?;
+                        task.execute(&mut ctx)?;
+                    } else {
+                        let mut task = risedev::LogReadyCheckTask::new_all([
+                            "ready to accept connections", // also appears in init process
+                            "listening on IPv4 address",   // only appears when ready
+                        ])?;
+                        task.execute(&mut ctx)?;
+                    }
+                    ctx.pb
+                        .set_message(format!("postgres {}:{}", c.address, c.port));
+                }
+                ServiceConfig::SqlServer(c) => {
+                    // only `c.password` will be used in `SqlServerService` as the password for user `sa`.
+                    SqlServerService::new(c.clone()).execute(&mut ctx)?;
+                    if c.user_managed {
+                        let mut task = risedev::TcpReadyCheckTask::new(
+                            c.address.clone(),
+                            c.port,
+                            c.user_managed,
+                        )?;
+                        task.execute(&mut ctx)?;
+                    } else {
+                        let mut task = risedev::LogReadyCheckTask::new(
+                            "SQL Server is now ready for client connections.",
+                        )?;
+                        task.execute(&mut ctx)?;
+                    }
+                    ctx.pb
+                        .set_message(format!("sqlserver {}:{}", c.address, c.port));
+                }
             }
-        }
 
-        let service_id = service.id().to_owned();
-        let duration = Instant::now() - start_time;
-        stat.push((service_id, duration));
+            let duration = Instant::now() - start_time;
+            Ok(TaskResult {
+                id,
+                time: duration,
+                log: String::from_utf8(log)?,
+            })
+        };
+        tasks.add(service, closure);
     }
 
+    let stat = tasks.run(&mut logger)?;
+
     Ok((stat, log_buffer))
 }
 
+struct TaskResult {
+    id: String,
+    time: Duration,
+    log: String,
+}
+
+trait TaskFn = FnOnce() -> anyhow::Result<TaskResult> + Send + 'static;
+
+struct TaskScheduler {
+    /// In each group, the tasks are executed in sequence.
+    task_groups: HashMap<String, Vec<Box<dyn TaskFn>>>,
+}
+
+impl TaskScheduler {
+    fn new() -> Self {
+        Self {
+            task_groups: HashMap::new(),
+        }
+    }
+
+    fn add(&mut self, config: &ServiceConfig, task: impl TaskFn) {
+        self.task_groups
+            .entry(config.task_group())
+            .or_default()
+            .push(Box::new(task));
+    }
+
+    fn run(self, logger: &mut impl std::io::Write) -> anyhow::Result<Vec<(String, Duration)>> {
+        let mut handles: Vec<JoinHandle<anyhow::Result<Vec<TaskResult>>>> = vec![];
+        let mut stats = vec![];
+
+        let task_groups = self.task_groups;
+        for (_, tasks) in task_groups {
+            handles.push(std::thread::spawn(move || {
+                let mut res = vec![];
+                for task in tasks {
+                    let res_ = task()?;
+                    res.push(res_);
+                }
+                Ok(res)
+            }));
+        }
+        for handle in handles {
+            for TaskResult { id, time, log } in handle.join().unwrap()? {
+                stats.push((id, time));
+                write!(logger, "{}", log)?;
+            }
+        }
+        Ok(stats)
+    }
+}
+
 fn main() -> Result<()> {
     // Intentionally disable backtrace to provide more compact error message for `risedev dev`.
     // Backtraces for RisingWave components are enabled in `Task::execute`.
@@ -386,7 +424,9 @@ fn main() -> Result<()> {
             EnvFilter::builder()
                 .with_default_directive(LevelFilter::INFO.into())
                 .with_env_var("RISEDEV_RUST_LOG")
-                .from_env_lossy(),
+                .from_env_lossy()
+                // This log may pollute the progress bar.
+ .add_directive("librdkafka=off".parse().unwrap()), ) .init(); @@ -442,7 +482,7 @@ fn main() -> Result<()> { fail_spin(&p); } } - manager.finish_all(); + p.finish(); use risedev::util::stylized_risedev_subcmd as r; diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index c1070d43261c6..b20c47ca52bf3 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -505,6 +505,42 @@ impl ServiceConfig { Self::SchemaRegistry(c) => c.user_managed, } } + + pub fn task_group(&self) -> String { + match self { + ServiceConfig::ComputeNode(_) => "risingwave".to_owned(), + ServiceConfig::MetaNode(_) => "risingwave".to_owned(), + ServiceConfig::Frontend(_) => "risingwave".to_owned(), + ServiceConfig::Compactor(_) => "risingwave".to_owned(), + ServiceConfig::Minio(_) => "risingwave".to_owned(), + ServiceConfig::Sqlite(_) => "risingwave".to_owned(), + ServiceConfig::Prometheus(_) => "observability".to_owned(), + ServiceConfig::Grafana(_) => "observability".to_owned(), + ServiceConfig::Tempo(_) => "observability".to_owned(), + ServiceConfig::Opendal(_) => "risingwave".to_owned(), + ServiceConfig::AwsS3(_) => "risingwave".to_owned(), + ServiceConfig::Kafka(_) => "kafka".to_owned(), + ServiceConfig::SchemaRegistry(_) => "kafka".to_owned(), + ServiceConfig::Pubsub(_) => "pubsub".to_owned(), + ServiceConfig::Redis(_) => "redis".to_owned(), + ServiceConfig::RedPanda(_) => "kafka".to_owned(), + ServiceConfig::MySql(my_sql_config) => { + if matches!(my_sql_config.application, Application::Metastore) { + "risingwave".to_owned() + } else { + "mysql".to_owned() + } + } + ServiceConfig::Postgres(postgres_config) => { + if matches!(postgres_config.application, Application::Metastore) { + "risingwave".to_owned() + } else { + "postgres".to_owned() + } + } + ServiceConfig::SqlServer(_) => "sqlserver".to_owned(), + } + } } mod string { diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index 65c34c649ed68..c4dcab8426aba 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -76,7 +76,7 @@ pub use self::task_pubsub_emu_ready_check::*; pub use self::task_redis_ready_check::*; pub use self::task_tcp_ready_check::*; pub use self::tempo_service::*; -use crate::util::{complete_spin, get_program_args, get_program_name}; +use crate::util::{begin_spin, complete_spin, get_program_args, get_program_name}; use crate::wait::{wait, wait_tcp_available}; pub trait Task: 'static + Send { @@ -134,6 +134,7 @@ where pub fn service(&mut self, task: &impl Task) { let id = task.id(); if !id.is_empty() { + begin_spin(&self.pb); self.pb.set_prefix(id.clone()); self.id = Some(id.clone()); self.status_file = Some(self.status_dir.path().join(format!("{}.status", id))); diff --git a/src/risedevtool/src/util.rs b/src/risedevtool/src/util.rs index d25ad3d92788f..79bab7977d209 100644 --- a/src/risedevtool/src/util.rs +++ b/src/risedevtool/src/util.rs @@ -47,12 +47,20 @@ pub fn new_spinner() -> ProgressBar { let pb = ProgressBar::new(0); pb.set_style( ProgressStyle::default_spinner() - .template("{spinner} {prefix}: {msg}") + .template("🟡 {prefix}: {msg}") .unwrap(), ); pb } +pub fn begin_spin(pb: &ProgressBar) { + pb.set_style( + ProgressStyle::default_spinner() + .template("{spinner} {prefix}: {msg}") + .unwrap(), + ); +} + pub fn complete_spin(pb: &ProgressBar) { pb.set_style( ProgressStyle::default_spinner() From 8615e243977de8fb1dd40ce68624af3c254b6a7b Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 23 Jan 2025 23:10:42 +0800 
Subject: [PATCH 2/3] fix psql command Signed-off-by: xxchan --- src/risedevtool/src/bin/risedev-dev.rs | 36 ++++++++++++++------------ 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 5e961051785a8..b71dbaa5c94c2 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -26,7 +26,7 @@ use anyhow::{anyhow, Context, Result}; use console::style; use fs_err::OpenOptions; use indicatif::{MultiProgress, ProgressBar}; -use risedev::util::{complete_spin, fail_spin}; +use risedev::util::{begin_spin, complete_spin, fail_spin}; use risedev::{ generate_risedev_env, preflight_check, CompactorService, ComputeNodeService, ConfigExpander, ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService, @@ -111,6 +111,18 @@ fn task_main( let mut tasks = TaskScheduler::new(); for service in services { + if let ServiceConfig::Frontend(c) = service { + writeln!( + log_buffer, + "* Run {} to start Postgres interactive shell.", + style(format_args!( + "psql -h localhost -p {} -d dev -U root", + c.port + )) + .blue() + .bold() + )?; + } let service_ = service.clone(); let progress_bar = manager.new_progress(); progress_bar.set_prefix(service.id().to_owned()); @@ -193,17 +205,6 @@ fn task_main( task.execute(&mut ctx)?; ctx.pb .set_message(format!("api postgres://{}:{}/", c.address, c.port)); - - // writeln!( - // log_buffer, - // "* Run {} to start Postgres interactive shell.", - // style(format_args!( - // "psql -h localhost -p {} -d dev -U root", - // c.port - // )) - // .blue() - // .bold() - // )?; } ServiceConfig::Compactor(c) => { let mut service = CompactorService::new(c.clone())?; @@ -432,11 +433,11 @@ fn main() -> Result<()> { preflight_check()?; - let task_name = std::env::args() + let profile = std::env::args() .nth(1) .unwrap_or_else(|| "default".to_owned()); - let (config_path, env, risedev_config) = ConfigExpander::expand(".", &task_name)?; + let (config_path, env, risedev_config) = ConfigExpander::expand(".", &profile)?; if let Some(config_path) = &config_path { let target = Path::new(&env::var("PREFIX_CONFIG")?).join("risingwave.toml"); @@ -458,11 +459,12 @@ fn main() -> Result<()> { // Always create a progress before calling `task_main`. Otherwise the progress bar won't be // shown. 
     let p = manager.new_progress();
+    begin_spin(&p);
     p.set_prefix("dev cluster");
     p.set_message(format!(
         "starting {} services for {}...",
         services.len(),
-        task_name
+        profile
     ));
 
     let task_result = task_main(&mut manager, &services, env);
@@ -470,14 +472,14 @@ fn main() -> Result<()> {
         Ok(_) => {
             p.set_message(format!(
                 "done bootstrapping with config {}",
-                style(task_name).bold()
+                style(profile).bold()
             ));
             complete_spin(&p);
         }
         Err(_) => {
             p.set_message(format!(
                 "failed to bootstrap with config {}",
-                style(task_name).bold()
+                style(profile).bold()
             ));
             fail_spin(&p);
         }

From 7e8668e9813216928b44c5274364cca1a92ab0f5 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 24 Jan 2025 13:55:47 +0800
Subject: [PATCH 3/3] use enum

Signed-off-by: xxchan
---
 src/risedevtool/src/bin/risedev-dev.rs |  4 +-
 src/risedevtool/src/service_config.rs  | 56 ++++++++++++++----------
 2 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs
index b71dbaa5c94c2..2ee4c63f4b7f5 100644
--- a/src/risedevtool/src/bin/risedev-dev.rs
+++ b/src/risedevtool/src/bin/risedev-dev.rs
@@ -32,7 +32,7 @@ use risedev::{
     ConfigureTmuxTask, DummyService, EnsureStopService, ExecuteContext, FrontendService,
     GrafanaService, KafkaService, MetaNodeService, MinioService, MySqlService, PostgresService,
     PrometheusService, PubsubService, RedisService, SchemaRegistryService, ServiceConfig,
-    SqlServerService, SqliteConfig, Task, TempoService, RISEDEV_NAME,
+    SqlServerService, SqliteConfig, Task, TaskGroup, TempoService, RISEDEV_NAME,
 };
 use tempfile::tempdir;
 use thiserror_ext::AsReport;
@@ -372,7 +372,7 @@ struct TaskResult {
 trait TaskFn = FnOnce() -> anyhow::Result<TaskResult> + Send + 'static;
 
 struct TaskScheduler {
     /// In each group, the tasks are executed in sequence.
-    task_groups: HashMap<String, Vec<Box<dyn TaskFn>>>,
+    task_groups: HashMap<TaskGroup, Vec<Box<dyn TaskFn>>>,
 }
 
 impl TaskScheduler {
diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs
index b20c47ca52bf3..f30e69701f438 100644
--- a/src/risedevtool/src/service_config.rs
+++ b/src/risedevtool/src/service_config.rs
@@ -432,6 +432,18 @@ pub enum ServiceConfig {
     SqlServer(SqlServerConfig),
 }
 
+#[derive(PartialEq, Eq, Hash)]
+pub enum TaskGroup {
+    RisingWave,
+    Observability,
+    Kafka,
+    Pubsub,
+    MySql,
+    Postgres,
+    SqlServer,
+    Redis,
+}
+
 impl ServiceConfig {
     pub fn id(&self) -> &str {
         match self {
@@ -506,39 +518,39 @@ impl ServiceConfig {
         }
     }
 
-    pub fn task_group(&self) -> String {
+    pub fn task_group(&self) -> TaskGroup {
+        use TaskGroup::*;
         match self {
-            ServiceConfig::ComputeNode(_) => "risingwave".to_owned(),
-            ServiceConfig::MetaNode(_) => "risingwave".to_owned(),
-            ServiceConfig::Frontend(_) => "risingwave".to_owned(),
-            ServiceConfig::Compactor(_) => "risingwave".to_owned(),
-            ServiceConfig::Minio(_) => "risingwave".to_owned(),
-            ServiceConfig::Sqlite(_) => "risingwave".to_owned(),
-            ServiceConfig::Prometheus(_) => "observability".to_owned(),
-            ServiceConfig::Grafana(_) => "observability".to_owned(),
-            ServiceConfig::Tempo(_) => "observability".to_owned(),
-            ServiceConfig::Opendal(_) => "risingwave".to_owned(),
-            ServiceConfig::AwsS3(_) => "risingwave".to_owned(),
-            ServiceConfig::Kafka(_) => "kafka".to_owned(),
-            ServiceConfig::SchemaRegistry(_) => "kafka".to_owned(),
-            ServiceConfig::Pubsub(_) => "pubsub".to_owned(),
-            ServiceConfig::Redis(_) => "redis".to_owned(),
-            ServiceConfig::RedPanda(_) => "kafka".to_owned(),
+            ServiceConfig::ComputeNode(_)
+            | ServiceConfig::MetaNode(_)
+            | ServiceConfig::Frontend(_)
+            | ServiceConfig::Compactor(_)
+            | ServiceConfig::Minio(_)
+            | ServiceConfig::Sqlite(_) => RisingWave,
+            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
+                Observability
+            }
+            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
+            ServiceConfig::Kafka(_)
+            | ServiceConfig::SchemaRegistry(_)
+            | ServiceConfig::RedPanda(_) => Kafka,
+            ServiceConfig::Pubsub(_) => Pubsub,
+            ServiceConfig::Redis(_) => Redis,
             ServiceConfig::MySql(my_sql_config) => {
                 if matches!(my_sql_config.application, Application::Metastore) {
-                    "risingwave".to_owned()
+                    RisingWave
                 } else {
-                    "mysql".to_owned()
+                    MySql
                 }
             }
             ServiceConfig::Postgres(postgres_config) => {
                 if matches!(postgres_config.application, Application::Metastore) {
-                    "risingwave".to_owned()
+                    RisingWave
                 } else {
-                    "postgres".to_owned()
+                    Postgres
                 }
            }
-            ServiceConfig::SqlServer(_) => "sqlserver".to_owned(),
+            ServiceConfig::SqlServer(_) => SqlServer,
         }
     }
 }
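Note on the scheduling model (illustration only, not part of the patches above): services are grouped by `ServiceConfig::task_group`, each group is started on its own thread, and tasks within a group still start strictly in sequence; each task's log is buffered and only written out after its group's thread is joined, as in `TaskScheduler::run`. The standalone sketch below mirrors that structure with plain `std` threads. The group names, service ids and sleep durations are made up for illustration and are not taken from the patches.

use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};

// A startup step: runs to completion and reports the service id and elapsed time.
type Task = Box<dyn FnOnce() -> (String, Duration) + Send + 'static>;

fn run_grouped(task_groups: HashMap<&'static str, Vec<Task>>) -> Vec<(String, Duration)> {
    // One thread per group, so groups start concurrently...
    let handles: Vec<_> = task_groups
        .into_iter()
        .map(|(_group, tasks)| {
            thread::spawn(move || {
                // ...but tasks inside a group still run strictly in sequence.
                tasks.into_iter().map(|t| t()).collect::<Vec<_>>()
            })
        })
        .collect();

    // Join every group and flatten the per-task stats, like TaskScheduler::run does.
    handles
        .into_iter()
        .flat_map(|h| h.join().expect("group thread panicked"))
        .collect()
}

fn main() {
    // Hypothetical service ids and durations, grouped the way task_group() would group them.
    let mut groups: HashMap<&'static str, Vec<Task>> = HashMap::new();
    for (group, id, millis) in [
        ("risingwave", "meta-node-5690", 300),
        ("risingwave", "compute-node-5688", 200),
        ("kafka", "kafka-29092", 250),
    ] {
        groups.entry(group).or_default().push(Box::new(move || {
            let start = Instant::now();
            // Stands in for spawning the service and waiting for its readiness check.
            thread::sleep(Duration::from_millis(millis));
            (id.to_owned(), start.elapsed())
        }));
    }

    for (id, took) in run_grouped(groups) {
        println!("{id} started in {took:?}");
    }
}

In this sketch, as in the patches, results are reported per group in join order, so a slow group delays only its own output rather than the startup of other groups.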