-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement Source trait and use it for user-defined source #2114
Changes from 4 commits
ec7c7c1
acb5ca4
3a84b0b
7380b0c
aa2d300
bdc5d41
854ecbe
7a65eb1
8ca2f72
1518a49
10eb7ba
5cf88a4
d99e1dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -645,4 +645,4 @@ mod tests { | |
let drop = OnFailureStrategy::Drop; | ||
assert_eq!(drop.to_string(), "drop"); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ use crate::error; | |
use crate::shared::utils; | ||
use crate::shared::utils::create_rpc_channel; | ||
use crate::sink::user_defined::SinkWriter; | ||
use crate::source::user_defined::Source; | ||
use crate::source::user_defined::new_source; | ||
use crate::transformer::user_defined::SourceTransformer; | ||
use forwarder::ForwarderBuilder; | ||
use metrics::MetricsState; | ||
|
@@ -145,6 +145,8 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) -> | |
) | ||
.await?; | ||
|
||
let (source_reader, lag_reader) = new_source(source_grpc_client.clone(), 500, 100).await?; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add a comment explaining the numbers 500 and 100? Please use named constants. |
||
// Start the metrics server in a separate background async spawn, | ||
// This should be running throughout the lifetime of the application, hence the handle is not | ||
// joined. | ||
|
@@ -160,11 +162,11 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) -> | |
utils::start_metrics_server(metrics_state).await; | ||
|
||
// start the lag reader to publish lag metrics | ||
let mut lag_reader = utils::create_lag_reader(source_grpc_client.clone()).await; | ||
let mut lag_reader = utils::create_lag_reader(lag_reader).await; | ||
lag_reader.start().await; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We pass lag_reader into the create_lag_reader method, which is confusing. |
||
|
||
// build the forwarder | ||
let source_reader = Source::new(source_grpc_client.clone()).await?; | ||
|
||
let sink_writer = SinkWriter::new(sink_grpc_client.clone()).await?; | ||
|
||
let mut forwarder_builder = ForwarderBuilder::new(source_reader, sink_writer, cln_token); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,14 +13,15 @@ use crate::monovertex::metrics; | |
use crate::monovertex::metrics::forward_metrics; | ||
use crate::monovertex::sink_pb::Status::{Failure, Fallback, Success}; | ||
use crate::sink::user_defined::SinkWriter; | ||
use crate::source::user_defined::Source; | ||
use crate::source::user_defined::UserDefinedSource; | ||
use crate::source::Source; | ||
use crate::transformer::user_defined::SourceTransformer; | ||
|
||
/// Forwarder is responsible for reading messages from the source, applying transformation if | ||
/// transformer is present, writing the messages to the sink, and then acknowledging the messages | ||
/// back to the source. | ||
pub(crate) struct Forwarder { | ||
source: Source, | ||
source: UserDefinedSource, | ||
sink_writer: SinkWriter, | ||
source_transformer: Option<SourceTransformer>, | ||
fb_sink_writer: Option<SinkWriter>, | ||
|
@@ -30,7 +31,7 @@ pub(crate) struct Forwarder { | |
|
||
/// ForwarderBuilder is used to build a Forwarder instance with optional fields. | ||
pub(crate) struct ForwarderBuilder { | ||
source: Source, | ||
source: UserDefinedSource, | ||
sink_writer: SinkWriter, | ||
cln_token: CancellationToken, | ||
source_transformer: Option<SourceTransformer>, | ||
|
@@ -40,7 +41,7 @@ pub(crate) struct ForwarderBuilder { | |
impl ForwarderBuilder { | ||
/// Create a new builder with mandatory fields | ||
pub(crate) fn new( | ||
source: Source, | ||
source: UserDefinedSource, | ||
sink_writer: SinkWriter, | ||
cln_token: CancellationToken, | ||
) -> Self { | ||
|
@@ -121,13 +122,9 @@ impl Forwarder { | |
/// and then acknowledge the messages back to the source. | ||
async fn read_and_process_messages(&mut self) -> error::Result<usize> { | ||
let start_time = tokio::time::Instant::now(); | ||
let messages = self | ||
.source | ||
.read(config().batch_size, config().timeout_in_ms) | ||
.await | ||
.map_err(|e| { | ||
Error::ForwarderError(format!("Failed to read messages from source {:?}", e)) | ||
})?; | ||
let messages = self.source.read().await.map_err(|e| { | ||
Error::ForwarderError(format!("Failed to read messages from source {:?}", e)) | ||
})?; | ||
|
||
debug!( | ||
"Read batch size: {} and latency - {}ms", | ||
|
@@ -555,7 +552,7 @@ mod tests { | |
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient; | ||
use crate::shared::utils::create_rpc_channel; | ||
use crate::sink::user_defined::SinkWriter; | ||
use crate::source::user_defined::Source; | ||
use crate::source::user_defined::UserDefinedSource; | ||
use crate::transformer::user_defined::SourceTransformer; | ||
|
||
struct SimpleSource { | ||
|
@@ -735,9 +732,11 @@ mod tests { | |
|
||
let cln_token = CancellationToken::new(); | ||
|
||
let source = Source::new(SourceClient::new( | ||
create_rpc_channel(source_sock_file.clone()).await.unwrap(), | ||
)) | ||
let source = UserDefinedSource::new( | ||
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), | ||
500, | ||
100, | ||
) | ||
.await | ||
.expect("failed to connect to source server"); | ||
|
||
|
@@ -857,9 +856,11 @@ mod tests { | |
|
||
let cln_token = CancellationToken::new(); | ||
|
||
let source = Source::new(SourceClient::new( | ||
create_rpc_channel(source_sock_file.clone()).await.unwrap(), | ||
)) | ||
let source = UserDefinedSource::new( | ||
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), | ||
500, | ||
100, | ||
Comment on lines
+858
to
+859
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Better to make it a constant, since it's used in multiple places. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we fetch it from the config? |
||
) | ||
.await | ||
.expect("failed to connect to source server"); | ||
|
||
|
@@ -971,9 +972,11 @@ mod tests { | |
|
||
let cln_token = CancellationToken::new(); | ||
|
||
let source = Source::new(SourceClient::new( | ||
create_rpc_channel(source_sock_file.clone()).await.unwrap(), | ||
)) | ||
let source = UserDefinedSource::new( | ||
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), | ||
500, | ||
100, | ||
) | ||
.await | ||
.expect("failed to connect to source server"); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ use crate::error::Error; | |
use crate::monovertex::sink_pb::sink_client::SinkClient; | ||
use crate::monovertex::source_pb::source_client::SourceClient; | ||
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient; | ||
use crate::reader; | ||
use prometheus_client::encoding::text::encode; | ||
use prometheus_client::metrics::counter::Counter; | ||
use prometheus_client::metrics::family::Family; | ||
|
@@ -365,8 +366,8 @@ struct TimestampedPending { | |
/// `LagReader` is responsible for periodically checking the lag of the source client | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: please update the comments. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Absolutely 😄 |
||
/// and exposing the metrics. It maintains a list of pending stats and ensures that | ||
/// only the most recent entries are kept. | ||
pub(crate) struct LagReader { | ||
source_client: SourceClient<Channel>, | ||
pub(crate) struct PendingReader<T> { | ||
lag_reader: T, | ||
Comment on lines
+369
to
+370
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is a lag_reader inside PendingReader, which is confusing. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. lag_reader is the trait. Any recommendation? |
||
lag_checking_interval: Duration, | ||
refresh_interval: Duration, | ||
buildup_handle: Option<JoinHandle<()>>, | ||
|
@@ -375,16 +376,16 @@ pub(crate) struct LagReader { | |
} | ||
|
||
/// LagReaderBuilder is used to build a `LagReader` instance. | ||
pub(crate) struct LagReaderBuilder { | ||
source_client: SourceClient<Channel>, | ||
pub(crate) struct PendingReaderBuilder<T> { | ||
lag_reader: T, | ||
lag_checking_interval: Option<Duration>, | ||
refresh_interval: Option<Duration>, | ||
} | ||
|
||
impl LagReaderBuilder { | ||
pub(crate) fn new(source_client: SourceClient<Channel>) -> Self { | ||
impl<T: reader::LagReader> PendingReaderBuilder<T> { | ||
pub(crate) fn new(lag_reader: T) -> Self { | ||
Self { | ||
source_client, | ||
lag_reader, | ||
lag_checking_interval: None, | ||
refresh_interval: None, | ||
} | ||
|
@@ -400,9 +401,9 @@ impl LagReaderBuilder { | |
self | ||
} | ||
|
||
pub(crate) fn build(self) -> LagReader { | ||
LagReader { | ||
source_client: self.source_client, | ||
pub(crate) fn build(self) -> PendingReader<T> { | ||
PendingReader { | ||
lag_reader: self.lag_reader, | ||
lag_checking_interval: self | ||
.lag_checking_interval | ||
.unwrap_or_else(|| Duration::from_secs(3)), | ||
|
@@ -416,20 +417,20 @@ impl LagReaderBuilder { | |
} | ||
} | ||
|
||
impl LagReader { | ||
impl<T: reader::LagReader + Clone + Send + 'static> PendingReader<T> { | ||
/// Starts the lag reader by spawning tasks to build up pending info and expose pending metrics. | ||
/// | ||
/// This method spawns two asynchronous tasks: | ||
/// - One to periodically check the lag and update the pending stats. | ||
/// - Another to periodically expose the pending metrics. | ||
pub async fn start(&mut self) { | ||
let source_client = self.source_client.clone(); | ||
let pending_reader = self.lag_reader.clone(); | ||
let lag_checking_interval = self.lag_checking_interval; | ||
let refresh_interval = self.refresh_interval; | ||
let pending_stats = self.pending_stats.clone(); | ||
|
||
self.buildup_handle = Some(tokio::spawn(async move { | ||
build_pending_info(source_client, lag_checking_interval, pending_stats).await; | ||
build_pending_info(pending_reader, lag_checking_interval, pending_stats).await; | ||
})); | ||
|
||
let pending_stats = self.pending_stats.clone(); | ||
|
@@ -440,7 +441,7 @@ impl LagReader { | |
} | ||
|
||
/// When lag-reader is dropped, we need to clean up the pending exposer and the pending builder tasks. | ||
impl Drop for LagReader { | ||
impl<T> Drop for PendingReader<T> { | ||
fn drop(&mut self) { | ||
if let Some(handle) = self.expose_handle.take() { | ||
handle.abort(); | ||
|
@@ -454,15 +455,15 @@ impl Drop for LagReader { | |
} | ||
|
||
/// Periodically checks the pending messages from the source client and build the pending stats. | ||
async fn build_pending_info( | ||
mut source_client: SourceClient<Channel>, | ||
async fn build_pending_info<T: reader::LagReader>( | ||
mut lag_reader: T, | ||
lag_checking_interval: Duration, | ||
pending_stats: Arc<Mutex<Vec<TimestampedPending>>>, | ||
) { | ||
let mut ticker = time::interval(lag_checking_interval); | ||
loop { | ||
ticker.tick().await; | ||
match fetch_pending(&mut source_client).await { | ||
match fetch_pending(&mut lag_reader).await { | ||
Ok(pending) => { | ||
if pending != -1 { | ||
let mut stats = pending_stats.lock().await; | ||
|
@@ -484,14 +485,8 @@ async fn build_pending_info( | |
} | ||
} | ||
|
||
async fn fetch_pending(source_client: &mut SourceClient<Channel>) -> crate::error::Result<i64> { | ||
let request = Request::new(()); | ||
let response = source_client | ||
.pending_fn(request) | ||
.await? | ||
.into_inner() | ||
.result | ||
.map_or(-1, |r| r.count); // default to -1(unavailable) | ||
async fn fetch_pending<T: reader::LagReader>(lag_reader: &mut T) -> crate::error::Result<i64> { | ||
let response: i64 = lag_reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) | ||
Ok(response) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
/// Lag reader reports the pending information at Reader (source, ISBs), this information is used by | ||
/// the auto-scaler. | ||
#[trait_variant::make(LagReader: Send)] | ||
#[allow(dead_code)] | ||
pub(crate) trait LocalLagReader { | ||
/// Pending elements yet to be processed at the source. It may or may not include unacknowledged | ||
/// messages. | ||
async fn pending(&mut self) -> crate::error::Result<Option<usize>>; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, I saw a lot of formatting changes in the PR. How did you generate those? Asking because I ran
cargo fmt
in my last PR and it didn't generate your changes. Is it because we are using a different formatting scheme? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I run "cargo fmt" right before committing.