Skip to content

Commit

Permalink
feat: add shutdown for standalone and metasrv (#1174)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Mar 17, 2023
1 parent a7676d8 commit 7cfa30b
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 26 deletions.
1 change: 0 additions & 1 deletion src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ impl Instance {
}

pub async fn stop(&self) -> Result<()> {
// TODO: handle cli shutdown
Ok(())
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;

use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::error::{Error, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu};
use crate::toml_loader;

pub struct Instance {
Expand All @@ -34,8 +34,10 @@ impl Instance {
}

pub async fn stop(&self) -> Result<()> {
// TODO: handle datanode shutdown
Ok(())
self.datanode
.shutdown()
.await
.context(ShutdownDatanodeSnafu)
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,24 @@ pub enum Error {
source: datanode::error::Error,
},

#[snafu(display("Failed to shutdown datanode, source: {}", source))]
ShutdownDatanode {
#[snafu(backtrace)]
source: datanode::error::Error,
},

#[snafu(display("Failed to start frontend, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},

#[snafu(display("Failed to shutdown frontend, source: {}", source))]
ShutdownFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},

#[snafu(display("Failed to build meta server, source: {}", source))]
BuildMetaServer {
#[snafu(backtrace)]
Expand All @@ -44,6 +56,12 @@ pub enum Error {
source: meta_srv::error::Error,
},

#[snafu(display("Failed to shutdown meta server, source: {}", source))]
ShutdownMetaServer {
#[snafu(backtrace)]
source: meta_srv::error::Error,
},

#[snafu(display("Failed to read config file: {}, source: {}", path, source))]
ReadConfig {
path: String,
Expand Down Expand Up @@ -143,7 +161,10 @@ impl ErrorExt for Error {
match self {
Error::StartDatanode { source } => source.status_code(),
Error::StartFrontend { source } => source.status_code(),
Error::ShutdownDatanode { source } => source.status_code(),
Error::ShutdownFrontend { source } => source.status_code(),
Error::StartMetaServer { source } => source.status_code(),
Error::ShutdownMetaServer { source } => source.status_code(),
Error::BuildMetaServer { source } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => {
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ impl Instance {
}

pub async fn stop(&self) -> Result<()> {
// TODO: handle frontend shutdown
Ok(())
self.frontend
.shutdown()
.await
.context(error::ShutdownFrontendSnafu)
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ impl Instance {
self.instance
.start()
.await
.context(error::StartMetaServerSnafu)?;
Ok(())
.context(error::StartMetaServerSnafu)
}

pub async fn stop(&self) -> Result<()> {
// TODO: handle metasrv shutdown
Ok(())
self.instance
.shutdown()
.await
.context(error::ShutdownMetaServerSnafu)
}
}

Expand Down
17 changes: 15 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;

use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu};
use crate::error::{
Error, IllegalConfigSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFrontendSnafu,
};
use crate::frontend::load_frontend_plugins;
use crate::toml_loader;

Expand Down Expand Up @@ -152,7 +155,17 @@ impl Instance {
}

pub async fn stop(&self) -> Result<()> {
// TODO: handle standalone shutdown
self.frontend
.shutdown()
.await
.context(ShutdownFrontendSnafu)?;

self.datanode
.shutdown_instance()
.await
.context(ShutdownDatanodeSnafu)?;
info!("Datanode instance stopped.");

Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl Datanode {
self.instance.clone()
}

async fn shutdown_instance(&self) -> Result<()> {
pub async fn shutdown_instance(&self) -> Result<()> {
self.instance.shutdown().await
}

Expand Down
48 changes: 35 additions & 13 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use api::v1::meta::store_server::StoreServer;
use etcd_client::Client;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;

Expand All @@ -44,44 +45,65 @@ pub struct MetaSrvInstance {
meta_srv: MetaSrv,

opts: MetaSrvOptions,

signal_sender: Option<Sender<()>>,
}

impl MetaSrvInstance {
pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts).await?;

Ok(MetaSrvInstance { meta_srv, opts })
Ok(MetaSrvInstance {
meta_srv,
opts,
signal_sender: None,
})
}

pub async fn start(&self) -> Result<()> {
pub async fn start(&mut self) -> Result<()> {
self.meta_srv.start().await;
bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone())).await?;
let (tx, mut rx) = mpsc::channel::<()>(1);

self.signal_sender = Some(tx);

bootstrap_meta_srv_with_router(
&self.opts.bind_addr,
router(self.meta_srv.clone()),
&mut rx,
)
.await?;

Ok(())
}

pub async fn close(&self) -> Result<()> {
// TODO: shutdown the router
pub async fn shutdown(&self) -> Result<()> {
if let Some(signal) = &self.signal_sender {
signal
.send(())
.await
.context(error::SendShutdownSignalSnafu)?;
}

self.meta_srv.shutdown();

Ok(())
}
}

// Bootstrap the rpc server to serve incoming request
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> {
let meta_srv = make_meta_srv(&opts).await?;
bootstrap_meta_srv_with_router(&opts.bind_addr, router(meta_srv)).await
}

pub async fn bootstrap_meta_srv_with_router(bind_addr: &str, router: Router) -> Result<()> {
pub async fn bootstrap_meta_srv_with_router(
bind_addr: &str,
router: Router,
signal: &mut Receiver<()>,
) -> Result<()> {
let listener = TcpListener::bind(bind_addr)
.await
.context(error::TcpBindSnafu { addr: bind_addr })?;
let listener = TcpListenerStream::new(listener);

router
.serve_with_incoming(listener)
.serve_with_incoming_shutdown(listener, async {
signal.recv().await;
})
.await
.context(error::StartGrpcSnafu)?;

Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
use std::string::FromUtf8Error;

use common_error::prelude::*;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use tonic::{Code, Status};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to send shutdown signal"))]
SendShutdownSignal { source: SendError<()> },

#[snafu(display("Error stream request next is None"))]
StreamNone { backtrace: Backtrace },

Expand Down Expand Up @@ -312,6 +316,7 @@ impl ErrorExt for Error {
| Error::LeaseGrant { .. }
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down

0 comments on commit 7cfa30b

Please sign in to comment.