Skip to content

Commit

Permalink
feat(snot): basic test preparation with storage prep, agent delegatio…
Browse files Browse the repository at this point in the history
…n, reconciliation (untested)

Signed-off-by: Zander Franks <[email protected]>
  • Loading branch information
voximity committed Mar 22, 2024
1 parent a711d25 commit f78b908
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 53 deletions.
7 changes: 7 additions & 0 deletions crates/aot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub mod ledger;
#[cfg(feature = "node")]
pub mod runner;

use rand::SeedableRng;
use rand_chacha::ChaChaRng;
use snarkvm::{
console::network::MainnetV0,
ledger::{
Expand Down Expand Up @@ -59,3 +61,8 @@ pub type Literal = snarkvm::console::program::Literal<Network>;
pub type Authorization = snarkvm::synthesizer::Authorization<Network>;
pub type Block = snarkvm::ledger::Block<Network>;
pub type Committee = snarkvm::ledger::committee::Committee<Network>;

pub fn gen_private_key() -> anyhow::Result<PrivateKey> {
let mut rng = ChaChaRng::from_entropy();
PrivateKey::new(&mut rng)
}
23 changes: 23 additions & 0 deletions crates/snot-agent/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::path::Path;

use futures::StreamExt;
use http::StatusCode;
use reqwest::IntoUrl;
use tokio::{fs::File, io::AsyncWriteExt};

/// Download a file. Returns a None if 404.
pub async fn download_file(url: impl IntoUrl, to: impl AsRef<Path>) -> anyhow::Result<Option<()>> {
let req = reqwest::get(url).await?;
if req.status() == StatusCode::NOT_FOUND {
return Ok(None);
}

let mut stream = req.bytes_stream();
let mut file = File::create(to).await?;

while let Some(chunk) = stream.next().await {
file.write_all(&chunk?).await?;
}

Ok(Some(()))
}
1 change: 1 addition & 0 deletions crates/snot-agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod api;
mod cli;
mod rpc;
mod state;
Expand Down
76 changes: 39 additions & 37 deletions crates/snot-agent/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{ops::Deref, process::Stdio, sync::Arc};

use futures::StreamExt;
use snot_common::{
rpc::{
agent::{AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError},
Expand All @@ -11,12 +10,12 @@ use snot_common::{
};
use tarpc::{context, ClientMessage, Response};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
io::{AsyncBufReadExt, BufReader},
process::Command,
};
use tracing::{debug, info, warn, Level};

use crate::state::AppState;
use crate::{api, state::AppState};

/// The JWT file name.
pub const JWT_FILE: &str = "jwt";
Expand All @@ -33,7 +32,7 @@ pub const SNARKOS_LEDGER_DIR: &str = "ledger";
/// The base ledger directory name.
pub const SNARKOS_LEDGER_BASE_DIR: &str = "ledger.base";
/// Temporary storage archive file name.
pub const TEMP_STORAGE_FILE: &str = "storage.tar.gz";
pub const LEDGER_STORAGE_FILE: &str = "ledger.tar.gz";

/// A multiplexed message, incoming on the websocket.
pub type MuxedMessageIncoming =
Expand Down Expand Up @@ -139,50 +138,53 @@ impl AgentService for AgentRpcServer {
break 'storage;
};

// open a file for writing the archive
let mut file = tokio::fs::File::create(base_path.join(TEMP_STORAGE_FILE))
let genesis_url = format!(
"http://{}/api/storage/{storage_id}/genesis",
&state.endpoint
);

let ledger_url =
format!("http://{}/api/storage/{storage_id}/ledger", &state.endpoint);

// download the genesis block
api::download_file(genesis_url, base_path.join(SNARKOS_GENESIS_FILE))
.await
.map_err(|_| ReconcileError::StorageAcquireError)?;

// stream the archive containing the storage
// use /api/storage here instead of /content/storage so that the control plane
// can redirect our integer storage_id into an actual string storage_id
let mut stream = reqwest::get(format!(
"http://{}/api/storage/{storage_id}",
&state.endpoint
))
.await
.map_err(|_| ReconcileError::StorageAcquireError)?
.bytes_stream();

// write the streamed archive to the file
while let Some(chunk) = stream.next().await {
file.write_all(&chunk.map_err(|_| ReconcileError::StorageAcquireError)?)
// download the ledger
let mut fail = false;

if let Ok(Some(())) =
api::download_file(ledger_url, base_path.join(LEDGER_STORAGE_FILE))
.await
.map_err(|_| ReconcileError::StorageAcquireError)
{
// TODO: remove existing ledger probably

// use `tar` to decompress the storage
let mut tar_child = Command::new("tar")
.current_dir(&base_path)
.arg("-xzf")
.arg(LEDGER_STORAGE_FILE)
.kill_on_drop(true)
.spawn()
.map_err(|_| ReconcileError::StorageAcquireError)?;
}

let _ = (file, stream);

// use `tar` to decompress the storage
let mut tar_child = Command::new("tar")
.current_dir(&base_path)
.arg("-xzf")
.arg(TEMP_STORAGE_FILE)
.kill_on_drop(true)
.spawn()
.map_err(|_| ReconcileError::StorageAcquireError)?;
let status = tar_child
.wait()
.await
.map_err(|_| ReconcileError::StorageAcquireError)?;

let status = tar_child
.wait()
.await
.map_err(|_| ReconcileError::StorageAcquireError)?;
if !status.success() {
fail = true;
}
}

// unconditionally remove the tar regardless of success
let _ = tokio::fs::remove_file(base_path.join(TEMP_STORAGE_FILE)).await;
let _ = tokio::fs::remove_file(base_path.join(LEDGER_STORAGE_FILE)).await;

// return an error if the storage acquisition failed
if !status.success() {
if fail {
return Err(ReconcileError::StorageAcquireError);
}
}
Expand Down
28 changes: 27 additions & 1 deletion crates/snot-common/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{net::SocketAddr, str::FromStr};
use std::{
fmt::{Display, Write},
net::SocketAddr,
str::FromStr,
};

use lazy_static::lazy_static;
use regex::Regex;
Expand Down Expand Up @@ -190,6 +194,16 @@ impl NodeType {
}
}

impl Display for NodeType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Client => f.write_str("client"),
Self::Validator => f.write_str("validator"),
Self::Prover => f.write_str("prover"),
}
}
}

