From 2b211587fc43e009b40714524493f9f12d19bd7d Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 3 Feb 2025 23:25:01 +0100 Subject: [PATCH 1/3] AdvancedSub starts delivering samples as soon as sample_depth samples received --- zenoh-ext/src/advanced_subscriber.rs | 38 +++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 8b8a67e2c1..3ea1c5f103 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -421,6 +421,7 @@ struct State { key_expr: KeyExpr<'static>, retransmission: bool, period: Option, + history_depth: usize, query_target: QueryTarget, query_timeout: Duration, callback: Callback, @@ -502,8 +503,25 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { pending_queries: 0, pending_samples: BTreeMap::new(), }); - if states.global_pending_queries != 0 { - state.pending_samples.insert(source_sn, sample); + if state.last_delivered.is_none() && states.global_pending_queries != 0 { + // Avoid going through the Map if history_depth == 1 + if states.history_depth == 1 { + state.last_delivered = Some(source_sn); + states.callback.call(sample); + } else { + state.pending_samples.insert(source_sn, sample); + if state.pending_samples.len() >= states.history_depth { + if let Some((mut last_seq_num, sample)) = state.pending_samples.pop_first() { + states.callback.call(sample); + state.last_delivered = Some(last_seq_num); + while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { + states.callback.call(s); + last_seq_num += 1; + state.last_delivered = Some(last_seq_num); + } + } + } + } } else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 { if source_sn > state.last_delivered.unwrap() { if states.retransmission { @@ -547,7 +565,16 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { state.last_delivered = Some(*timestamp); states.callback.call(sample); } else { - state.pending_samples.entry(*timestamp).or_insert(sample); + // Avoid going through the Map if history_depth == 1 + if states.history_depth == 1 { + state.last_delivered = Some(*timestamp); + states.callback.call(sample); + } else { + state.pending_samples.entry(*timestamp).or_insert(sample); + if state.pending_samples.len() >= states.history_depth { + flush_timestamped_source(state, &states.callback); + } + } } } false @@ -652,6 +679,11 @@ impl AdvancedSubscriber { }), key_expr: key_expr.clone().into_owned(), retransmission: retransmission.is_some(), + history_depth: conf + .history + .as_ref() + .and_then(|h| h.sample_depth) + .unwrap_or_default(), query_target: conf.query_target, query_timeout: conf.query_timeout, callback: callback.clone(), From 4d5b729e2b1565e7d130239b740f92d5eaa3aac0 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 6 Feb 2025 17:50:00 +0100 Subject: [PATCH 2/3] Simplify code --- zenoh-ext/src/advanced_subscriber.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 3ea1c5f103..a3ea57c76f 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -561,19 +561,15 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { pending_samples: BTreeMap::new(), }); if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) { - if states.global_pending_queries == 0 && state.pending_queries == 0 { + if (states.global_pending_queries == 0 && state.pending_queries == 0) + || states.history_depth == 1 + { state.last_delivered = Some(*timestamp); states.callback.call(sample); } else { - // Avoid going through the Map if history_depth == 1 - if states.history_depth == 1 { - state.last_delivered = Some(*timestamp); - states.callback.call(sample); - } else { - state.pending_samples.entry(*timestamp).or_insert(sample); - if state.pending_samples.len() >= states.history_depth { - flush_timestamped_source(state, &states.callback); - } + state.pending_samples.entry(*timestamp).or_insert(sample); + if state.pending_samples.len() >= states.history_depth { + flush_timestamped_source(state, &states.callback); } } } From 2c6379a4b738d9747772068aa39c42f5d1e355ad Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 7 Feb 2025 10:13:34 +0100 Subject: [PATCH 3/3] Simplify code --- zenoh-ext/src/advanced_subscriber.rs | 37 +++++++++++++++------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index a3ea57c76f..27f52e3a91 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -22,7 +22,7 @@ use zenoh::{ query::{ ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters, }, - sample::{Locality, Sample, SampleKind}, + sample::{Locality, Sample, SampleKind, SourceSn}, session::{EntityGlobalId, EntityId}, Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_AT, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR, KE_SUB, @@ -496,6 +496,22 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { sample.source_info().source_id(), sample.source_info().source_sn(), ) { + #[inline] + fn deliver_and_flush( + sample: Sample, + mut source_sn: SourceSn, + callback: &Callback, + state: &mut SourceState, + ) { + callback.call(sample); + state.last_delivered = Some(source_sn); + while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) { + callback.call(sample); + source_sn += 1; + state.last_delivered = Some(source_sn); + } + } + let entry = states.sequenced_states.entry(*source_id); let new = matches!(&entry, Entry::Vacant(_)); let state = entry.or_insert(SourceState:: { @@ -511,14 +527,8 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { } else { state.pending_samples.insert(source_sn, sample); if state.pending_samples.len() >= states.history_depth { - if let Some((mut last_seq_num, sample)) = state.pending_samples.pop_first() { - states.callback.call(sample); - state.last_delivered = Some(last_seq_num); - while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { - states.callback.call(s); - last_seq_num += 1; - state.last_delivered = Some(last_seq_num); - } + if let Some((sn, sample)) = state.pending_samples.pop_first() { + deliver_and_flush(sample, sn, &states.callback, state); } } } @@ -543,14 +553,7 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { } } } else { - states.callback.call(sample); - let mut last_seq_num = source_sn; - state.last_delivered = Some(last_seq_num); - while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { - states.callback.call(s); - last_seq_num += 1; - state.last_delivered = Some(last_seq_num); - } + deliver_and_flush(sample, source_sn, &states.callback, state); } new } else if let Some(timestamp) = sample.timestamp() {