diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 8b8a67e2c..27f52e3a9 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -22,7 +22,7 @@ use zenoh::{ query::{ ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters, }, - sample::{Locality, Sample, SampleKind}, + sample::{Locality, Sample, SampleKind, SourceSn}, session::{EntityGlobalId, EntityId}, Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_AT, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR, KE_SUB, @@ -421,6 +421,7 @@ struct State { key_expr: KeyExpr<'static>, retransmission: bool, period: Option, + history_depth: usize, query_target: QueryTarget, query_timeout: Duration, callback: Callback, @@ -495,6 +496,22 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { sample.source_info().source_id(), sample.source_info().source_sn(), ) { + #[inline] + fn deliver_and_flush( + sample: Sample, + mut source_sn: SourceSn, + callback: &Callback, + state: &mut SourceState, + ) { + callback.call(sample); + state.last_delivered = Some(source_sn); + while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) { + callback.call(sample); + source_sn += 1; + state.last_delivered = Some(source_sn); + } + } + let entry = states.sequenced_states.entry(*source_id); let new = matches!(&entry, Entry::Vacant(_)); let state = entry.or_insert(SourceState:: { @@ -502,8 +519,19 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { pending_queries: 0, pending_samples: BTreeMap::new(), }); - if states.global_pending_queries != 0 { - state.pending_samples.insert(source_sn, sample); + if state.last_delivered.is_none() && states.global_pending_queries != 0 { + // Avoid going through the Map if history_depth == 1 + if states.history_depth == 1 { + state.last_delivered = Some(source_sn); + states.callback.call(sample); + } else { + state.pending_samples.insert(source_sn, sample); + if state.pending_samples.len() >= states.history_depth { + if let Some((sn, sample)) = state.pending_samples.pop_first() { + deliver_and_flush(sample, sn, &states.callback, state); + } + } + } } else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 { if source_sn > state.last_delivered.unwrap() { if states.retransmission { @@ -525,14 +553,7 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { } } } else { - states.callback.call(sample); - let mut last_seq_num = source_sn; - state.last_delivered = Some(last_seq_num); - while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { - states.callback.call(s); - last_seq_num += 1; - state.last_delivered = Some(last_seq_num); - } + deliver_and_flush(sample, source_sn, &states.callback, state); } new } else if let Some(timestamp) = sample.timestamp() { @@ -543,11 +564,16 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool { pending_samples: BTreeMap::new(), }); if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) { - if states.global_pending_queries == 0 && state.pending_queries == 0 { + if (states.global_pending_queries == 0 && state.pending_queries == 0) + || states.history_depth == 1 + { state.last_delivered = Some(*timestamp); states.callback.call(sample); } else { state.pending_samples.entry(*timestamp).or_insert(sample); + if state.pending_samples.len() >= states.history_depth { + flush_timestamped_source(state, &states.callback); + } } } false @@ -652,6 +678,11 @@ impl AdvancedSubscriber { }), key_expr: key_expr.clone().into_owned(), retransmission: retransmission.is_some(), + history_depth: conf + .history + .as_ref() + .and_then(|h| h.sample_depth) + .unwrap_or_default(), query_target: conf.query_target, query_timeout: conf.query_timeout, callback: callback.clone(),