Skip to content

Commit

Permalink
AdvancedSub starts delivering samples as soon as sample_depth samples…
Browse files Browse the repository at this point in the history
… are received (#1753)

* AdvancedSub starts delivering samples as soon as sample_depth samples received

* Simplify code

* Simplify code
  • Loading branch information
OlivierHecart authored Feb 7, 2025
1 parent 2295222 commit 69f810d
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions zenoh-ext/src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zenoh::{
query::{
ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
},
sample::{Locality, Sample, SampleKind},
sample::{Locality, Sample, SampleKind, SourceSn},
session::{EntityGlobalId, EntityId},
Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_AT, KE_EMPTY, KE_PUB, KE_STAR,
KE_STARSTAR, KE_SUB,
Expand Down Expand Up @@ -421,6 +421,7 @@ struct State {
key_expr: KeyExpr<'static>,
retransmission: bool,
period: Option<Period>,
history_depth: usize,
query_target: QueryTarget,
query_timeout: Duration,
callback: Callback<Sample>,
Expand Down Expand Up @@ -495,15 +496,42 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool {
sample.source_info().source_id(),
sample.source_info().source_sn(),
) {
#[inline]
fn deliver_and_flush(
sample: Sample,
mut source_sn: SourceSn,
callback: &Callback<Sample>,
state: &mut SourceState<u32>,
) {
callback.call(sample);
state.last_delivered = Some(source_sn);
while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
callback.call(sample);
source_sn += 1;
state.last_delivered = Some(source_sn);
}
}

let entry = states.sequenced_states.entry(*source_id);
let new = matches!(&entry, Entry::Vacant(_));
let state = entry.or_insert(SourceState::<u32> {
last_delivered: None,
pending_queries: 0,
pending_samples: BTreeMap::new(),
});
if states.global_pending_queries != 0 {
state.pending_samples.insert(source_sn, sample);
if state.last_delivered.is_none() && states.global_pending_queries != 0 {
// Avoid going through the Map if history_depth == 1
if states.history_depth == 1 {
state.last_delivered = Some(source_sn);
states.callback.call(sample);
} else {
state.pending_samples.insert(source_sn, sample);
if state.pending_samples.len() >= states.history_depth {
if let Some((sn, sample)) = state.pending_samples.pop_first() {
deliver_and_flush(sample, sn, &states.callback, state);
}
}
}
} else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 {
if source_sn > state.last_delivered.unwrap() {
if states.retransmission {
Expand All @@ -525,14 +553,7 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool {
}
}
} else {
states.callback.call(sample);
let mut last_seq_num = source_sn;
state.last_delivered = Some(last_seq_num);
while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) {
states.callback.call(s);
last_seq_num += 1;
state.last_delivered = Some(last_seq_num);
}
deliver_and_flush(sample, source_sn, &states.callback, state);
}
new
} else if let Some(timestamp) = sample.timestamp() {
Expand All @@ -543,11 +564,16 @@ fn handle_sample(states: &mut State, sample: Sample) -> bool {
pending_samples: BTreeMap::new(),
});
if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
if states.global_pending_queries == 0 && state.pending_queries == 0 {
if (states.global_pending_queries == 0 && state.pending_queries == 0)
|| states.history_depth == 1
{
state.last_delivered = Some(*timestamp);
states.callback.call(sample);
} else {
state.pending_samples.entry(*timestamp).or_insert(sample);
if state.pending_samples.len() >= states.history_depth {
flush_timestamped_source(state, &states.callback);
}
}
}
false
Expand Down Expand Up @@ -652,6 +678,11 @@ impl<Handler> AdvancedSubscriber<Handler> {
}),
key_expr: key_expr.clone().into_owned(),
retransmission: retransmission.is_some(),
history_depth: conf
.history
.as_ref()
.and_then(|h| h.sample_depth)
.unwrap_or_default(),
query_target: conf.query_target,
query_timeout: conf.query_timeout,
callback: callback.clone(),
Expand Down

0 comments on commit 69f810d

Please sign in to comment.