Skip to content

Commit

Permalink
Update xcp-metrics/xcp-metrics-common
Browse files Browse the repository at this point in the history
Update dependencies
Simplify RPC routes logic.
Various other changes.

Signed-off-by: Teddy Astie <[email protected]>
  • Loading branch information
TSnake41 committed Jan 3, 2025
1 parent e24a492 commit 93d45de
Show file tree
Hide file tree
Showing 20 changed files with 420 additions and 537 deletions.
12 changes: 6 additions & 6 deletions xcp-metrics-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ rust-version = "1.70" # we need only 1.66 but our deps want 1.70
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
crc32fast = "1.3"
crc32fast = "1.4"
serde_json = "1.0"
anyhow = "1.0"
prost = "0.12"
prost-types = "0.12"
prost = "0.13"
prost-types = "0.13"
maplit = "1.0"
json5 = "0.4.1"

Expand All @@ -22,19 +22,19 @@ version = "1.0"
features = ["std", "derive"]

[dependencies.uuid]
version = "1.4"
version = "1.11"
features = ["std", "serde", "v4", "fast-rng"]

[dependencies.indexmap]
version = "2.0"
version = "2.7"
features = ["serde"]

[dependencies.tokio]
version = "1"
features = ["io-util"]

[build-dependencies]
prost-build = "0.12"
prost-build = "0.13"

