From f9e420f8743215eff528f81bdd8478bb8467abf5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 4 Feb 2025 15:02:02 +0800 Subject: [PATCH] refactor(risedev): use sqlx driver to test whether database is ready (#20360) Signed-off-by: Bugen Zhao --- risedev.yml | 6 +- src/risedevtool/src/bin/risedev-dev.rs | 47 +++++------- src/risedevtool/src/task.rs | 2 + .../src/task/task_db_ready_check.rs | 75 +++++++++++++++++++ 4 files changed, 98 insertions(+), 32 deletions(-) create mode 100644 src/risedevtool/src/task/task_db_ready_check.rs diff --git a/risedev.yml b/risedev.yml index 1896cc111021b..f8ae148b735f3 100644 --- a/risedev.yml +++ b/risedev.yml @@ -1489,8 +1489,7 @@ template: # * If the user is "root", the password will be used as the root password. # * Otherwise, a regular user will be created with the given password. The root password will be empty. # Note that this only applies to fresh instances, i.e., the data directory is empty. - # - In user-managed mode, these configs are not validated by risedev. - # They are passed as-is to risedev-env default user for MySQL operations. + # - These configs will be passed as-is to risedev-env default user for MySQL operations. # - This is not used in RISEDEV_MYSQL_WITH_OPTIONS_COMMON. user: root password: "" @@ -1522,8 +1521,7 @@ template: # Note: # - This will be used to initialize the PostgreSQL instance if it's fresh. - # - In user-managed mode, these configs are not validated by risedev. - # They are passed as-is to risedev-env default user for PostgreSQL operations. + # - These configs will be passed as-is to risedev-env default user for PostgreSQL operations. user: postgres password: "" database: "postgres" diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 2ee4c63f4b7f5..788cd2d309b75 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -34,6 +34,8 @@ use risedev::{ PrometheusService, PubsubService, RedisService, SchemaRegistryService, ServiceConfig, SqlServerService, SqliteConfig, Task, TaskGroup, TempoService, RISEDEV_NAME, }; +use sqlx::mysql::MySqlConnectOptions; +use sqlx::postgres::PgConnectOptions; use tempfile::tempdir; use thiserror_ext::AsReport; use tracing::level_filters::LevelFilter; @@ -292,39 +294,28 @@ fn task_main( } ServiceConfig::MySql(c) => { MySqlService::new(c.clone()).execute(&mut ctx)?; - if c.user_managed { - let mut task = risedev::TcpReadyCheckTask::new( - c.address.clone(), - c.port, - c.user_managed, - )?; - task.execute(&mut ctx)?; - } else { - // When starting a MySQL container, the MySQL process is set as the main process. - // Since the first process in a container always gets PID 1, the MySQL log always shows - // "starting as process 1". - let mut task = risedev::LogReadyCheckTask::new("starting as process 1\n")?; - task.execute(&mut ctx)?; - } + let mut task = risedev::DbReadyCheckTask::new( + MySqlConnectOptions::new() + .host(&c.address) + .port(c.port) + .username(&c.user) + .password(&c.password), + ); + task.execute(&mut ctx)?; ctx.pb .set_message(format!("mysql {}:{}", c.address, c.port)); } ServiceConfig::Postgres(c) => { PostgresService::new(c.clone()).execute(&mut ctx)?; - if c.user_managed { - let mut task = risedev::TcpReadyCheckTask::new( - c.address.clone(), - c.port, - c.user_managed, - )?; - task.execute(&mut ctx)?; - } else { - let mut task = risedev::LogReadyCheckTask::new_all([ - "ready to accept connections", // also appears in init process - "listening on IPv4 address", // only appears when ready - ])?; - task.execute(&mut ctx)?; - } + let mut task = risedev::DbReadyCheckTask::new( + PgConnectOptions::new() + .host(&c.address) + .port(c.port) + .database("template1") + .username(&c.user) + .password(&c.password), + ); + task.execute(&mut ctx)?; ctx.pb .set_message(format!("postgres {}:{}", c.address, c.port)); } diff --git a/src/risedevtool/src/task.rs b/src/risedevtool/src/task.rs index c4dcab8426aba..380b70e89f94b 100644 --- a/src/risedevtool/src/task.rs +++ b/src/risedevtool/src/task.rs @@ -31,6 +31,7 @@ mod redis_service; mod schema_registry_service; mod sql_server_service; mod task_configure_minio; +mod task_db_ready_check; mod task_kafka_ready_check; mod task_log_ready_check; mod task_pubsub_emu_ready_check; @@ -70,6 +71,7 @@ pub use self::redis_service::*; pub use self::schema_registry_service::SchemaRegistryService; pub use self::sql_server_service::*; pub use self::task_configure_minio::*; +pub use self::task_db_ready_check::*; pub use self::task_kafka_ready_check::*; pub use self::task_log_ready_check::*; pub use self::task_pubsub_emu_ready_check::*; diff --git a/src/risedevtool/src/task/task_db_ready_check.rs b/src/risedevtool/src/task/task_db_ready_check.rs new file mode 100644 index 0000000000000..80b402210357b --- /dev/null +++ b/src/risedevtool/src/task/task_db_ready_check.rs @@ -0,0 +1,75 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use anyhow::Context as _; +use sqlx::{ConnectOptions, Connection as _}; + +use super::{ExecuteContext, Task}; +use crate::wait::wait; + +/// Check if the database is ready to use. +pub struct DbReadyCheckTask { + options: O, +} + +impl DbReadyCheckTask { + pub fn new(options: O) -> Self { + Self { options } + } +} + +impl Task for DbReadyCheckTask +where + O: ConnectOptions, + O::Connection: Sized, +{ + fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { + let Some(id) = ctx.id.clone() else { + panic!("Service should be set before executing DbReadyCheckTask"); + }; + + ctx.pb.set_message("waiting for ready..."); + + wait( + || { + let options = self.options.clone(); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + rt.block_on(async move { + let mut conn = options + .connect() + .await + .context("failed to connect to database")?; + conn.ping().await.context("failed to ping database")?; + Ok(()) + }) + }, + &mut ctx.log, + ctx.status_file.as_ref().unwrap(), + &id, + Some(Duration::from_secs(20)), + true, + ) + .with_context(|| format!("failed to wait for service `{id}` to be ready"))?; + + ctx.complete_spin(); + + Ok(()) + } +}