Skip to content

Commit

Permalink
feat(cli): event listening
Browse files Browse the repository at this point in the history
Meshiest committed Dec 3, 2024
1 parent a800c5a commit b7ba176
Showing 14 changed files with 192 additions and 19 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -81,6 +81,7 @@ reqwest = { version = "0.12", default-features = false, features = [
] }
# Can't update this cause snarkos/vm
rocksdb = { version = "0.21", default-features = false }
rustls = { version = "0.23.15", features = ["ring"] }
semver = { version = "1.0", features = ["serde"] }
serde = { version = "1", default-features = false, features = [
"alloc",
2 changes: 1 addition & 1 deletion crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ lazysort.workspace = true
local-ip-address.workspace = true
nix = { workspace = true, features = ["signal"] }
reqwest = { workspace = true, features = ["json", "stream"] }
rustls.workspace = true
serde_json.workspace = true
sha2.workspace = true
simple_moving_average.workspace = true
@@ -46,4 +47,3 @@ tracing-appender.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
rustls = { version = "0.23.15", features = ["ring"] }
6 changes: 6 additions & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -16,6 +16,12 @@ anyhow.workspace = true
clap.workspace = true
clap_complete.workspace = true
clap-stdin.workspace = true
futures-util.workspace = true
http.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
rustls.workspace = true
serde.workspace = true
serde_json.workspace = true
snops-common = { workspace = true, features = ["aot_cmds"] }
tokio = { workspace = true, features = ["macros", "signal", "rt-multi-thread"] }
tokio-tungstenite.workspace = true
26 changes: 24 additions & 2 deletions crates/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::Result;
use clap::{CommandFactory, Parser};
use serde_json::Value;
use snops_common::events::EventFilter;

use crate::Cli;
use crate::{events::EventsClient, Cli};

/// The dummy value for the ids to hack around the missing required argument.
pub(crate) static DUMMY_ID: &str = "dummy_value___";
@@ -25,14 +26,21 @@ pub enum Commands {
SetLogLevel {
level: String,
},
/// Listen to events from the control plane, optionally filtered.
Events {
/// The event filter to apply, such as `agent-connected` or
/// `all-of(env-is(default),node-target-is(validator/any))`
filter: Option<EventFilter>,
},
#[cfg(feature = "mangen")]
Man(snops_common::mangen::Mangen),
#[cfg(feature = "clipages")]
Md(snops_common::clipages::Clipages),
}

impl Commands {
pub fn run(self, url: &str) -> Result<()> {
#[tokio::main]
pub async fn run(self, url: &str) -> Result<()> {
let client = reqwest::blocking::Client::new();

let response = match self {
@@ -49,6 +57,20 @@ impl Commands {
client.post(format!("{url}/api/v1/log/{level}")).send()?;
return Ok(());
}
Commands::Events { filter } => {
let mut client = EventsClient::open_with_filter(url, filter).await?;
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => break,
res = client.next() => {
let event = res?;
println!("{}", serde_json::to_string_pretty(&event)?);
}
}
}
client.close().await?;
return Ok(());
}
#[cfg(feature = "mangen")]
Commands::Man(mangen) => {
mangen.run(
124 changes: 124 additions & 0 deletions crates/cli/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// subscription code is not in use yet
#![allow(dead_code)]

use std::{collections::HashSet, str::FromStr, time::Duration};

use anyhow::{bail, Context, Result};
use futures_util::{SinkExt, StreamExt};
use http::Uri;
use snops_common::events::{Event, EventFilter, EventWsRequest};
use tokio::{net::TcpStream, select};
use tokio_tungstenite::{
connect_async,
tungstenite::{self, client::IntoClientRequest},
MaybeTlsStream, WebSocketStream,
};

pub struct EventsClient {
counter: u32,
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
subscriptions: HashSet<u32>,
ping_interval: tokio::time::Interval,
}

impl EventsClient {
pub async fn open(url: &str) -> Result<Self> {
Self::open_with_filter(url, None).await
}

pub async fn open_with_filter(url: &str, filter: Option<EventFilter>) -> Result<Self> {
let (proto, hostname) = url.split_once("://").unwrap_or(("http", url));
let proto = match proto {
"wss" | "https" => "wss",
_ => "ws",
};

let req = Uri::from_str(&match filter {
Some(filter) => format!("{proto}://{hostname}/api/v1/events?filter={filter}"),
None => format!("{proto}://{hostname}/api/v1/events"),
})
.context("Invalid URI")?
.into_client_request()
.context("Invalid websocket request")?;

let stream = match connect_async(req).await {
Ok((stream, _)) => stream,
Err(tungstenite::Error::Io(e)) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
bail!("Failed to connect to websocket: Connection refused")
}
Err(e) => bail!("Failed to connect to websocket: {}", e),
};

Ok(Self {
counter: 0,
stream,
subscriptions: Default::default(),
ping_interval: tokio::time::interval(Duration::from_secs(10)),
})
}

async fn send_json(&mut self, msg: impl serde::Serialize) -> Result<()> {
self.stream
.send(tungstenite::Message::Text(
serde_json::to_string(&msg).context("Failed to serialize message")?,
))
.await
.context("Failed to send message")
}

/// Add an additional filter to the current subscription
pub async fn subscribe(&mut self, filter: EventFilter) -> Result<u32> {
let id = self.counter;
self.send_json(EventWsRequest::Subscribe { id, filter })
.await?;
self.counter = self.counter.saturating_add(1);
self.subscriptions.insert(id);
Ok(id)
}

/// Remove a filter from the current subscription
pub async fn unsubscribe(&mut self, id: u32) -> Result<()> {
if !self.subscriptions.remove(&id) {
bail!("Subscription not found: {}", id);
}
self.send_json(EventWsRequest::Unsubscribe { id }).await?;
Ok(())
}

/// Remove all filters from the current subscription
pub async fn unsubscribe_all(&mut self) -> Result<()> {
// Collect the ids to avoid borrowing issues
for id in self.subscriptions.drain().collect::<Vec<_>>() {
self.send_json(EventWsRequest::Unsubscribe { id }).await?;
}
Ok(())
}

/// Get the next event from the stream
pub async fn next(&mut self) -> Result<Event> {
loop {
select! {
_ = self.ping_interval.tick() => {
self.stream.send(tungstenite::Message::Ping(vec![b'p', b'i', b'n', b'g'])).await.context("Failed to send ping")?;
}
msg = self.stream.next() => {
match msg {
Some(Ok(tungstenite::Message::Text(text))) =>
return serde_json::from_str(&text).context("Failed to parse event"),
Some(Ok(tungstenite::Message::Binary(bin))) =>
return serde_json::from_slice(&bin).context("Failed to parse event"),
None | Some(Err(_)) => bail!("Websocket closed"),
Some(Ok(_)) => continue,

}
}
}
}
}

/// Close the websocket connection
pub async fn close(mut self) -> Result<()> {
self.stream.close(None).await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod events;
6 changes: 6 additions & 0 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -6,10 +6,16 @@ use clap::Parser;
mod cli;
pub(crate) use cli::*;

mod events;

mod commands;
pub(crate) use commands::*;

fn main() -> Result<()> {
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");

let cli = cli::Cli::parse();

if let Err(err) = cli.run() {
2 changes: 1 addition & 1 deletion crates/common/src/events/filter.rs
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ impl Event {
self.transaction.as_ref() == Some(transaction)
}
EventFilter::CannonIs(cannon) => self.cannon == Some(*cannon),
EventFilter::EventIs(kind) => self.kind.filter() == *kind,
EventFilter::EventIs(kind) => self.content.filter() == *kind,
EventFilter::NodeKeyIs(node_key) => self.node_key.as_ref() == Some(node_key),
EventFilter::NodeTargetIs(node_targets) => self
.node_key
15 changes: 10 additions & 5 deletions crates/common/src/events/models.rs
Original file line number Diff line number Diff line change
@@ -23,12 +23,17 @@ pub enum EventWsRequest {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
pub created_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub agent: Option<AgentId>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_key: Option<NodeKey>,
#[serde(skip_serializing_if = "Option::is_none")]
pub env: Option<EnvId>,
#[serde(skip_serializing_if = "Option::is_none")]
pub transaction: Option<Arc<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cannon: Option<InternedId>,
pub kind: EventKind,
pub content: EventKind,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -206,27 +211,27 @@ impl Display for EventKindFilter {
}

impl Event {
pub fn new(kind: EventKind) -> Self {
pub fn new(content: EventKind) -> Self {
Self {
created_at: Utc::now(),
agent: None,
node_key: None,
env: None,
transaction: None,
cannon: None,
kind,
content,
}
}

pub fn replace_kind(&self, kind: impl Into<Event>) -> Self {
pub fn replace_content(&self, content: impl Into<Event>) -> Self {
Self {
created_at: Utc::now(),
agent: self.agent,
node_key: self.node_key.clone(),
env: self.env,
transaction: self.transaction.clone(),
cannon: self.cannon,
kind: kind.into().kind,
content: content.into().content,
}
}
}
6 changes: 3 additions & 3 deletions crates/common/src/events/test_filter.rs
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ fn test_all_of() {
env: Some(*B),
transaction: None,
cannon: None,
kind: Agent(Connected),
content: Agent(Connected),
};

assert!(e.matches(&(AgentConnected & AgentIs(*A))));
@@ -76,7 +76,7 @@ fn test_any_of() {
env: Some(*B),
transaction: None,
cannon: None,
kind: Agent(Connected),
content: Agent(Connected),
};

assert!(e.matches(&(AgentConnected | AgentIs(*A))));
@@ -107,7 +107,7 @@ fn test_one_of() {
env: Some(*B),
transaction: None,
cannon: None,
kind: Agent(Connected),
content: Agent(Connected),
};

assert!(e.matches(&(AgentConnected ^ AgentIs(*B))));
2 changes: 1 addition & 1 deletion crates/controlplane/src/server/actions/execute.rs
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ pub async fn execute_status(
return Err(ActionError::ExecuteStatusTimeout { tx_id: tx_id.to_string(), agent_id, retries });
},
Ok(ev) = rx.next() => {
let Event{ kind: EventKind::Transaction(ev), agent, .. } = ev.as_ref() else {
let Event{ content: EventKind::Transaction(ev), agent, .. } = ev.as_ref() else {
continue;
};

2 changes: 1 addition & 1 deletion crates/controlplane/src/server/rpc.rs
Original file line number Diff line number Diff line change
@@ -224,7 +224,7 @@ impl ControlService for ControlRpcServer {
let ev = AgentEvent::ReconcileComplete.with_agent(&agent);
let is_complete = status.as_ref().is_ok_and(|e| e.inner.is_some());

ev.replace_kind(match status {
ev.replace_content(match status {
Ok(res) => AgentEvent::Reconcile(res),
Err(err) => AgentEvent::ReconcileError(err),
})
12 changes: 7 additions & 5 deletions crates/controlplane/src/state/transactions.rs
Original file line number Diff line number Diff line change
@@ -142,7 +142,7 @@ fn get_pending_transactions(state: &GlobalState) -> Vec<((EnvId, CannonId), Pend
if cannon.sink.authorize_attempts.is_some_and(|a| attempts > a) {
info!("cannon {env_id}.{cannon_id} removed auth {tx_id} (too many attempts)");
to_remove.push(tx_id);
ev.replace_kind(TransactionEvent::ExecuteExceeded { attempts })
ev.replace_content(TransactionEvent::ExecuteExceeded { attempts })
.emit(state);
} else {
to_execute.push((tx_id, tx.index));
@@ -155,7 +155,7 @@ fn get_pending_transactions(state: &GlobalState) -> Vec<((EnvId, CannonId), Pend
{
if cannon.sink.authorize_attempts.is_some_and(|a| attempts > a) {
info!("cannon {env_id}.{cannon_id} removed auth {tx_id} (too many attempts)");
ev.replace_kind(TransactionEvent::ExecuteExceeded { attempts })
ev.replace_content(TransactionEvent::ExecuteExceeded { attempts })
.emit(state);
to_remove.push(tx_id);
} else {
@@ -166,7 +166,7 @@ fn get_pending_transactions(state: &GlobalState) -> Vec<((EnvId, CannonId), Pend
TransactionSendState::Unsent => {
if cannon.sink.broadcast_attempts.is_some_and(|a| attempts > a) {
info!("cannon {env_id}.{cannon_id} removed broadcast {tx_id} (too many attempts)");
ev.replace_kind(TransactionEvent::BroadcastExceeded { attempts })
ev.replace_content(TransactionEvent::BroadcastExceeded { attempts })
.emit(state);
to_remove.push(tx_id);
} else {
@@ -204,8 +204,10 @@ fn get_pending_transactions(state: &GlobalState) -> Vec<((EnvId, CannonId), Pend
{
if cannon.sink.broadcast_attempts.is_some_and(|a| attempts > a) {
info!("cannon {env_id}.{cannon_id} removed broadcast {tx_id} (too many attempts)");
ev.replace_kind(TransactionEvent::BroadcastExceeded { attempts })
.emit(state);
ev.replace_content(TransactionEvent::BroadcastExceeded {
attempts,
})
.emit(state);
to_remove.push(tx_id);
} else {
to_broadcast.push((tx_id, tx.index));

0 comments on commit b7ba176

Please sign in to comment.