Skip to content

Commit

Permalink
fix: set max decode size of proto message, add mvtx metrics (#2283)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Dec 16, 2024
1 parent c05c96a commit fc14696
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 83 deletions.
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::config::monovertex::sink::SinkType;
use std::time::Duration;

use base64::prelude::BASE64_STANDARD;
Expand All @@ -14,6 +13,7 @@ use crate::config::components::transformer::{
};
use crate::config::components::{sink, source};
use crate::config::get_vertex_replica;
use crate::config::monovertex::sink::SinkType;
use crate::error::Error;
use crate::Result;

Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::config::components::sink::SinkType;
use std::collections::HashMap;
use std::env;
use std::time::Duration;
Expand All @@ -10,6 +9,7 @@ use serde_json::from_slice;

use crate::config::components::metrics::MetricsConfig;
use crate::config::components::sink::SinkConfig;
use crate::config::components::sink::SinkType;
use crate::config::components::source::SourceConfig;
use crate::config::components::transformer::{TransformerConfig, TransformerType};
use crate::config::get_vertex_replica;
Expand Down
22 changes: 22 additions & 0 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const PENDING: &str = "pending";
// processing times as timers
const E2E_TIME: &str = "processing_time";
const READ_TIME: &str = "read_time";
const WRITE_TIME: &str = "write_time";
const TRANSFORM_TIME: &str = "time";
const ACK_TIME: &str = "ack_time";
const SINK_TIME: &str = "time";
Expand Down Expand Up @@ -226,9 +227,11 @@ pub(crate) struct PipelineForwarderMetrics {
pub(crate) ack_total: Family<Vec<(String, String)>, Counter>,
pub(crate) ack_time: Family<Vec<(String, String)>, Histogram>,
pub(crate) write_total: Family<Vec<(String, String)>, Counter>,
pub(crate) write_time: Family<Vec<(String, String)>, Histogram>,
pub(crate) read_bytes_total: Family<Vec<(String, String)>, Counter>,
pub(crate) processed_time: Family<Vec<(String, String)>, Histogram>,
pub(crate) pending: Family<Vec<(String, String)>, Gauge>,
pub(crate) dropped_total: Family<Vec<(String, String)>, Counter>,
}

pub(crate) struct PipelineISBMetrics {
Expand Down Expand Up @@ -395,6 +398,10 @@ impl PipelineMetrics {
}),
pending: Family::<Vec<(String, String)>, Gauge>::default(),
write_total: Family::<Vec<(String, String)>, Counter>::default(),
write_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
|| Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)),
),
dropped_total: Family::<Vec<(String, String)>, Counter>::default(),
},
isb: PipelineISBMetrics {
paf_resolution_time:
Expand Down Expand Up @@ -442,6 +449,21 @@ impl PipelineMetrics {
"Number of pending messages",
metrics.forwarder.pending.clone(),
);
forwarder_registry.register(
SINK_WRITE_TOTAL,
"Total number of Data Messages Written",
metrics.forwarder.write_total.clone(),
);
forwarder_registry.register(
DROPPED_TOTAL,
"Total number of dropped messages",
metrics.forwarder.dropped_total.clone(),
);
forwarder_registry.register(
WRITE_TIME,
"Time taken to write data",
metrics.forwarder.write_time.clone(),
);
metrics
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::pipeline::pipeline::isb::BufferReaderConfig;
use std::time::Duration;

use async_nats::jetstream::Context;
Expand All @@ -13,6 +12,7 @@ use crate::metrics::{PipelineContainerState, UserDefinedContainerState};
use crate::pipeline::forwarder::source_forwarder;
use crate::pipeline::isb::jetstream::reader::JetstreamReader;
use crate::pipeline::isb::jetstream::writer::JetstreamWriter;
use crate::pipeline::pipeline::isb::BufferReaderConfig;
use crate::shared::create_components;
use crate::shared::create_components::create_sink_writer;
use crate::shared::metrics::start_metrics_server;
Expand Down
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,15 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use super::*;
use crate::message::{Message, MessageID};
use async_nats::jetstream;
use async_nats::jetstream::{consumer, stream};
use bytes::BytesMut;
use chrono::Utc;
use tokio::time::sleep;

use super::*;
use crate::message::{Message, MessageID};

#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_jetstream_read() {
Expand Down
26 changes: 13 additions & 13 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crate::config::pipeline::isb::BufferFullStrategy;
use crate::config::pipeline::ToVertexConfig;
use crate::error::Error;
use crate::message::{IntOffset, Message, Offset};
use crate::metrics::{pipeline_isb_metric_labels, pipeline_metrics};
use crate::pipeline::isb::jetstream::Stream;
use crate::tracker::TrackerHandle;
use crate::Result;

use crate::shared::forward;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::context::PublishAckFuture;
use async_nats::jetstream::publish::PublishAck;
Expand All @@ -28,6 +18,16 @@ use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::config::pipeline::isb::BufferFullStrategy;
use crate::config::pipeline::ToVertexConfig;
use crate::error::Error;
use crate::message::{IntOffset, Message, Offset};
use crate::metrics::{pipeline_isb_metric_labels, pipeline_metrics};
use crate::pipeline::isb::jetstream::Stream;
use crate::shared::forward;
use crate::tracker::TrackerHandle;
use crate::Result;

const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10;
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1;

Expand Down Expand Up @@ -461,9 +461,6 @@ pub(crate) struct ResolveAndPublishResult {

#[cfg(test)]
mod tests {
use crate::pipeline::pipeline::isb::BufferWriterConfig;
use numaflow_models::models::ForwardConditions;
use numaflow_models::models::TagConditions;
use std::collections::HashMap;
use std::time::Instant;

Expand All @@ -472,9 +469,12 @@ mod tests {
use async_nats::jetstream::{consumer, stream};
use bytes::BytesMut;
use chrono::Utc;
use numaflow_models::models::ForwardConditions;
use numaflow_models::models::TagConditions;

use super::*;
use crate::message::{Message, MessageID, ReadAck};
use crate::pipeline::pipeline::isb::BufferWriterConfig;

#[cfg(feature = "nats-tests")]
#[tokio::test]
Expand Down
10 changes: 5 additions & 5 deletions rust/numaflow-core/src/shared/create_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub(crate) async fn create_sink_writer(
grpc::create_rpc_channel(ud_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(ud_config.grpc_max_message_size)
.max_encoding_message_size(ud_config.grpc_max_message_size);
.max_decoding_message_size(ud_config.grpc_max_message_size);
grpc::wait_until_sink_ready(cln_token, &mut sink_grpc_client).await?;
(
SinkWriterBuilder::new(
Expand Down Expand Up @@ -129,7 +129,7 @@ pub(crate) async fn create_sink_writer(
grpc::create_rpc_channel(ud_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(ud_config.grpc_max_message_size)
.max_encoding_message_size(ud_config.grpc_max_message_size);
.max_decoding_message_size(ud_config.grpc_max_message_size);
grpc::wait_until_sink_ready(cln_token, &mut sink_grpc_client).await?;

Ok((
Expand Down Expand Up @@ -178,7 +178,7 @@ pub async fn create_transformer(
grpc::create_rpc_channel(ud_transformer.socket_path.clone().into()).await?,
)
.max_encoding_message_size(ud_transformer.grpc_max_message_size)
.max_encoding_message_size(ud_transformer.grpc_max_message_size);
.max_decoding_message_size(ud_transformer.grpc_max_message_size);
grpc::wait_until_transformer_ready(&cln_token, &mut transformer_grpc_client).await?;
return Ok((
Some(
Expand Down Expand Up @@ -242,7 +242,7 @@ pub async fn create_source(
grpc::create_rpc_channel(udsource_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(udsource_config.grpc_max_message_size)
.max_encoding_message_size(udsource_config.grpc_max_message_size);
.max_decoding_message_size(udsource_config.grpc_max_message_size);
grpc::wait_until_source_ready(&cln_token, &mut source_grpc_client).await?;
let (ud_read, ud_ack, ud_lag) =
new_source(source_grpc_client.clone(), batch_size, read_timeout).await?;
Expand Down Expand Up @@ -329,7 +329,7 @@ mod tests {

#[tonic::async_trait]
impl sink::Sinker for InMemorySink {
async fn sink(&self, mut _input: mpsc::Receiver<sink::SinkRequest>) -> Vec<sink::Response> {
async fn sink(&self, _input: mpsc::Receiver<sink::SinkRequest>) -> Vec<sink::Response> {
vec![]
}
}
Expand Down
6 changes: 4 additions & 2 deletions rust/numaflow-core/src/shared/forward.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use numaflow_models::models::ForwardConditions;
use std::hash::{DefaultHasher, Hasher};
use std::sync::Arc;

use numaflow_models::models::ForwardConditions;

/// Checks if the message should be written to the downstream vertex based on the conditions
/// and message tags. If no tags are provided but there are edge conditions present, we will
/// still forward to all vertices.
Expand Down Expand Up @@ -61,9 +62,10 @@ fn check_operator_condition(

#[cfg(test)]
mod tests {
use super::*;
use numaflow_models::models::TagConditions;

use super::*;

#[tokio::test]
async fn test_evaluate_write_condition_no_conditions() {
let result = should_forward(None, None);
Expand Down
14 changes: 11 additions & 3 deletions rust/numaflow-core/src/shared/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken;
use tonic::transport::{Channel, Endpoint};
use tonic::Request;
use tower::service_fn;
use tracing::info;
use tracing::{info, warn};

use crate::error;
use crate::error::Error;
Expand Down Expand Up @@ -90,13 +90,21 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {

pub(crate) async fn create_rpc_channel(socket_path: PathBuf) -> error::Result<Channel> {
const RECONNECT_INTERVAL: u64 = 1000;
const MAX_RECONNECT_ATTEMPTS: usize = 5;
const MAX_RECONNECT_ATTEMPTS: usize = 60;

let interval = fixed::Interval::from_millis(RECONNECT_INTERVAL).take(MAX_RECONNECT_ATTEMPTS);

let channel = Retry::retry(
interval,
|| async { connect_with_uds(socket_path.clone()).await },
|| async {
match connect_with_uds(socket_path.clone()).await {
Ok(channel) => Ok(channel),
Err(e) => {
warn!(?e, "Failed to connect to UDS socket");
Err(Error::Connection(format!("Failed to connect: {:?}", e)))
}
}
},
|_: &Error| true,
)
.await?;
Expand Down
Loading

0 comments on commit fc14696

Please sign in to comment.