Skip to content

Commit

Permalink
Log queries (#1534)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch authored Feb 10, 2025
1 parent adb7898 commit fcaf956
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 81 deletions.
25 changes: 16 additions & 9 deletions chronicle/src/shards/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::tss::{Tss, TssAction, VerifiableSecretSharingCommitment};
use super::tss::{Tss, TssAction, TssPeerId, VerifiableSecretSharingCommitment};
use crate::admin::AdminMsg;
use crate::network::{Message, Network, PeerId, TssMessage};
use crate::runtime::Runtime;
Expand Down Expand Up @@ -56,6 +56,13 @@ pub struct TimeWorker<Tx, Rx> {
admin_request: mpsc::Sender<AdminMsg>,
}

fn display_peer_id(peer_id: PeerId) -> String {
let Ok(peer_id) = TssPeerId::new(peer_id) else {
return hex::encode(peer_id);
};
peer_id.to_string()
}

impl<Tx, Rx> TimeWorker<Tx, Rx>
where
Tx: Network + Clone,
Expand Down Expand Up @@ -254,9 +261,9 @@ where
parent: &span,
Level::INFO,
shard_id,
"dropping message {} from {:?}",
from = display_peer_id(peer_id),
"dropping message {}",
msg,
peer_id,
);
continue;
};
Expand Down Expand Up @@ -335,9 +342,9 @@ where
parent: span,
Level::DEBUG,
shard_id = message.shard_id,
"tx {} to {:?}",
to = display_peer_id(peer_id),
"tx {}",
message.payload,
peer_id
);
let endpoint = self.network.clone();
self.outgoing_requests.push(Box::pin(async move {
Expand Down Expand Up @@ -449,9 +456,9 @@ where
Level::DEBUG,
shard_id,
block,
"rx {} from {:?}",
from = display_peer_id(peer),
"rx {}",
payload,
peer,
);
self.messages.entry(block).or_default().push((shard_id, peer, payload));
},
Expand All @@ -470,8 +477,8 @@ where
parent: span,
Level::INFO,
shard_id,
"tx to {:?} network error {:?}",
peer,
to = display_peer_id(peer),
"tx network error {:?}",
error,
);
}
Expand Down
6 changes: 5 additions & 1 deletion chronicle/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ impl TaskParams {
span: &Span,
) -> Result<bool> {
if target_block_height < task.start_block() {
tracing::debug!(parent: span,
tracing::debug!(
parent: span,
task_id,
task = task.to_string(),
target_block_height,
"task scheduled for future {:?}/{:?}",
target_block_height,
task.start_block(),
Expand Down
74 changes: 47 additions & 27 deletions tc-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::env::Mnemonics;
use crate::gas_price::{convert_bigint_to_u128, get_network_price};
use crate::table::IntoRow;
use anyhow::{Context, Result};
use futures::stream::{BoxStream, StreamExt};
use futures::stream::{BoxStream, FuturesUnordered, StreamExt};
use polkadot_sdk::sp_runtime::BoundedVec;
use scale_codec::{Decode, Encode};
use std::collections::hash_map::Entry;
Expand All @@ -28,7 +28,7 @@ mod loki;
mod slack;
mod table;

pub use crate::loki::Query;
pub use crate::loki::{Log, Query};
pub use crate::slack::{Sender, TableRef, TextRef};

async fn sleep_or_abort(duration: Duration) -> Result<()> {
Expand All @@ -50,34 +50,53 @@ pub struct Tc {

impl Tc {
pub async fn new(env: PathBuf, config: &str, msg: Sender) -> Result<Self> {
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");
dotenv::from_path(env.join(".env")).ok();
let config = Config::from_env(env, config)?;
let env = Mnemonics::from_env()?;
while let Err(err) = SubxtClient::get_client(&config.global().timechain_url).await {
tracing::info!("waiting for chain to start: {err:?}");
sleep_or_abort(Duration::from_secs(10)).await?;
}
let runtime =
SubxtClient::with_key(&config.global().timechain_url, &env.timechain_mnemonic).await?;
let mut connectors = HashMap::default();
for (id, network) in config.networks() {
let id = *id;
let params = ConnectorParams {
network_id: id,
blockchain: network.blockchain.clone(),
network: network.network.clone(),
url: network.url.clone(),
mnemonic: env.target_mnemonic.clone(),
cctp_sender: None,
cctp_attestation: None,
};
let connector = network
.backend
.connect_admin(&params)
let timechain_url = config.global().timechain_url.clone();
let runtime = tokio::task::spawn(async move {
while let Err(err) = SubxtClient::get_client(&timechain_url).await {
tracing::info!("waiting for chain to start: {err:?}");
sleep_or_abort(Duration::from_secs(10)).await?;
}
let runtime = SubxtClient::with_key(&timechain_url, &env.timechain_mnemonic)
.await
.context("failed to connect to backend")?;
connectors.insert(id, connector);
.context("failed to connect to timechain")?;
Ok::<_, anyhow::Error>(runtime)
});
let mut connectors = HashMap::new();
{
let mut connector_futures = FuturesUnordered::new();
for (id, network) in config.networks() {
let id = *id;
let params = ConnectorParams {
network_id: id,
blockchain: network.blockchain.clone(),
network: network.network.clone(),
url: network.url.clone(),
mnemonic: env.target_mnemonic.clone(),
cctp_sender: None,
cctp_attestation: None,
};
let connector = async move {
let connector = network
.backend
.connect_admin(&params)
.await
.with_context(|| format!("failed to connect to backend {id}"))?;
Ok::<_, anyhow::Error>((id, connector))
};
connector_futures.push(connector);
}
while let Some(res) = connector_futures.next().await {
let (network, connector) = res?;
connectors.insert(network, connector);
}
}
let runtime = runtime.await??;
Ok(Self {
config,
runtime,
Expand Down Expand Up @@ -1055,7 +1074,8 @@ impl Tc {
self.msg.text(id, line.into()).await
}

pub async fn log(&self, query: Query) -> Result<TextRef> {
self.msg.log(None, loki::logs(query).await?).await
pub async fn log(&self, query: Query) -> Result<TableRef> {
let logs = loki::logs(query).await?;
self.print_table(None, "logs", logs).await
}
}
160 changes: 142 additions & 18 deletions tc-cli/src/loki.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::env::Loki;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use time_primitives::{ShardId, TaskId};
use std::collections::HashMap;
use time_primitives::{BlockNumber, ShardId, TaskId};

//const DIRECTION_FORWARD: &'static str = "FORWARD";
//const DIRECTION_BACKWARD: &'static str = "BACKWARD";
Expand Down Expand Up @@ -34,39 +35,160 @@ struct StreamValue {

#[derive(Clone, Debug, clap::Parser)]
pub enum Query {
Task { task: TaskId },
Shard { shard: ShardId },
Raw { query: String },
Chronicle {
#[arg(long)]
task_id: Option<TaskId>,
#[arg(long)]
shard_id: Option<ShardId>,
#[arg(long)]
task: Option<String>,
#[arg(long)]
account: Option<String>,
#[arg(long)]
target_address: Option<String>,
#[arg(long)]
peer_id: Option<String>,
#[arg(long)]
block: Option<BlockNumber>,
#[arg(long)]
block_hash: Option<String>,
#[arg(long)]
target_block: Option<u64>,
#[arg(long)]
from: Option<String>,
#[arg(long)]
to: Option<String>,
},
Raw {
query: String,
},
}

impl std::fmt::Display for Query {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Task { task } => {
write!(f, r#"{{app="chronicle"}} |~ `task_id: {task}`"#)
},
Self::Shard { shard } => {
write!(f, r#"{{app="chronicle"}} |= `shard_id: {shard}`"#)
Self::Chronicle {
task_id,
shard_id,
task,
account,
target_address,
peer_id,
block,
block_hash,
target_block,
from,
to,
} => {
write!(f, r#"{{app="chronicle"}}"#)?;
if let Some(task) = task_id {
write!(f, " |= `task_id: {task}`")?;
}
if let Some(shard) = shard_id {
write!(f, " |= `shard_id: {shard}`")?;
}
if let Some(task) = task {
write!(f, " |= `task: {task}`")?;
}
if let Some(account) = account {
write!(f, " |= `timechain: {account}`")?;
}
if let Some(address) = target_address {
write!(f, " |= `target: {address}`")?;
}
if let Some(peer_id) = peer_id {
write!(f, " |= `peer_id: {peer_id}`")?;
}
if let Some(block) = block {
write!(f, " |= `block: {block}`")?;
}
if let Some(block_hash) = block_hash {
write!(f, " |= `block_hash: {block_hash}`")?;
}
if let Some(block) = target_block {
write!(f, " |= `target_block_height: {block}`")?;
}
if let Some(from) = from {
write!(f, " |= `from: {from}`")?;
}
if let Some(to) = to {
write!(f, " |= `to: {to}`")?;
}
Ok(())
},
Self::Raw { query } => f.write_str(query),
}
}
}

pub async fn logs(query: Query) -> Result<Vec<String>> {
#[derive(Debug)]
pub struct Log {
pub timestamp: String,
pub level: String,
pub msg: String,
pub location: String,
pub data: HashMap<String, String>,
}

impl std::str::FromStr for Log {
type Err = anyhow::Error;

fn from_str(log: &str) -> Result<Self> {
let mut data = HashMap::new();
let (timestamp, rest) = log.trim().split_once(' ').context("no timestamp")?;
let (level, rest) = rest.trim().split_once(' ').context("no level")?;
let (_module, rest) = rest.split_once(": ").context("no module")?;
let (mrest, rest) = rest.split_once(" at ").context("no data")?;
// Work around when logging raw byte arrays
let (part1, mrest) = mrest.split_once(']').unwrap_or(("", mrest));
let (part2, sdata) = mrest.split_once(',').unwrap_or((mrest, ""));
let msg = if part1.is_empty() { part2.to_string() } else { format!("{part1}]{part2}") };
for kv in sdata.split(',') {
let kv = kv.trim();
if kv.is_empty() {
continue;
}
let (k, v) = kv.split_once(':').context("no kv")?;
data.insert(k.trim().to_string(), v.trim().to_string());
}
let (location, rest) = rest.split_once(" ").unwrap_or((rest, ""));
for span in rest.split(" in ") {
let Some((_, sdata)) = span.split_once(" with ") else {
continue;
};
for kv in sdata.split(',') {
let kv = kv.trim();
if kv.is_empty() {
continue;
}
let (k, v) = kv.split_once(':').context("span no kv")?;
data.insert(k.trim().to_string(), v.trim().trim_matches('"').to_string());
}
}
let me = Self {
timestamp: timestamp.trim().into(),
level: level.trim().into(),
msg: msg.trim().into(),
location: location.trim().into(),
data,
};
Ok(me)
}
}

pub async fn logs(query: Query) -> Result<Vec<Log>> {
let query = query.to_string();
log::info!("{query}");
let env = Loki::from_env()?;
let client = reqwest::Client::new();
let url: reqwest::Url = format!("{}/loki/api/v1/query_range", &env.loki_url).parse()?;
let req = client
.get(url)
.basic_auth(env.loki_username, Some(env.loki_password))
.query(&Request {
query: query.to_string(),
since: "30d".into(),
})
.query(&Request { query, since: "30d".into() })
.build()
.context("invalid request")?;
log::info!("GET {}", req.url());
log::debug!("GET {}", req.url());
let resp = client.execute(req).await?;
let status = resp.status();
if status != 200 {
Expand All @@ -76,11 +198,13 @@ pub async fn logs(query: Query) -> Result<Vec<String>> {
let resp: Response = resp.json().await?;
anyhow::ensure!(resp.status == "success", "unexpected status");
anyhow::ensure!(resp.data.result_type == "streams", "unexpected result type");
Ok(resp

let logs = resp
.data
.result
.into_iter()
.flat_map(|v| v.values)
.flat_map(|(_, log)| log.split(" ").map(|s| s.to_string()).collect::<Vec<_>>())
.collect())
.map(|(_, log)| log.parse().unwrap())
.collect::<Vec<Log>>();
Ok(logs)
}
Loading

0 comments on commit fcaf956

Please sign in to comment.