[features]
default = []
Expand Down
2 changes: 1 addition & 1 deletion xcp-metrics-common/src/openmetrics/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl From<openmetrics::MetricSet> for MetricSet {
MetricFamily {
help: family.help.into(),
unit: family.unit.into(),
metric_type: openmetrics::MetricType::from_i32(family.r#type)
metric_type: openmetrics::MetricType::try_from(family.r#type)
.unwrap_or(openmetrics::MetricType::Unknown)
.into(),
metrics: family
Expand Down
13 changes: 4 additions & 9 deletions xcp-metrics-common/src/rrdd/rrd_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,11 @@ impl<'a> TryFrom<&'a RrdXport> for RrdXportJson<'a> {
}

impl RrdXport {
pub fn write_json<W: std::io::Write>(&self, writer: &mut W) -> anyhow::Result<()> {
serde_json::to_writer(writer, &RrdXportJson::try_from(self)?)?;

Ok(())
pub fn to_json(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string(&RrdXportJson::try_from(self)?)?)
}

pub fn write_json5<W: std::io::Write>(&self, writer: &mut W) -> anyhow::Result<()> {
let content = json5::to_string(&RrdXportJson::try_from(self)?)?;

write!(writer, "{content}")?;
Ok(())
pub fn to_json5(&self) -> anyhow::Result<String> {
Ok(json5::to_string(&RrdXportJson::try_from(self)?)?)
}
}
17 changes: 13 additions & 4 deletions xcp-metrics-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ xapi = { path = "../xapi-rs" }
anyhow = "1.0"
serde_json = "1.0"

http = "1.0"
http-body = "1.0"
http-body-util = "0.1"

tokio = { version = "1", features = ["full"] }

[dependencies.hyper]
version = "1.5"
features = ["http1", "server"]

[dependencies.hyper-util]
version = "0.1"
features = ["service", "http1", "tokio"]

[dependencies.clap]
version = "4.3"
version = "4.5"
features = ["derive"]

[dependencies.hyper]
version = "0.14"
features = ["full"]
57 changes: 18 additions & 39 deletions xcp-metrics-tools/src/bin/xcp-metrics-get-metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::path::PathBuf;

use clap::Parser;
use http_body_util::BodyExt;
use tokio::io::{stdout, AsyncWriteExt};
use xapi::{
hyper::{self, body, Body},
hyperlocal,
rpc::{
message::RpcKind, methods::OpenMetricsMethod, write_method_jsonrpc, write_method_xmlrpc,
},
};
use xapi::rpc::{message::RpcKind, methods::rrdd::OpenMetricsMethod};

/// Tool to get metrics from xcp-metrics in OpenMetrics format.
#[derive(Parser, Debug)]
Expand All @@ -32,46 +27,30 @@ async fn main() {
let args = Args::parse();
let daemon_path = args
.daemon_path
.unwrap_or_else(|| xapi::get_module_path("xcp-metrics"));
.unwrap_or_else(|| xapi::unix::get_module_path("xcp-metrics"));

let module_uri = hyperlocal::Uri::new(daemon_path, "/");

let mut rpc_buffer = vec![];
let method = OpenMetricsMethod {
protobuf: args.binary,
};

match args.rpc_format {
RpcKind::JsonRpc => write_method_jsonrpc(&mut rpc_buffer, &method).unwrap(),
RpcKind::XmlRpc => write_method_xmlrpc(&mut rpc_buffer, &method).unwrap(),
};

let content_type = match args.rpc_format {
RpcKind::JsonRpc => "application/json-rpc",
RpcKind::XmlRpc => "application/xml",
};

eprintln!("Sent: {}", String::from_utf8_lossy(&rpc_buffer));

let request = hyper::Request::builder()
.uri(hyper::Uri::from(module_uri))
.method("POST")
.header("User-agent", "xcp-metrics-get-metrics")
.header("content-length", rpc_buffer.len())
.header("content-type", content_type)
.header("host", "localhost")
.body(Body::from(rpc_buffer))
.unwrap();

let response = hyper::Client::builder()
.build(hyperlocal::UnixConnector)
.request(request)
.await;
let response = xapi::unix::send_rpc_to(
&daemon_path,
"POST",
&method,
"xcp-metrics-get-metrics",
RpcKind::JsonRpc,
)
.await;

eprintln!("{response:#?}");

let response = response.unwrap();
let data = body::to_bytes(response.into_body()).await.unwrap();
let data = response
.unwrap()
.into_body()
.collect()
.await
.unwrap()
.to_bytes();

stdout().write_all(&data).await.unwrap();
}
57 changes: 35 additions & 22 deletions xcp-metrics-tools/src/bin/xcp-metrics-openmetrics-proxy.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::{
convert::Infallible,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
};

use clap::{command, Parser};
use hyper::{
server::{conn::AddrStream, Server},
service::{make_service_fn, service_fn},
Body, Request, Response,
};

use xapi::rpc::methods::OpenMetricsMethod;
use http::Request;
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use xapi::rpc::{message::RpcKind, methods::rrdd::OpenMetricsMethod};

/// OpenMetrics http proxy, used to provide metrics for collectors such as Prometheus.
#[derive(Clone, Parser, Debug)]
Expand All @@ -26,38 +26,51 @@ struct Args {
}

async fn redirect_openmetrics(
request: Request<Body>,
request: Request<Incoming>,
daemon_path: &Path,
) -> anyhow::Result<Response<Body>> {
xapi::send_jsonrpc_to(
) -> Result<Response<Incoming>, Infallible> {
// TODO: Consider request supported OpenMetrics versions.
Ok(xapi::unix::send_rpc_to(
daemon_path,
"POST",
&OpenMetricsMethod::default(),
&OpenMetricsMethod { protobuf: false },
"xcp-metrics-openmetrics-proxy",
RpcKind::JsonRpc,
)
.await
.expect("RPC failure"))
}

#[tokio::main]
async fn main() {
let args = Args::parse();
let daemon_path = args
.daemon_path
.unwrap_or_else(|| xapi::get_module_path("xcp-metrics"));
.unwrap_or_else(|| xapi::unix::get_module_path("xcp-metrics"));

let listener = TcpListener::bind(args.addr)
.await
.expect("Unable to bind socket");

let service_fn = make_service_fn(|addr: &AddrStream| {
println!("Handling request {:?}", addr);
// We start a loop to continuously accept incoming connections
loop {
let daemon_path = daemon_path.clone();
let (stream, addr) = listener.accept().await.expect("Unable to accept socket");

async {
anyhow::Ok(service_fn(move |request| {
let daemon_path = daemon_path.clone();
async move { redirect_openmetrics(request, &daemon_path).await }
}))
}
});
println!("Handling request {addr:?}");

let server = Server::bind(&args.addr).serve(service_fn);
let io = TokioIo::new(stream);

server.await.expect("Proxy server failure");
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
io,
service_fn(|request| redirect_openmetrics(request, &daemon_path)),
)
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
}
20 changes: 16 additions & 4 deletions xcp-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,31 @@ edition = "2021"

[dependencies]
xcp-metrics-common = { path = "../xcp-metrics-common" }
xapi = { path = "../xapi-rs" }
xapi = { path = "../xapi-rs", features = ["unix"] }

http = "1.1"
http-body = "1.0"
http-body-util = "0.1"

serde_json = "1.0"
anyhow = "1.0"
futures = "0.3"
dashmap = "5.4.0"
dashmap = "6.1"
tracing = "0.1"
tracing-subscriber = "0.3"
maplit = "1.0.2"

[dependencies.hyper]
version = "1.5"
features = ["http1", "server"]

[dependencies.hyper-util]
version = "0.1"
features = ["service", "http1", "tokio"]

[dependencies.tokio]
version = "1"
features = ["full"]
features = ["rt", "rt-multi-thread", "net", "fs", "macros"]

[dependencies.serde]
version = "1.0"
Expand All @@ -36,5 +48,5 @@ version = "0.4.3"
features = ["tokio"]

[dependencies.clap]
version = "4.3"
version = "4.5"
features = ["derive"]
12 changes: 6 additions & 6 deletions xcp-metrics/src/forwarded/request.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! [ForwardedRequest] implementation.
use std::{collections::HashMap, str::FromStr};

use serde::Deserialize;
use xapi::hyper::{
use hyper::{
header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE, HOST, TRANSFER_ENCODING, USER_AGENT},
http::uri::PathAndQuery,
Body, Request, Version,
Request, Version,
};
use serde::Deserialize;

/// xapi-project/xen-api/blob/master/ocaml/libs/http-lib/http.ml for reference
#[derive(Clone, Debug, Deserialize)]
Expand All @@ -32,7 +32,7 @@ pub struct ForwardedRequest {
pub traceparent: Option<Box<str>>,
}

impl TryFrom<ForwardedRequest> for Request<Body> {
impl TryFrom<ForwardedRequest> for Request<String> {
type Error = anyhow::Error;

fn try_from(request: ForwardedRequest) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -82,8 +82,8 @@ impl TryFrom<ForwardedRequest> for Request<Body> {
}

Ok(builder.body(match request.body {
Some(content) => Body::from(content.as_bytes().to_vec()),
None => Body::empty(),
Some(content) => content.into_string(),
None => String::new(),
})?)
}
}
30 changes: 16 additions & 14 deletions xcp-metrics/src/forwarded/response.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! [write_response] implementation
use std::{fmt::Debug, io::Write};

use xapi::hyper::{body, http::Response, Body};
use http_body_util::{BodyExt, Full};
use hyper::{body::Bytes, http::Response};

/// Write the HTTP response into some writer.
pub async fn write_response<W>(
writer: &mut W,
mut response: Response<Body>,
response: Response<Full<Bytes>>,
) -> Result<(), anyhow::Error>
where
W: Write + Debug,
Expand All @@ -20,23 +21,24 @@ where
response.status().canonical_reason().unwrap_or_default()
)?;

let body = body::to_bytes(response.body_mut()).await?;
let (mut parts, body) = response.into_parts();
let body = body.collect().await?.to_bytes();

// Add content-length if not defined
if !response.headers().contains_key("content-length") {
if !parts.headers.contains_key("content-length") {
let body_length = body.len();
response
.headers_mut()
.insert("content-length", body_length.into());
parts.headers.insert("content-length", body_length.into());
}

for (name, value) in response.headers() {
write!(
writer,
"{}: {}\r\n",
name.as_str(),
String::from_utf8_lossy(value.as_bytes())
)?;
for (name, value) in parts.headers {
if let Some(name) = name {
write!(
writer,
"{}: {}\r\n",
name.as_str(),
String::from_utf8_lossy(value.as_bytes())
)?;
}
}

write!(writer, "\r\n")?;
Expand Down
Loading

0 comments on commit 93d45de

Please sign in to comment.