Skip to content
This repository has been archived by the owner on Feb 5, 2025. It is now read-only.

Commit

Permalink
Merge pull request #460 from EspressoSystems/fix/shutdown
Browse files Browse the repository at this point in the history
Clean up shutdown
  • Loading branch information
jbearer authored Mar 11, 2024
2 parents ef7fd35 + a1a0fcb commit 39d7755
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 104 deletions.
2 changes: 1 addition & 1 deletion examples/simple-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn init_db() -> Db {

#[cfg(target_os = "windows")]
async fn init_db() -> Db {
Db::new().unwrap()
Db::with_prefix("simple-server-db").unwrap()
}

#[cfg(not(target_os = "windows"))]
Expand Down
9 changes: 5 additions & 4 deletions src/availability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ mod test {
use crate::{
data_source::{storage::no_storage, ExtensibleDataSource},
status::StatusDataSource,
task::BackgroundTask,
testing::{
consensus::{MockDataSource, MockNetwork, TestableDataSource},
mocks::{mock_transaction, MockHeader, MockPayload, MockTypes},
Expand All @@ -490,7 +491,7 @@ mod test {
types::HeightIndexed,
Error, Header,
};
use async_std::{sync::RwLock, task::spawn};
use async_std::sync::RwLock;
use commit::Committable;
use futures::future::FutureExt;
use hotshot_example_types::state_types::TestInstanceState;
Expand Down Expand Up @@ -792,7 +793,7 @@ mod test {
.unwrap(),
)
.unwrap();
spawn(app.serve(format!("0.0.0.0:{}", port)));
network.spawn("server", app.serve(format!("0.0.0.0:{}", port)));

// Start a client.
let client = Client::<Error>::new(
Expand Down Expand Up @@ -870,7 +871,7 @@ mod test {
async fn test_extensions() {
setup_test();

let dir = TempDir::new().unwrap();
let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
let mut data_source = ExtensibleDataSource::new(
MockDataSource::create(dir.path(), Default::default())
.await
Expand Down Expand Up @@ -929,7 +930,7 @@ mod test {
app.register_module("availability", api).unwrap();

let port = pick_unused_port().unwrap();
spawn(app.serve(format!("0.0.0.0:{}", port)));
let _server = BackgroundTask::spawn("server", app.serve(format!("0.0.0.0:{}", port)));

let client = Client::<Error>::new(
format!("http://localhost:{}/availability", port)
Expand Down
4 changes: 2 additions & 2 deletions src/data_source/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ mod impl_testable_data_source {
{
type Storage = TempDir;

async fn create(_node_id: usize) -> Self::Storage {
TempDir::new().unwrap()
async fn create(node_id: usize) -> Self::Storage {
TempDir::with_prefix(format!("file_system_data_source_{node_id}")).unwrap()
}

async fn connect(storage: &Self::Storage) -> Self {
Expand Down
6 changes: 3 additions & 3 deletions src/data_source/storage/ledger_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ mod test {
async fn test_ledger_log_creation() {
setup_test();

let dir = TempDir::new().unwrap();
let dir = TempDir::with_prefix("test_ledger_log").unwrap();

// Create and populuate a log.
{
Expand Down Expand Up @@ -307,7 +307,7 @@ mod test {
async fn test_ledger_log_insert() {
setup_test();

let dir = TempDir::new().unwrap();
let dir = TempDir::with_prefix("test_ledger_log").unwrap();
let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
let mut store = AtomicStore::open(loader).unwrap();
Expand Down Expand Up @@ -351,7 +351,7 @@ mod test {
async fn test_ledger_log_iter() {
setup_test();

let dir = TempDir::new().unwrap();
let dir = TempDir::with_prefix("test_ledger_log").unwrap();
let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
let mut store = AtomicStore::open(loader).unwrap();
Expand Down
5 changes: 2 additions & 3 deletions src/data_source/storage/no_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ pub mod testing {
},
Error,
};
use async_std::task::spawn;
use futures::stream::{BoxStream, StreamExt};
use hotshot::types::Event;
use portpicker::pick_unused_port;
Expand Down Expand Up @@ -290,7 +289,7 @@ pub mod testing {
}
}

async fn setup(network: &MockNetwork<Self>) {
async fn setup(network: &mut MockNetwork<Self>) {
// Spawn the web server on node 1 that node 0 will use to fetch missing data.
let Storage::NoStorage { fetch_from_port } = network.storage() else {
panic!("node 0 should always be NoStorage node");
Expand All @@ -300,7 +299,7 @@ pub mod testing {
let mut app = App::<_, Error>::with_state(api_data_source);
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{fetch_from_port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{fetch_from_port}")));
}

async fn handle_event(&mut self, event: &Event<MockTypes>) {
Expand Down
51 changes: 12 additions & 39 deletions src/data_source/storage/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,11 @@ use std::{
};

pub use anyhow::Error;
use async_std::{
net::ToSocketAddrs,
sync::Arc,
task::{sleep, spawn},
};
use async_std::{net::ToSocketAddrs, sync::Arc, task::sleep};
use async_trait::async_trait;
use chrono::Utc;
use commit::Committable;
use futures::{
channel::oneshot,
future::{select, Either, FutureExt},
stream::{BoxStream, StreamExt, TryStreamExt},
task::{Context, Poll},
AsyncRead, AsyncWrite,
Expand Down Expand Up @@ -74,6 +68,7 @@ use crate::{
VersionedDataSource,
},
node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
task::BackgroundTask,
types::HeightIndexed,
Header, Leaf, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult, VidShare,
};
Expand Down Expand Up @@ -339,8 +334,8 @@ impl Config {
pub struct SqlStorage {
client: Arc<Client>,
tx_in_progress: bool,
kill: Option<oneshot::Sender<()>>,
pruner_cfg: Option<PrunerCfg>,
_connection: BackgroundTask,
}

impl SqlStorage {
Expand All @@ -350,7 +345,7 @@ impl SqlStorage {
let tcp = TcpStream::connect((config.host.as_str(), config.port)).await?;

// Convert the TCP connection into a postgres connection.
let (mut client, kill) = if config.tls {
let (mut client, connection) = if config.tls {
let tls = TlsConnector::new(native_tls::TlsConnector::new()?, config.host.as_str());
connect(config.pgcfg, tcp, tls).await?
} else {
Expand Down Expand Up @@ -405,8 +400,8 @@ impl SqlStorage {
Ok(Self {
client: Arc::new(client),
tx_in_progress: false,
kill: Some(kill),
pruner_cfg: config.pruner_cfg,
_connection: connection,
})
}

Expand Down Expand Up @@ -611,15 +606,6 @@ impl Query for SqlStorage {
}
}

impl Drop for SqlStorage {
fn drop(&mut self) {
if let Some(kill) = self.kill.take() {
// Ignore errors, they just mean the task has already exited.
kill.send(()).ok();
}
}
}

#[async_trait]
impl VersionedDataSource for SqlStorage {
type Error = postgres::error::Error;
Expand Down Expand Up @@ -1766,35 +1752,22 @@ where

/// Connect to a Postgres database with a TLS implementation.
///
/// Spawns a background task to run the connection. Returns a client and a channel to kill the
/// connection task.
/// Spawns a background task to run the connection. Returns a client and a handle to the spawned
/// task.
async fn connect<T>(
pgcfg: postgres::Config,
tcp: TcpStream,
tls: T,
) -> anyhow::Result<(Client, oneshot::Sender<()>)>
) -> anyhow::Result<(Client, BackgroundTask)>
where
T: TlsConnect<TcpStream>,
T::Stream: Send + 'static,
{
let (client, connection) = pgcfg.connect_raw(tcp, tls).await?;

// Spawn a task to drive the connection, with a channel to kill it when this data source is
// dropped.
let (kill, killed) = oneshot::channel();
spawn(select(killed, connection).inspect(|res| {
if let Either::Right((res, _)) = res {
// If we were killed, do nothing. That is the normal shutdown path. But if the `select`
// returned because the `connection` terminated, we should log something, as that is
// unusual.
match res {
Ok(()) => tracing::warn!("postgres connection terminated unexpectedly"),
Err(err) => tracing::error!("postgres connection closed with error: {err}"),
}
}
}));

Ok((client, kill))
Ok((
client,
BackgroundTask::spawn("postgres connection", connection),
))
}

fn sql_param<T: ToSql + Sync>(param: &T) -> &(dyn ToSql + Sync) {
Expand Down
4 changes: 2 additions & 2 deletions src/fetching/provider/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ mod test {
availability::{define_api, AvailabilityDataSource, UpdateAvailabilityData},
data_source::{storage::sql::testing::TmpDb, VersionedDataSource},
fetching::provider::{NoFetching, QueryServiceProvider},
task::BackgroundTask,
testing::{
consensus::{MockDataSource, MockNetwork},
mocks::MockTypes,
Expand All @@ -210,7 +211,6 @@ mod test {
types::HeightIndexed,
Error,
};
use async_std::task::spawn;
use futures::stream::StreamExt;
use portpicker::pick_unused_port;
use tide_disco::App;
Expand All @@ -229,7 +229,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
let _server = BackgroundTask::spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus, only from a peer.
let db = TmpDb::init().await;
Expand Down
15 changes: 7 additions & 8 deletions src/fetching/provider/query_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ mod test {
types::HeightIndexed,
VidCommitment,
};
use async_std::task::spawn;
use commit::Committable;
use futures::{
future::{join, FutureExt},
Expand Down Expand Up @@ -231,7 +230,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus, only from a peer.
let db = TmpDb::init().await;
Expand Down Expand Up @@ -451,7 +450,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus, only from a peer.
let db = TmpDb::init().await;
Expand Down Expand Up @@ -503,7 +502,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus, only from a peer.
let db = TmpDb::init().await;
Expand Down Expand Up @@ -559,7 +558,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus, only from a peer.
let db = TmpDb::init().await;
Expand Down Expand Up @@ -612,7 +611,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus, only from a peer.
let db = TmpDb::init().await;
Expand Down Expand Up @@ -666,7 +665,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus. We don't give it a
// fetcher since transactions are always fetched passively anyways.
Expand Down Expand Up @@ -734,7 +733,7 @@ mod test {
let mut app = App::<_, Error>::with_state(network.data_source());
app.register_module("availability", define_api(&Default::default()).unwrap())
.unwrap();
spawn(app.serve(format!("0.0.0.0:{port}")));
network.spawn("server", app.serve(format!("0.0.0.0:{port}")));

// Start a data source which is not receiving events from consensus.
let db = TmpDb::init().await;
Expand Down
14 changes: 6 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,7 @@ pub mod types;
pub use error::Error;
pub use resolvable::Resolvable;

use async_std::{
sync::{Arc, RwLock},
task::spawn,
};
use async_std::sync::{Arc, RwLock};
use futures::StreamExt;
use hotshot::types::SystemContextHandle;
use hotshot_types::traits::{
Expand All @@ -428,6 +425,7 @@ use hotshot_types::traits::{
};
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use task::BackgroundTask;
use tide_disco::{App, StatusCode};

pub use hotshot_types::{
Expand Down Expand Up @@ -515,7 +513,7 @@ where

// Serve app.
let url = format!("0.0.0.0:{}", options.port);
spawn(async move { app.serve(&url).await });
let _server = BackgroundTask::spawn("server", async move { app.serve(&url).await });

// Subscribe to events before starting consensus, so we don't miss any events.
let mut events = hotshot.get_event_stream();
Expand Down Expand Up @@ -551,7 +549,7 @@ mod test {
mocks::{MockHeader, MockPayload, MockTypes},
},
};
use async_std::{sync::RwLock, task::spawn};
use async_std::sync::RwLock;
use async_trait::async_trait;
use atomic_store::{load_store::BincodeLoadStore, AtomicStore, AtomicStoreLoader, RollingLog};

Expand Down Expand Up @@ -697,7 +695,7 @@ mod test {

#[async_std::test]
async fn test_composition() {
let dir = TempDir::new().unwrap();
let dir = TempDir::with_prefix("test_composition").unwrap();
let mut loader = AtomicStoreLoader::create(dir.path(), "test_composition").unwrap();
let mut hotshot_qs = MockDataSource::create_with_store(&mut loader, Default::default())
.await
Expand Down Expand Up @@ -765,7 +763,7 @@ mod test {
.unwrap();

let port = pick_unused_port().unwrap();
spawn(app.serve(format!("0.0.0.0:{}", port)));
let _server = BackgroundTask::spawn("server", app.serve(format!("0.0.0.0:{}", port)));

let client = Client::<Error>::new(format!("http://localhost:{}", port).parse().unwrap());
assert!(client.connect(Some(Duration::from_secs(60))).await);
Expand Down
Loading

0 comments on commit 39d7755

Please sign in to comment.