docs(storage-manager): improve documentation of Replication (#1750)
Some comments were outdated and some methods and functions had none. This
commit fixes these oversights.

Signed-off-by: Julien Loudet <[email protected]>
J-Loudet authored Feb 3, 2025
1 parent ba64d7a commit 732196f
Showing 8 changed files with 134 additions and 121 deletions.
@@ -156,6 +156,8 @@ impl Interval {

/// Returns an [HashMap] of the index and [Fingerprint] of all the [SubInterval]s contained in
/// this [Interval].
//
// This is a convenience method used to compute the Digest and an AlignmentReply.
pub(crate) fn sub_intervals_fingerprints(&self) -> HashMap<SubIntervalIdx, Fingerprint> {
self.sub_intervals
.iter()
@@ -174,7 +176,7 @@ impl Interval {
/// As its name indicates, this method DOES NOT CHECK if there is another [Event] associated to
/// the same key expression (regardless of its [Timestamp]).
///
-/// This uniqueness property (i.e. there should only be a single [Event] in the replication Log
+/// This uniqueness property (i.e. there should only be a single [Event] in the Replication Log
/// for a given key expression) cannot be enforced at the [Interval] level. Hence, this method
/// assumes the check has already been performed and thus does not do redundant work.
pub(crate) fn insert_unchecked(&mut self, sub_interval_idx: SubIntervalIdx, event: Event) {
@@ -220,6 +222,9 @@ impl Interval {
result
}
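As a rough sketch of what `sub_intervals_fingerprints` computes, with stand-in types (`SubIntervalIdx` and `Fingerprint` are simplified to `u64` here; the crate's real definitions differ):

```rust
use std::collections::HashMap;

// Stand-ins for the crate's types; both are assumptions of this sketch.
type SubIntervalIdx = u64;
type Fingerprint = u64;

struct SubInterval {
    fingerprint: Fingerprint,
}

struct Interval {
    sub_intervals: HashMap<SubIntervalIdx, SubInterval>,
}

impl Interval {
    // Collect the fingerprint of every sub-interval, keyed by its index:
    // the map used when computing the Digest or an AlignmentReply.
    fn sub_intervals_fingerprints(&self) -> HashMap<SubIntervalIdx, Fingerprint> {
        self.sub_intervals
            .iter()
            .map(|(idx, sub)| (*idx, sub.fingerprint))
            .collect()
    }
}
```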

/// Removes and returns, if found, the [Event] having the provided [EventMetadata].
///
/// The fingerprint of the Interval will be updated accordingly.
pub(crate) fn remove_event(
&mut self,
sub_interval_idx: &SubIntervalIdx,
@@ -422,6 +427,9 @@ impl SubInterval {
}
}

/// Removes and returns, if found, the [Event] having the same [EventMetadata].
///
/// The Fingerprint of the SubInterval is updated accordingly.
fn remove_event(&mut self, event_to_remove: &EventMetadata) -> Option<Event> {
let removed_event = self.events.remove(&event_to_remove.log_key());
if let Some(event) = &removed_event {
@@ -438,6 +446,8 @@ impl SubInterval {
/// is where the Wildcard Update should be recorded.
/// It is only in that specific scenario that we are not sure that all [Event]s have a lower
/// timestamp.
///
/// The Fingerprint of the [SubInterval] is updated accordingly.
fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
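The diff does not show how the Fingerprint is maintained; the sketch below assumes an XOR-fold (removing an Event XORs its fingerprint back out), which is what makes the "updated accordingly" part cheap. All types here are simplified stand-ins:

```rust
use std::collections::HashMap;

type Fingerprint = u64;

struct Event {
    fingerprint: Fingerprint,
}

struct SubInterval {
    events: HashMap<String, Event>,
    fingerprint: Fingerprint,
}

impl SubInterval {
    // Remove the Event stored under `log_key`, if any, and keep the
    // SubInterval fingerprint consistent: XOR-ing the removed fingerprint
    // out undoes having XOR-ed it in.
    fn remove_event(&mut self, log_key: &str) -> Option<Event> {
        let removed = self.events.remove(log_key);
        if let Some(event) = &removed {
            self.fingerprint ^= event.fingerprint;
        }
        removed
    }
}
```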
@@ -79,6 +79,11 @@ impl Configuration {
}
}

/// Returns the `prefix`, if one is set, that is stripped before keys are stored in the Storage.
///
/// This corresponds to the `strip_prefix` configuration parameter of the Storage.
///
/// TODO Rename this field and method to `strip_prefix` for consistency.
pub fn prefix(&self) -> Option<&OwnedKeyExpr> {
self.prefix.as_ref()
}
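On plain strings (standing in for key expressions), the behaviour the `strip_prefix` parameter implies could look like:

```rust
// Returns the key with the configured prefix stripped, or None when the key
// does not start with it. A `None` prefix means keys are stored verbatim.
fn stripped<'a>(strip_prefix: Option<&str>, key_expr: &'a str) -> Option<&'a str> {
    match strip_prefix {
        None => Some(key_expr),
        Some(prefix) => key_expr.strip_prefix(prefix),
    }
}
```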
@@ -120,8 +125,8 @@ impl Configuration {
Ok(IntervalIdx(last_elapsed_interval as u64))
}

-/// Returns the index of the lowest interval contained in the *hot* era, assuming that the
-/// highest interval contained in the *hot* era is the one provided.
+/// Returns the index of the lowest interval contained in the *Hot* Era, assuming that the
+/// highest interval contained in the *Hot* Era is the one provided.
///
/// # Example
///
@@ -139,8 +144,8 @@ impl Configuration {
(*hot_era_upper_bound - self.hot + 1).into()
}

-/// Returns the index of the lowest interval contained in the *warm* era, assuming that the
-/// highest interval contained in the *hot* era is the one provided.
+/// Returns the index of the lowest interval contained in the *Warm* Era, assuming that the
+/// highest interval contained in the *Hot* Era is the one provided.
///
/// ⚠️ Note that, even though this method computes the lower bound of the WARM era, the index
/// provided is the upper bound of the HOT era.
@@ -162,7 +167,7 @@ impl Configuration {
(*hot_era_upper_bound - self.hot - self.warm + 1).into()
}
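The two bounds above reduce to simple index arithmetic; a sketch with `IntervalIdx` simplified to `u64`:

```rust
type IntervalIdx = u64;

// The Hot Era spans the `hot` most recent intervals, ending at the provided
// upper bound (both bounds inclusive).
fn hot_era_lower_bound(hot_era_upper_bound: IntervalIdx, hot: u64) -> IntervalIdx {
    hot_era_upper_bound - hot + 1
}

// The Warm Era spans the `warm` intervals immediately preceding the Hot Era;
// note that the parameter is still the upper bound of the *Hot* Era.
fn warm_era_lower_bound(hot_era_upper_bound: IntervalIdx, hot: u64, warm: u64) -> IntervalIdx {
    hot_era_upper_bound - hot - warm + 1
}
```

With an upper bound of 10, `hot = 3` and `warm = 5`, the Hot Era covers intervals 8 to 10 and the Warm Era intervals 3 to 7.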

-/// Returns the time classification — [Interval] and [SubInterval] — of the provided
+/// Returns the time classification — i.e. [Interval] and [SubInterval] — of the provided
/// [Timestamp].
///
/// # Errors
39 changes: 24 additions & 15 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
@@ -68,11 +68,8 @@ impl Replication {
///
/// # Replica discovery
///
-/// To discover a Replica, this method will create a Digest subscriber, wait to receive a
-/// *valid* Digest and, upon reception, ask that Replica for all its entries.
-///
-/// To avoid waiting indefinitely (in case there are no other Replica on the network), the
-/// subscriber will wait for, at most, the duration of two Intervals.
+/// To discover a Replica, this method will craft a specific [AlignmentQuery] using the
+/// [Discovery] variant.
pub(crate) async fn initial_alignment(&self) {
let ke_all_replicas = match keformat!(
aligner_key_expr_formatter::formatter(),
Expand Down Expand Up @@ -160,7 +157,7 @@ impl Replication {
}
};

-// We have no control over when a replica is going to be started. The purpose is here
+// We have no control over when a replica is going to be started. The purpose here
// is to try to align its publications and make it so that they happen more or less
// at every interval (+ δ).
let duration_until_next_interval = {
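The body of that computation is elided from this diff; the arithmetic it needs could be sketched as follows (assuming `elapsed` is the time since the replication time origin):

```rust
use std::time::Duration;

// How long to wait so that the next publication falls on the next interval
// boundary (a small δ can then be added on top).
fn duration_until_next_interval(elapsed: Duration, interval: Duration) -> Duration {
    // Time already spent inside the current interval.
    let into_current = Duration::from_nanos((elapsed.as_nanos() % interval.as_nanos()) as u64);
    interval - into_current
}
```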
@@ -278,9 +275,9 @@ impl Replication {

/// Spawns a task that subscribes to the [Digest] published by other Replicas.
///
-/// Upon reception of a [Digest], it is compared with the local Replication Log. If this
-/// comparison generates a [DigestDiff], the Aligner of the Replica that generated the [Digest]
-/// that was processed is queried to start an alignment.
+/// Upon reception of a [Digest], the local Digest is retrieved and both are compared. If this
+/// comparison generates a [DigestDiff], the Aligner of the remote Replica is queried to start
+/// an alignment.
///
/// [DigestDiff]: super::digest::DigestDiff
pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> {
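A simplified picture of the comparison this task performs, with a Digest reduced to one Fingerprint per interval (the real Digest is richer and covers the Eras separately):

```rust
use std::collections::{HashMap, HashSet};

// Stand-in types; assumptions of this sketch, not the crate's definitions.
type IntervalIdx = u64;
type Fingerprint = u64;

struct Digest {
    intervals: HashMap<IntervalIdx, Fingerprint>,
}

// Compare a received Digest with the local one; `Some` plays the role of a
// DigestDiff and lists the intervals whose fingerprints do not match.
fn diff(local: &Digest, remote: &Digest) -> Option<HashSet<IntervalIdx>> {
    let mut differing = HashSet::new();
    for (idx, fingerprint) in &remote.intervals {
        if local.intervals.get(idx) != Some(fingerprint) {
            differing.insert(*idx);
        }
    }
    if differing.is_empty() {
        None // Identical digests: no alignment needed.
    } else {
        Some(differing)
    }
}
```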
@@ -411,10 +408,10 @@ impl Replication {

/// Spawns a task that handles alignment queries.
///
-/// An alignment query will always come from a Replica. Hence, as multiple Replicas could query
-/// at the same time, for each received query a new task is spawned. This newly spawned task is
-/// responsible for fetching in the Replication Log or in the Storage the relevant information
-/// to send to the Replica such that it can align its own Storage.
+/// An alignment query will always come from a remote Replica. As multiple remote Replicas could
+/// query at the same time, a new task is spawned for each received query. This newly spawned
+/// task is responsible for fetching in the Replication Log or in the Storage the relevant
+/// information to send to the remote Replica such that it can align its own Storage.
pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> {
let replication = self.clone();

@@ -471,7 +468,7 @@ impl Replication {
})
}

-/// Spawns a new task to query the Aligner of the Replica which potentially has data this
+/// Spawns a new task to query the Aligner of the remote Replica which potentially has data this
/// Storage is missing.
///
/// This method will:
@@ -481,7 +478,7 @@ impl Replication {
/// 3. Process all replies.
///
/// Note that the processing of a reply can trigger a new query (requesting additional
-/// information), spawning a new task.
+/// information), consequently spawning a new task.
///
/// This process is stateless and all the required information are carried in the query / reply.
pub(crate) fn spawn_query_replica_aligner(
@@ -580,6 +577,18 @@ impl Replication {
}
}

/// This function will search through the `events` structure and remove all event(s) that are
/// "impacted" by the wildcard.
///
/// An event should be removed if:
/// 1. The key expression of the wildcard, `wildcard_ke`, contains its key expression.
/// 2. The timestamp of the event is older than the timestamp of the wildcard.
/// 3. Their respective actions are "compatible": in particular, a Wildcard Put cannot "resuscitate"
/// a deleted key expression. See the comments within this function for other special cases that
/// need to be taken into consideration.
///
/// NOTE: This function is used to process both the `latest_updates` structure and the Replication
/// Log. Given that their structures are identical, the code was factored out and put here.
pub(crate) fn remove_events_overridden_by_wildcard_update(
events: &mut HashMap<LogLatestKey, Event>,
prefix: Option<&OwnedKeyExpr>,
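A much simplified version of those rules, using a plain prefix match in place of key-expression inclusion and bare integers as timestamps (both are assumptions of this sketch):

```rust
use std::collections::HashMap;

type Timestamp = u64;

#[derive(Clone, Copy, PartialEq)]
enum Action {
    Put,
    Delete,
}

struct Event {
    timestamp: Timestamp,
    action: Action,
}

// Drop every event that is "impacted" by the wildcard: key matched, strictly
// older, and compatible actions (a Wildcard Put does not override a Delete,
// i.e. it cannot "resuscitate" a deleted key). Returns the removed keys.
fn remove_overridden(
    events: &mut HashMap<String, Event>,
    wildcard_prefix: &str,
    wildcard_ts: Timestamp,
    wildcard_action: Action,
) -> Vec<String> {
    let mut removed = Vec::new();
    events.retain(|ke, event| {
        let overridden = ke.starts_with(wildcard_prefix)
            && event.timestamp < wildcard_ts
            && !(wildcard_action == Action::Put && event.action == Action::Delete);
        if overridden {
            removed.push(ke.clone());
        }
        !overridden
    });
    removed
}
```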
@@ -44,13 +44,23 @@ use crate::replication::{
///
/// A divergence in the Hot era, will directly let the Replica assess which [SubInterval]s it needs,
/// hence directly skipping to the `SubIntervals` variant.
///
/// The `Discovery` and `All` variants are used to perform the initial alignment. After receiving a
/// `Discovery` Query, a Replica will reply with its Zenoh ID. The Replica that replied first will
/// then receive an `All` Query to transfer all its content.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentQuery {
/// Ask Replica for their Zenoh ID to perform an initial alignment.
Discovery,
/// Retrieve all the content of a Replica.
All,
/// First alignment Query after detecting a potential misalignment.
Diff(DigestDiff),
/// Request the Fingerprint(s) of the Sub-Interval(s) contained in the provided Interval(s).
Intervals(HashSet<IntervalIdx>),
/// Request the EventMetadata contained in the provided Sub-Interval(s).
SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
/// Request the Payload associated with the provided EventMetadata.
Events(Vec<EventMetadata>),
}
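A sketch of how those variants chain together during an alignment (hypothetical enum, and simplified: as noted above, a divergence detected in the Hot Era can also jump straight to `SubIntervals`):

```rust
// Hypothetical mirror of the variants above, reduced to a linear protocol.
#[derive(Debug, PartialEq)]
enum Step {
    Discovery,
    All,
    Diff,
    Intervals,
    SubIntervals,
    Events,
    Done,
}

// Initial alignment: Discovery -> All. Regular alignment after a DigestDiff:
// Diff -> Intervals -> SubIntervals -> Events.
fn next_query(step: Step) -> Step {
    match step {
        Step::Discovery => Step::All,
        Step::Diff => Step::Intervals,
        Step::Intervals => Step::SubIntervals,
        Step::SubIntervals => Step::Events,
        Step::All | Step::Events | Step::Done => Step::Done,
    }
}
```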

@@ -161,20 +171,19 @@ impl Replication {
}
}

-/// Replies to the provided [Query] with a hash map containing the index of the [Interval] in
-/// the Cold era and their [Fingerprint].
+/// Replies to the provided [Query] with a hash map containing the index of the [Interval]s in
+/// the Cold Era and their [Fingerprint]s.
///
/// The Replica will use this response to assess which [Interval]s differ.
///
/// # Temporality
///
-/// There is no guarantee that the Replica indicating a difference in the Cold era is "aligned":
-/// it is possible that its Cold era is either ahead or late (i.e. it has more or less
-/// Interval(s) in its Replication Log in the Cold era).
+/// There is no guarantee that the Replica indicating a difference in the Cold Era is aligned:
+/// it is possible that its Cold Era contains a different number of Intervals.
///
/// We believe this is not important: the Replication Log does not separate the Intervals based
-/// on their era so performing this comparison will still be relevant — even if an Interval is
-/// in the Cold era on one end and in the Warm era in the other.
+/// on their Era so performing this comparison will still be relevant — even if an Interval is
+/// in the Cold Era on one end and in the Warm Era in the other.
pub(crate) async fn reply_cold_era(&self, query: &Query) {
let log = self.replication_log.read().await;
let configuration = log.configuration();
@@ -200,7 +209,7 @@ impl Replication {
reply_to_query(query, reply, None).await;
}

-/// Replies to the [Query] with a structure containing, for each interval index present in the
+/// Replies to the [Query] with a structure containing, for each Interval index present in the
/// `different_intervals`, all the [SubInterval]s [Fingerprint].
///
/// The Replica will use this structure to assess which [SubInterval]s differ.
@@ -230,24 +239,6 @@ impl Replication {
///
/// The Replica will use this structure to assess which [Event] (and its associated payload) are
/// missing in its Replication Log and connected Storage.
-///
-/// # TODO Performance improvement
-///
-/// Although the Replica we are answering has to find if, for each provided [EventMetadata],
-/// there is a more recent one, it does not need to go through all its Replication Log. It only
-/// needs, for each [EventMetadata], to go through the Intervals that are greater than the one
-/// it is contained in.
-///
-/// The rationale is that the Intervals are already sorted in increasing order, so if no Event,
-/// for the same key expression, can be found in any greater Interval, then by definition the
-/// Replication Log does not contain a more recent Event.
-///
-/// That would require the following changes:
-/// - Change the `sub_intervals` field of the `Interval` structure to a BTreeMap.
-/// - In the `reply_events_metadata` method (just below), send out a `HashMap<IntervalIdx,
-///   HashMap<SubIntervalIdx, HashSet<EventMetadata>>>` instead of a `Vec<EventMetadata>`.
-/// - In the `process_alignment_reply` method, implement the searching algorithm described
-///   above.
pub(crate) async fn reply_events_metadata(
&self,
query: &Query,
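The search strategy described in the TODO above (scan only the Intervals greater than the one holding the candidate) could look like this, with heavily simplified stand-in types:

```rust
use std::collections::BTreeMap;

// Stand-ins: intervals map to flat lists of (key expression, timestamp)
// pairs instead of sub-intervals holding full Events.
type IntervalIdx = u64;
type Timestamp = u64;

// Because BTreeMap keeps its keys sorted, `range` lets us start the scan at
// the first Interval strictly greater than the candidate's.
fn more_recent_exists(
    intervals: &BTreeMap<IntervalIdx, Vec<(String, Timestamp)>>,
    candidate_interval: IntervalIdx,
    key_expr: &str,
    candidate_ts: Timestamp,
) -> bool {
    intervals
        .range(candidate_interval + 1..)
        .flat_map(|(_, events)| events.iter())
        .any(|(ke, ts)| ke.as_str() == key_expr && *ts > candidate_ts)
}
```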