Skip to content

Commit

Permalink
refactor(risedev): use sqlx driver to test whether database is ready (#…
Browse files Browse the repository at this point in the history
…20360)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Feb 4, 2025
1 parent 13e412c commit f9e420f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 32 deletions.
6 changes: 2 additions & 4 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1489,8 +1489,7 @@ template:
# * If the user is "root", the password will be used as the root password.
# * Otherwise, a regular user will be created with the given password. The root password will be empty.
# Note that this only applies to fresh instances, i.e., the data directory is empty.
# - In user-managed mode, these configs are not validated by risedev.
# They are passed as-is to risedev-env default user for MySQL operations.
# - These configs will be passed as-is to risedev-env default user for MySQL operations.
# - This is not used in RISEDEV_MYSQL_WITH_OPTIONS_COMMON.
user: root
password: ""
Expand Down Expand Up @@ -1522,8 +1521,7 @@ template:

# Note:
# - This will be used to initialize the PostgreSQL instance if it's fresh.
# - In user-managed mode, these configs are not validated by risedev.
# They are passed as-is to risedev-env default user for PostgreSQL operations.
# - These configs will be passed as-is to risedev-env default user for PostgreSQL operations.
user: postgres
password: ""
database: "postgres"
Expand Down
47 changes: 19 additions & 28 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use risedev::{
PrometheusService, PubsubService, RedisService, SchemaRegistryService, ServiceConfig,
SqlServerService, SqliteConfig, Task, TaskGroup, TempoService, RISEDEV_NAME,
};
use sqlx::mysql::MySqlConnectOptions;
use sqlx::postgres::PgConnectOptions;
use tempfile::tempdir;
use thiserror_ext::AsReport;
use tracing::level_filters::LevelFilter;
Expand Down Expand Up @@ -292,39 +294,28 @@ fn task_main(
}
ServiceConfig::MySql(c) => {
MySqlService::new(c.clone()).execute(&mut ctx)?;
if c.user_managed {
let mut task = risedev::TcpReadyCheckTask::new(
c.address.clone(),
c.port,
c.user_managed,
)?;
task.execute(&mut ctx)?;
} else {
// When starting a MySQL container, the MySQL process is set as the main process.
// Since the first process in a container always gets PID 1, the MySQL log always shows
// "starting as process 1".
let mut task = risedev::LogReadyCheckTask::new("starting as process 1\n")?;
task.execute(&mut ctx)?;
}
let mut task = risedev::DbReadyCheckTask::new(
MySqlConnectOptions::new()
.host(&c.address)
.port(c.port)
.username(&c.user)
.password(&c.password),
);
task.execute(&mut ctx)?;
ctx.pb
.set_message(format!("mysql {}:{}", c.address, c.port));
}
ServiceConfig::Postgres(c) => {
PostgresService::new(c.clone()).execute(&mut ctx)?;
if c.user_managed {
let mut task = risedev::TcpReadyCheckTask::new(
c.address.clone(),
c.port,
c.user_managed,
)?;
task.execute(&mut ctx)?;
} else {
let mut task = risedev::LogReadyCheckTask::new_all([
"ready to accept connections", // also appears in init process
"listening on IPv4 address", // only appears when ready
])?;
task.execute(&mut ctx)?;
}
let mut task = risedev::DbReadyCheckTask::new(
PgConnectOptions::new()
.host(&c.address)
.port(c.port)
.database("template1")
.username(&c.user)
.password(&c.password),
);
task.execute(&mut ctx)?;
ctx.pb
.set_message(format!("postgres {}:{}", c.address, c.port));
}
Expand Down
2 changes: 2 additions & 0 deletions src/risedevtool/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod redis_service;
mod schema_registry_service;
mod sql_server_service;
mod task_configure_minio;
mod task_db_ready_check;
mod task_kafka_ready_check;
mod task_log_ready_check;
mod task_pubsub_emu_ready_check;
Expand Down Expand Up @@ -70,6 +71,7 @@ pub use self::redis_service::*;
pub use self::schema_registry_service::SchemaRegistryService;
pub use self::sql_server_service::*;
pub use self::task_configure_minio::*;
pub use self::task_db_ready_check::*;
pub use self::task_kafka_ready_check::*;
pub use self::task_log_ready_check::*;
pub use self::task_pubsub_emu_ready_check::*;
Expand Down
75 changes: 75 additions & 0 deletions src/risedevtool/src/task/task_db_ready_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use anyhow::Context as _;
use sqlx::{ConnectOptions, Connection as _};

use super::{ExecuteContext, Task};
use crate::wait::wait;

/// Check if the database is ready to use.
pub struct DbReadyCheckTask<O> {
options: O,
}

impl<O> DbReadyCheckTask<O> {
pub fn new(options: O) -> Self {
Self { options }
}
}

impl<O> Task for DbReadyCheckTask<O>
where
O: ConnectOptions,
O::Connection: Sized,
{
fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
let Some(id) = ctx.id.clone() else {
panic!("Service should be set before executing DbReadyCheckTask");
};

ctx.pb.set_message("waiting for ready...");

wait(
|| {
let options = self.options.clone();

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

rt.block_on(async move {
let mut conn = options
.connect()
.await
.context("failed to connect to database")?;
conn.ping().await.context("failed to ping database")?;
Ok(())
})
},
&mut ctx.log,
ctx.status_file.as_ref().unwrap(),
&id,
Some(Duration::from_secs(20)),
true,
)
.with_context(|| format!("failed to wait for service `{id}` to be ready"))?;

ctx.complete_spin();

Ok(())
}
}

0 comments on commit f9e420f

Please sign in to comment.