Skip to content

Commit

Permalink
Safe sync service
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Kordys committed Nov 6, 2022
1 parent 8dea489 commit 9a6d5fa
Show file tree
Hide file tree
Showing 11 changed files with 2,777 additions and 0 deletions.
1,867 changes: 1,867 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "safesync-server"
version = "0.1.0"
authors = ["Alex Kordys <[email protected]>"]
edition = "2021"

[dependencies]
anyhow = "1.0"
builder-pattern = "0.4"
env_logger = "0.9"
envy = "0.4"
futures = "0.3"
itertools = "0.10"
lazy_static = "1.4"
parking_lot = "0.12"
prometheus = "0.13"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
warp = "0.3"
wavesexchange_log = { git = "https://github.com/waves-exchange/wavesexchange-rs", tag = "wavesexchange_log/0.5.1" }
wavesexchange_warp = { git = "https://github.com/waves-exchange/wavesexchange-rs", tag = "wavesexchange_warp/0.14.3" }
85 changes: 85 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//! Safe-sync server
extern crate wavesexchange_log as log;
extern crate wavesexchange_warp as wx_warp;

use std::sync::Arc;

use tokio::{
signal::unix::{signal, SignalKind},
sync::{mpsc, oneshot},
};

mod metrics;
mod server;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
env_logger::init();

// Load configs
let config = server::config::load()?;

// Create the web server
use server::builder::ServerBuilder;
let server = ServerBuilder::new()
.port(config.port)
.metrics_port(config.metrics_port)
.build()
.new_server();
let server = Arc::new(server);

// Run the web server
let (shutdown_signal_tx, mut shutdown_signal_rx) = mpsc::channel(1);
let (server_task, server_stop_tx) = server.clone().start(shutdown_signal_tx);
let server_handle = tokio::spawn(server_task);

// Graceful shutdown handling
let (shutdown_start_tx, shutdown_start_rx) = oneshot::channel();
let mut shutdown_start_tx = Some(shutdown_start_tx);
let mut graceful_shutdown_handle = tokio::spawn(async move {
if shutdown_start_rx.await.is_ok() {
log::debug!("Graceful shutdown started: disconnecting all clients");
server.disconnect_all_clients().await;
}
});

let mut sigterm_stream = signal(SignalKind::terminate()).expect("sigterm stream");

loop {
tokio::select! {
// On SIGTERM initiate graceful shutdown (subsequent SIGTERM will terminate server immediately)
_ = sigterm_stream.recv() => {
log::info!("got SIGTERM - shutting down gracefully");
if let Some(tx) = shutdown_start_tx.take() { // first SIGTERM
let _ = tx.send(()); // start graceful shutdown
} else { // subsequent SIGTERM
break; // terminate server immediately
}
}
// On SIGINT terminate server immediately
_ = tokio::signal::ctrl_c() => {
log::info!("got SIGINT - terminating immediately");
break; // terminate server
}
// When graceful shutdown handler finishes terminate the server
_ = &mut graceful_shutdown_handle => {
log::debug!("Graceful shutdown finished");
break; // terminate server
}
}
}

// Send stop signal to all websocket connection handlers
log::trace!("terminating ws connection handlers");
shutdown_signal_rx.close();

// Send stop signal to the web server
log::trace!("terminating ws server");
let _ = server_stop_tx.send(());
server_handle.await?;

log::info!("Server terminated");

Ok(())
}
11 changes: 11 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use lazy_static::lazy_static;
use prometheus::{Counter, IntGauge};

lazy_static! {
pub static ref ACTIVE_CLIENTS: IntGauge =
IntGauge::new("Active_Clients_Count", "Number of connected clients").expect("can't create Active_Clients_Count metric");
pub static ref CLIENT_CONNECT: Counter =
Counter::new("Client_Connected", "Client connect events").expect("can't create Client_Connected metric");
pub static ref CLIENT_DISCONNECT: Counter =
Counter::new("Client_Disconnected", "Client disconnect events").expect("can't create Client_Disconnected metric");
}
28 changes: 28 additions & 0 deletions src/server/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Safe-sync Web server instance builder.
use builder_pattern::Builder;

use super::{
websocket::{client::Clients, mailbox::MailboxManager},
Server,
};

#[derive(Builder)]
pub struct ServerBuilder {
#[public]
port: u16,

#[public]
metrics_port: u16,
}

impl ServerBuilder {
pub fn new_server(self) -> Server {
Server {
port: self.port,
metrics_port: self.metrics_port,
mailbox_manager: MailboxManager::default(),
clients: Clients::default(),
}
}
}
43 changes: 43 additions & 0 deletions src/server/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! Safe-sync server configs.
use serde::Deserialize;

/// Safe-sync server application config
#[derive(Clone)]
pub struct ServiceConfig {
/// Server port
pub port: u16,

/// Metrics port
pub metrics_port: u16,
}

#[derive(Deserialize)]
struct RawConfig {
/// Server port
#[serde(default = "default_port")]
port: u16,

/// Metrics port
#[serde(default = "default_metrics_port")]
metrics_port: u16,
}

fn default_port() -> u16 {
8080
}

fn default_metrics_port() -> u16 {
8080
}