impl FromStr for NodeType {
type Err = &'static str;

Expand Down Expand Up @@ -247,3 +261,15 @@ impl<'de> Deserialize<'de> for NodeKey {
Self::from_str(s).map_err(D::Error::custom)
}
}

impl Display for NodeKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.ty, self.id)?;
if let Some(ns) = &self.ns {
f.write_char('@')?;
f.write_str(&ns)?;
}

Ok(())
}
}
26 changes: 26 additions & 0 deletions crates/snot/src/schema/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::net::SocketAddr;

use indexmap::IndexMap;
use serde::Deserialize;
use snarkos_aot::gen_private_key;
use snot_common::state::{HeightRequest, NodeState, NodeType};

use super::{NodeKey, NodeTargets};

Expand Down Expand Up @@ -35,6 +37,8 @@ pub struct Node {
/// The private key to start the node with. When unspecified, a random
/// private key is generated at runtime.
pub key: Option<String>,
/// The storage ID to use when starting the node.
pub storage: String,
/// Height of ledger to inherit.
///
/// * When null, a ledger is created when the node is started.
Expand All @@ -47,3 +51,25 @@ pub struct Node {
#[serde(default)]
pub peers: NodeTargets,
}

impl Node {
pub fn into_state(&self, ty: NodeType) -> NodeState {
NodeState {
ty,
private_key: self
.key
.clone()
.unwrap_or_else(|| gen_private_key().unwrap().to_string()),

// TODO
height: (0, HeightRequest::Top),

// TODO: should this be online?
online: true,
// TODO: resolve validators
validators: vec![],
// TODO: resolve peers
peers: vec![],
}
}
}
32 changes: 31 additions & 1 deletion crates/snot/src/schema/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use anyhow::ensure;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize};
use tokio::process::Command;
use tracing::warn;

