Skip to content

Commit

Permalink
Use replication feature within Hypercore
Browse files Browse the repository at this point in the history
  • Loading branch information
cowlicks committed Oct 21, 2024
1 parent 9ba2f1c commit 5902243
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
57 changes: 53 additions & 4 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ pub struct Hypercore {
pub(crate) bitfield: Bitfield,
skip_flush_count: u8, // autoFlush in Javascript
header: Header,
#[cfg(feature = "replication")]
events: crate::replication::events::Events,
}

/// Response from append, matches that of the Javascript result
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AppendOutcome {
/// Length of the hypercore after append
pub length: u64,
Expand Down Expand Up @@ -247,6 +249,8 @@ impl Hypercore {
bitfield,
header,
skip_flush_count: 0,
#[cfg(feature = "replication")]
events: crate::replication::events::Events::new(),
})
}

Expand Down Expand Up @@ -321,6 +325,14 @@ impl Hypercore {
if self.should_flush_bitfield_and_tree_and_oplog() {
self.flush_bitfield_and_tree_and_oplog(false).await?;
}

#[cfg(feature = "replication")]
{
let _ = self.events.send(crate::replication::events::DataUpgrade {});
let _ = self
.events
.send(crate::replication::events::Have::from(&bitfield_update));
}
}

// Return the new value
Expand All @@ -330,10 +342,32 @@ impl Hypercore {
})
}

#[cfg(feature = "replication")]
/// Subscribe to core events relevant to replication
pub fn event_subscribe(
&self,
) -> tokio::sync::broadcast::Receiver<crate::replication::events::Event> {
self.events.channel.subscribe()
}

/// Check if core has the block at the given `index` locally
#[instrument(ret, skip(self))]
pub fn has(&self, index: u64) -> bool {
self.bitfield.get(index)
}

/// Read value at given index, if any.
#[instrument(err, skip(self))]
pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>, HypercoreError> {
if !self.bitfield.get(index) {
#[cfg(feature = "replication")]
// if not in this core, try to get over network
{
let mut rx = self.events.send_on_get(index);
tokio::spawn(async move {
let _err_when_no_peers = rx.recv().await;
});
}
return Ok(None);
}

Expand Down Expand Up @@ -522,12 +556,12 @@ impl Hypercore {
self.storage.flush_infos(&outcome.infos_to_flush).await?;
self.header = outcome.header;

if let Some(bitfield_update) = bitfield_update {
if let Some(bitfield_update) = &bitfield_update {
// Write to bitfield
self.bitfield.update(&bitfield_update);
self.bitfield.update(bitfield_update);

// Contiguous length is known only now
update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update);
update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update);
}

// Commit changeset to in-memory tree
Expand All @@ -537,6 +571,21 @@ impl Hypercore {
if self.should_flush_bitfield_and_tree_and_oplog() {
self.flush_bitfield_and_tree_and_oplog(false).await?;
}

#[cfg(feature = "replication")]
{
if proof.upgrade.is_some() {
/// Notify replicator if we receieved an upgrade
let _ = self.events.send(crate::replication::events::DataUpgrade {});
}

/// Notify replicator if we receieved a bitfield update
if let Some(ref bitfield) = bitfield_update {
let _ = self
.events
.send(crate::replication::events::Have::from(bitfield));
}
}
Ok(true)
}

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![forbid(unsafe_code, bad_style, future_incompatible)]
#![forbid(unsafe_code, future_incompatible)]
#![forbid(rust_2018_idioms, rust_2018_compatibility)]
#![forbid(missing_debug_implementations)]
#![forbid(missing_docs)]
Expand Down Expand Up @@ -74,6 +74,8 @@
pub mod encoding;
pub mod prelude;
#[cfg(feature = "replication")]
pub mod replication;

mod bitfield;
mod builder;
Expand Down

0 comments on commit 5902243

Please sign in to comment.