Skip to content

Commit

Permalink
metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Aug 8, 2024
1 parent 6445a93 commit bd68cd9
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 39 deletions.
64 changes: 55 additions & 9 deletions serving/source-sink/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use crate::error::{Error, Result};
use crate::metrics::{
FORWARDER_ACK_LATENCY, FORWARDER_ACK_TOTAL, FORWARDER_LATENCY, FORWARDER_READ_BYTES_TOTAL,
FORWARDER_READ_LATENCY, FORWARDER_READ_TOTAL, FORWARDER_TRANSFORMER_LATENCY,
FORWARDER_WRITE_LATENCY, FORWARDER_WRITE_TOTAL, PARTITION_LABEL, PIPELINE_LABEL, REPLICA_LABEL,
VERTEX_LABEL, VERTEX_TYPE_LABEL,
};
use crate::sink::SinkClient;
use crate::source::SourceClient;
use crate::transformer::TransformerClient;
use chrono::Utc;
use metrics::counter;
use metrics::{counter, histogram};
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tracing::{info, trace};

use crate::error::{Error, Result};
use crate::sink::SinkClient;
use crate::source::SourceClient;
use crate::transformer::TransformerClient;
const SOURCER_SINKER_VERTEX_TYPE: &str = "sourcer-sinker";

/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
Expand All @@ -19,24 +26,41 @@ pub(crate) struct Forwarder {
timeout_in_ms: u32,
batch_size: u64,
shutdown_rx: oneshot::Receiver<()>,
common_labels: Vec<(String, String)>,
}