use crate::state::GlobalState;
Expand All @@ -16,6 +17,8 @@ pub struct Document {
pub id: FilenameString,
pub name: String,
pub description: Option<String>,
/// Prefer using existing storage instead of generating new stuff.
pub prefer_existing: bool,
pub generate: Option<StorageGeneration>,
}

Expand Down Expand Up @@ -114,6 +117,8 @@ impl Document {

let exists = matches!(tokio::fs::try_exists(&base).await, Ok(true));

// TODO: respect self.prefer_existing

match self.generate {
// generate the block and ledger if we have generation params
Some(mut generation) => 'generate: {
Expand All @@ -126,13 +131,15 @@ impl Document {

generation.genesis = snarkos_aot::genesis::Genesis {
output: base.join("genesis.block"),
ledger: Some(base.join("ledger")),
ledger: None,
additional_accounts_output: Some(base.join("accounts.json")),
committee_output: Some(base.join("committee.json")),
..generation.genesis
};

tokio::task::spawn_blocking(move || generation.genesis.parse()).await??;

// TODO: transactions
}

// no generation params passed
Expand All @@ -142,6 +149,29 @@ impl Document {
}
}

// tar the ledger so that it can be served to agents
// the genesis block is not compressed because it is already binary and might
// not be served independently
let ledger_exists = matches!(tokio::fs::try_exists(base.join("ledger")).await, Ok(true));
let ledger_tar_exists = matches!(
tokio::fs::try_exists(base.join("ledger.tar.gz")).await,
Ok(true)
);

if ledger_exists && !ledger_tar_exists {
let mut child = Command::new("tar")
.current_dir(&base)
.arg("-czf")
.arg("ledger.tar.gz")
.arg("ledger/")
.kill_on_drop(true)
.spawn()?;

if !child.wait().await.map(|s| s.success()).unwrap_or(false) {
warn!("failed to compress ledger");
}
}

// add the prepared storage to the storage map
let mut storage_lock = state.storage.write().await;
let int_id = STORAGE_ID_INT.fetch_add(1, Ordering::Relaxed);
Expand Down
18 changes: 15 additions & 3 deletions crates/snot/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,38 @@ use axum::{
routing::get,
Json, Router,
};
use serde::Deserialize;
use serde_json::json;

use super::AppState;

pub(super) fn routes() -> Router<AppState> {
Router::new()
.route("/storage/:id", get(redirect_storage))
.route("/storage/:id/:ty", get(redirect_storage))
.route("/agents", get(get_agents))
// .route("/test", post(post_test))
}

#[derive(Deserialize)]
enum StorageType {
Genesis,
Ledger,
}

async fn redirect_storage(
Path(storage_id): Path<usize>,
Path((storage_id, ty)): Path<(usize, StorageType)>,
State(state): State<AppState>,
) -> Response {
let Some(real_id) = state.storage.read().await.get_by_left(&storage_id).cloned() else {
return StatusCode::NOT_FOUND.into_response();
};

Redirect::temporary(&format!("/content/storage/{real_id}.tar.gz")).into_response()
let filename = match ty {
StorageType::Genesis => "genesis.block",
StorageType::Ledger => "ledger.tar.gz",
};

Redirect::temporary(&format!("/content/storage/{real_id}/{filename}")).into_response()
}

async fn get_agents(State(state): State<AppState>) -> impl IntoResponse {
Expand Down
Loading

0 comments on commit f78b908

Please sign in to comment.