pub fn load() -> Result<ServiceConfig, anyhow::Error> {
let raw_config = envy::from_env::<RawConfig>()?;

let config = ServiceConfig {
port: raw_config.port,
metrics_port: raw_config.metrics_port,
};

Ok(config)
}
80 changes: 80 additions & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! Safe-sync Web server.
use std::sync::Arc;

use futures::Future;
use tokio::sync::{mpsc, oneshot};
use warp::{ws, Filter};
use wx_warp::{log::access, MetricsWarpBuilder};

use self::websocket::{client::Clients, mailbox::MailboxManager};
use crate::metrics::{ACTIVE_CLIENTS, CLIENT_CONNECT, CLIENT_DISCONNECT};

pub mod builder;
pub mod config;
mod websocket;

/// The web server
pub struct Server {
port: u16,
metrics_port: u16,
mailbox_manager: MailboxManager,
clients: Clients,
}

impl Server
where
Self: Send + Sync + 'static,
{
/// Start the web server.
/// Returns the future that runs the web server and a sender that can be used to stop the server.
/// The shutdown signal is propagated to each connection handler to terminate them all.
pub fn start(self: Arc<Self>, shutdown_signal: mpsc::Sender<()>) -> (impl Future<Output = ()>, oneshot::Sender<()>) {
let port = self.port;
let metrics_port = self.metrics_port;
let with_self = { warp::any().map(move || self.clone()) };
let with_shutdown_signal = { warp::any().map(move || shutdown_signal.clone()) };

let ws = warp::path("ws")
.and(warp::path::end())
.and(warp::ws())
.and(with_self)
.and(with_shutdown_signal)
.map(|ws: ws::Ws, server: Arc<Self>, shutdown_signal| {
let mailbox_manager = server.mailbox_manager.clone();
let clients = server.clients.clone();
ws.on_upgrade(move |socket| websocket::connection::handle_connection(socket, mailbox_manager, clients, shutdown_signal))
})
.with(warp::log::custom(access));

// Signal to stop the server
let (stop_tx, stop_rx) = oneshot::channel();

let servers = MetricsWarpBuilder::new()
.with_main_routes(ws)
.with_main_routes_port(port)
.with_metrics_port(metrics_port)
.with_metric(&*ACTIVE_CLIENTS)
.with_metric(&*CLIENT_CONNECT)
.with_metric(&*CLIENT_DISCONNECT)
.with_graceful_shutdown(async {
let _ = stop_rx.await;
log::trace!("server shutdown signal received");
})
.run_async();

(servers, stop_tx)
}

/// Gracefully kill all connected websocket clients
pub async fn disconnect_all_clients(&self) {
let clients_to_kill = self.clients.all();
let client_count = clients_to_kill.len();
log::info!("About to kill {} connected clients", client_count);
for client in clients_to_kill {
log::trace!("Gracefully killing {:?}", client.id);
client.kill();
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
}
}
94 changes: 94 additions & 0 deletions src/server/websocket/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Clients management
use std::{collections::HashMap, sync::Arc};

use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use warp::ws;

use super::mailbox::MailboxId;

/// Client ID, cheap to clone or copy.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct ClientId(u64);

/// Client struct, cheaply cloneable.
#[derive(Clone)]
pub struct Client {
pub id: ClientId,
inner: Arc<Mutex<ClientInner>>,
}

struct ClientInner {
sender: mpsc::UnboundedSender<ws::Message>,
kill_sender: Option<oneshot::Sender<()>>,
mailbox_id: Option<MailboxId>,
}

impl Client {
pub fn new(sender: mpsc::UnboundedSender<ws::Message>, kill_sender: oneshot::Sender<()>) -> Self {
let id = {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
ClientId(id)
};
let inner = Arc::new(Mutex::new(ClientInner {
sender,
kill_sender: Some(kill_sender),
mailbox_id: None,
}));
Client { id, inner }
}

pub fn mailbox_id(&self) -> Option<MailboxId> {
self.inner.lock().mailbox_id
}

pub fn set_mailbox_id(&self, mailbox_id: MailboxId) {
self.inner.lock().mailbox_id = Some(mailbox_id);
}

pub fn send_message(&self, msg: ws::Message) -> bool {
let res = self.inner.lock().sender.send(msg);
res.is_ok()
}

pub fn kill(&self) {
if let Some(tx) = self.inner.lock().kill_sender.take() {
let _ = tx.send(());
}
}
}

/// Client list, cheaply cloneable
#[derive(Clone, Default)]
pub struct Clients(Arc<Mutex<HashMap<ClientId, Client>>>);

impl Clients {
pub fn add(&self, client: Client) {
let Clients(clients) = self;
let mut clients = clients.lock();
debug_assert!(!clients.contains_key(&client.id));
clients.insert(client.id, client);
}

pub fn remove(&self, id: ClientId) {
let Clients(clients) = self;
let mut clients = clients.lock();
debug_assert!(clients.contains_key(&id));
clients.remove(&id);
}

pub fn find(&self, id: ClientId) -> Option<Client> {
let Clients(clients) = self;
let clients = clients.lock();
clients.get(&id).cloned()
}

pub fn all(&self) -> Vec<Client> {
let Clients(clients) = self;
let clients = clients.lock();
clients.values().cloned().collect()
}
}
Loading

0 comments on commit 9a6d5fa

Please sign in to comment.