Skip to content

Commit

Permalink
remove todo, avoid clone
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jan 15, 2025
1 parent 9431405 commit fd44a6b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
1 change: 0 additions & 1 deletion rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ struct TimestampedPending {
#[derive(Clone)]
pub(crate) enum LagReader {
Source(Source),
// TODO: Arc<[T]>
ISB(Vec<JetstreamReader>), // multiple partitions
}

Expand Down
7 changes: 3 additions & 4 deletions rust/serving/src/app/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,21 @@ pub(crate) mod store;

/// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback
/// to inform that message has been processed by this component.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Callback {
pub(crate) id: String,
pub(crate) vertex: String,
pub(crate) cb_time: u64,
pub(crate) from_vertex: String,
/// Due to flat-map operation, we can have 0 or more responses.
// TODO: Arc<[T]>
pub(crate) responses: Vec<Response>,
}

/// It contains details about the `To` vertex via tags (conditional forwarding).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Response {
/// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that
/// the message has been dropped.
/// the message has been dropped.
pub(crate) tags: Option<Vec<String>>,
}

Expand Down
8 changes: 4 additions & 4 deletions rust/serving/src/app/jetstream_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,10 @@ mod tests {
let app = jetstream_proxy(app_state).await.unwrap();

tokio::spawn(async move {
let cbs = create_default_callbacks(ID_VALUE);
let mut retries = 0;
loop {
match callback_state.insert_callback_requests(cbs.clone()).await {
let cbs = create_default_callbacks(ID_VALUE);
match callback_state.insert_callback_requests(cbs).await {
Ok(_) => break,
Err(e) => {
retries += 1;
Expand Down Expand Up @@ -487,14 +487,14 @@ mod tests {
let app = jetstream_proxy(app_state).await.unwrap();

// pipeline is in -> cat -> out, so we will have 3 callback requests
let cbs = create_default_callbacks(ID_VALUE);

// spawn a tokio task which will insert the callback requests to the callback state
// if it fails, sleep for 10ms and retry
tokio::spawn(async move {
let mut retries = 0;
loop {
match callback_state.insert_callback_requests(cbs.clone()).await {
let cbs = create_default_callbacks(ID_VALUE);
match callback_state.insert_callback_requests(cbs).await {
Ok(_) => {
// save a test message, we should get this message when serve is invoked
// with foobar id
Expand Down

0 comments on commit fd44a6b

Please sign in to comment.