impl Forwarder {
/// Creates a forwarder from the given source, sink and optional transformer clients.
///
/// `vertex_name`, `pipeline_name` and `replica` are not stored individually; they are folded
/// into `common_labels`, the label set handed to the metric macros. The vertex type label is
/// always `SOURCER_SINKER_VERTEX_TYPE` and the partition label is always `"0"`.
///
/// `timeout_in_ms`, `batch_size` and `shutdown_rx` are stored unchanged on the forwarder.
#[allow(clippy::too_many_arguments)] // we will use one vertex object to pass all the arguments
pub(crate) async fn new(
    vertex_name: String,
    pipeline_name: String,
    replica: u32,
    source_client: SourceClient,
    sink_client: SinkClient,
    transformer_client: Option<TransformerClient>,
    timeout_in_ms: u32,
    batch_size: u64,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<Self> {
    // Label order is kept stable: vertex, pipeline, vertex type, replica, partition.
    let mut common_labels: Vec<(String, String)> = Vec::with_capacity(5);
    common_labels.push((VERTEX_LABEL.to_string(), vertex_name));
    common_labels.push((PIPELINE_LABEL.to_string(), pipeline_name));
    common_labels.push((
        VERTEX_TYPE_LABEL.to_string(),
        SOURCER_SINKER_VERTEX_TYPE.to_string(),
    ));
    common_labels.push((REPLICA_LABEL.to_string(), replica.to_string()));
    common_labels.push((PARTITION_LABEL.to_string(), "0".to_string()));

    let forwarder = Self {
        source_client,
        sink_client,
        transformer_client,
        timeout_in_ms,
        batch_size,
        shutdown_rx,
        common_labels,
    };
    Ok(forwarder)
}

Expand All @@ -47,6 +71,7 @@ impl Forwarder {
let mut messages_count: u64 = 0;
let mut last_forwarded_at = std::time::Instant::now();
loop {
let start_time = std::time::Instant::now();
// two arms, either shutdown or forward-a-chunk
tokio::select! {
_ = &mut self.shutdown_rx => {
Expand All @@ -56,14 +81,21 @@ impl Forwarder {
result = self.source_client.read_fn(self.batch_size, self.timeout_in_ms) => {
// Read messages from the source
let messages = result?;
info!("Read batch size: {}", messages.len());

histogram!(FORWARDER_READ_LATENCY, &self.common_labels)
.record(start_time.elapsed().as_millis() as f64);

messages_count += messages.len() as u64;
// INCOMING_REQUESTS.inc_by(messages.len() as i64);
counter!("data_read_total", "vertex" => "vertex1", "pipeline" => "pipeline1", "vertex_type" => "type1", "vertex_replica_index" => "index1", "partition_name" => "partition1").increment(messages_count);
let bytes_count = messages.iter().map(|msg| msg.value.len() as u64).sum::<u64>();
counter!(FORWARDER_READ_TOTAL, &self.common_labels).increment(messages_count);
counter!(FORWARDER_READ_BYTES_TOTAL, &self.common_labels).increment(bytes_count);

// Extract offsets from the messages
let offsets = messages.iter().map(|message| message.offset.clone()).collect();

// Apply transformation if transformer is present
let transformed_messages = if let Some(transformer_client) = &self.transformer_client {
let start_time = std::time::Instant::now();
let mut jh = JoinSet::new();
for message in messages {
let mut transformer_client = transformer_client.clone();
Expand All @@ -76,26 +108,35 @@ impl Forwarder {
let result = result?;
results.extend(result);
}
histogram!(FORWARDER_TRANSFORMER_LATENCY, &self.common_labels)
.record(start_time.elapsed().as_millis() as f64);
results
} else {
messages
};

let sink_write_start_time = std::time::Instant::now();
// Write messages to the sink
// TODO: should we retry writing? what if the error is transient?
// we could rely on gRPC retries and say that any error that is bubbled up is worthy of non-0 exit.
// we need to confirm this via FMEA tests.
self.sink_client.sink_fn(transformed_messages).await?;
histogram!(FORWARDER_WRITE_LATENCY, &self.common_labels)
.record(sink_write_start_time.elapsed().as_millis() as f64);
counter!(FORWARDER_WRITE_TOTAL, &self.common_labels).increment(messages_count);

// Acknowledge the messages
// TODO: should we retry acking? what if the error is transient?
// we could rely on gRPC retries and say that any error that is bubbled up is worthy of non-0 exit.
// we need to confirm this via FMEA tests.
let ack_start_time = std::time::Instant::now();
self.source_client.ack_fn(offsets).await?;
histogram!(FORWARDER_ACK_LATENCY, &self.common_labels)
.record(ack_start_time.elapsed().as_millis() as f64);
counter!(FORWARDER_ACK_TOTAL, &self.common_labels).increment(messages_count);
trace!("Forwarded {} messages", messages_count);
}
}
// TODO: print slow forwarder (make the time configurable)
// if the last forward was more than 1 second ago, print the number of messages forwarded
if last_forwarded_at.elapsed().as_millis() >= 1000 {
info!(
Expand All @@ -106,6 +147,8 @@ impl Forwarder {
messages_count = 0;
last_forwarded_at = std::time::Instant::now();
}
histogram!(FORWARDER_LATENCY, &self.common_labels)
.record(start_time.elapsed().as_millis() as f64);
}
Ok(())
}
Expand Down Expand Up @@ -336,6 +379,9 @@ mod tests {
.expect("failed to connect to transformer server");

let mut forwarder = Forwarder::new(
"test-vertex".to_string(),
"test-pipeline".to_string(),
0,
source_client,
sink_client,
Some(transformer_client),
Expand Down
36 changes: 25 additions & 11 deletions serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fs;
use std::time::Duration;
use std::{env, fs};
use tokio::signal;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -35,8 +35,8 @@ pub mod forwarder;
pub mod message;
pub(crate) mod shared;

const TIMEOUT_IN_MS: u32 = 1000;
const BATCH_SIZE: u64 = 500;
const TIMEOUT_IN_MS: &str = "1000";
const BATCH_SIZE: &str = "500";

/// forwards a chunk of data from the source to the sink via an optional transformer.
/// It takes an optional custom_shutdown_rx for shutting down the forwarder, useful for testing.
Expand All @@ -46,6 +46,11 @@ pub async fn run_forwarder(
transformer_config: Option<TransformerConfig>,
custom_shutdown_rx: Option<oneshot::Receiver<()>>,
) -> Result<()> {
// TODO: get this from vertex object, controller will have to pass this
let vertex_name = env::var("VERTEX_NAME").unwrap_or_else(|_| "vertex".to_string());
let pipeline_name = env::var("PIPELINE_NAME").unwrap_or_else(|_| "pipeline".to_string());
let replica = 0;

wait_for_server_info(&source_config.server_info_file).await?;
let mut source_client = SourceClient::connect(source_config).await?;

Expand All @@ -69,13 +74,26 @@ pub async fn run_forwarder(
)
.await?;

// TODO get these from the vertex object
let timeout_in_ms: u32 = env::var("TIMEOUT_IN_MS")
.unwrap_or_else(|_| TIMEOUT_IN_MS.to_string())
.parse()
.expect("Invalid TIMEOUT_IN_MS");
let batch_size: u64 = env::var("BATCH_SIZE")
.unwrap_or_else(|_| BATCH_SIZE.to_string())
.parse()
.expect("Invalid BATCH_SIZE");

// TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc?
let mut forwarder = Forwarder::new(
vertex_name,
pipeline_name,
replica,
source_client,
sink_client,
transformer_client,
TIMEOUT_IN_MS,
BATCH_SIZE,
timeout_in_ms,
batch_size,
shutdown_rx,
)
.await?;
Expand Down Expand Up @@ -266,12 +284,8 @@ mod tests {
// FIXME: we need to have a better way, this is flaky
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

unsafe {
env::set_var("SOURCE_SOCKET", src_sock_file.to_str().unwrap());
}
unsafe {
env::set_var("SINK_SOCKET", sink_sock_file.to_str().unwrap());
}
env::set_var("SOURCE_SOCKET", src_sock_file.to_str().unwrap());
env::set_var("SINK_SOCKET", sink_sock_file.to_str().unwrap());

let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

Expand Down
24 changes: 18 additions & 6 deletions serving/source-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
use std::env;
use std::net::SocketAddr;

use tracing::error;

use log::info;
use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;
use sourcer_sinker::transformer::TransformerConfig;
use sourcer_sinker::{metrics::start_metrics_server, run_forwarder};
use std::env;
use std::net::SocketAddr;
use tracing::error;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() {
let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| "info".to_string());

// Initialize the logger
tracing_subscriber::fmt::init();
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.parse_lossy(log_level),
)
.with_target(false)
.init();

info!("Starting the forwarder");
// Start the metrics server, which serves the Prometheus metrics.
// TODO: make the port configurable.
let metrics_addr: SocketAddr = "0.0.0.0:9090".parse().expect("Invalid address");
Expand All @@ -31,6 +42,7 @@ async fn main() {
// TODO: Should these configurations be made configurable, or do we expect them never to change?
let source_config = SourceConfig::default();
let sink_config = SinkConfig::default();
// TODO: We should decide whether the transformer is enabled based on the mono vertex spec
let transformer_config = if env::var("NUMAFLOW_TRANSFORMER").is_ok() {
Some(TransformerConfig::default())
} else {
Expand Down
2 changes: 1 addition & 1 deletion serving/source-sink/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl From<Message> for SourceTransformRequest {
impl TryFrom<read_response::Result> for Message {
type Error = crate::Error;

fn try_from(result: read_response::Result) -> std::result::Result<Self, Self::Error> {
fn try_from(result: read_response::Result) -> Result<Self, Self::Error> {
let source_offset = match result.offset {
Some(o) => Offset {
offset: BASE64_STANDARD.encode(o.offset),
Expand Down
Loading

0 comments on commit bd68cd9

Please sign in to comment.