diff --git a/src/replication/mod.rs b/src/replication/mod.rs new file mode 100644 index 0000000..8d2d751 --- /dev/null +++ b/src/replication/mod.rs @@ -0,0 +1,231 @@ +//! External interface for replication +pub mod events; + +use crate::{ + AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair, Proof, RequestBlock, + RequestSeek, RequestUpgrade, +}; + +pub use events::Event; + +use tokio::sync::{broadcast::Receiver, Mutex}; + +use std::future::Future; +use std::sync::Arc; +/// Hypercore that can have multiple owners +#[derive(Debug, Clone)] +pub struct SharedCore(pub Arc>); + +impl From for SharedCore { + fn from(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} +impl SharedCore { + /// Create a shared core from a [`Hypercore`] + pub fn from_hypercore(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} + +/// Methods related to just this core's information +pub trait CoreInfo { + /// Get core info (see: [`Hypercore::info`] + fn info(&self) -> impl Future + Send; + /// Get the key_pair (see: [`Hypercore::key_pair`] + fn key_pair(&self) -> impl Future + Send; +} + +impl CoreInfo for SharedCore { + fn info(&self) -> impl Future + Send { + async move { + let core = &self.0.lock().await; + core.info() + } + } + + fn key_pair(&self) -> impl Future + Send { + async move { + let core = &self.0.lock().await; + core.key_pair().clone() + } + } +} + +impl CoreInfo for Hypercore { + fn info(&self) -> impl Future + Send { + async move { self.info() } + } + + fn key_pair(&self) -> impl Future + Send { + async move { self.key_pair().clone() } + } +} + +/// Error for ReplicationMethods trait +#[derive(thiserror::Error, Debug)] +pub enum ReplicationMethodsError { + /// Error from hypercore + #[error("Got a hypercore error: [{0}]")] + HypercoreError(#[from] HypercoreError), +} + +/// Methods needed for replication +pub trait ReplicationMethods: CoreInfo + Send { + /// ref Core::verify_and_apply_proof + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future> + Send; + /// ref Core::missing_nodes + fn missing_nodes( + &self, + index: u64, + ) -> impl Future> + Send; + /// ref Core::create_proof + fn create_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> impl Future, ReplicationMethodsError>> + Send; + /// subscribe to core events + fn event_subscribe(&self) -> impl Future>; +} + +impl ReplicationMethods for SharedCore { + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future> { + async move { + let mut core = self.0.lock().await; + Ok(core.verify_and_apply_proof(proof).await?) + } + } + + fn missing_nodes( + &self, + index: u64, + ) -> impl Future> { + async move { + let mut core = self.0.lock().await; + Ok(core.missing_nodes(index).await?) + } + } + + fn create_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> impl Future, ReplicationMethodsError>> { + async move { + let mut core = self.0.lock().await; + Ok(core.create_proof(block, hash, seek, upgrade).await?) + } + } + + fn event_subscribe(&self) -> impl Future> { + async move { self.0.lock().await.event_subscribe() } + } +} + +/// Error for ReplicationMethods trait +#[derive(thiserror::Error, Debug)] +pub enum CoreMethodsError { + /// Error from hypercore + #[error("Got a hypercore error [{0}]")] + HypercoreError(#[from] HypercoreError), +} + +/// Trait for things that consume [`crate::Hypercore`] can instead use this trait +/// so they can use all Hypercore-like things such as [`SharedCore`]. +pub trait CoreMethods: CoreInfo { + /// Check if the core has the block at the given index locally + fn has(&self, index: u64) -> impl Future + Send; + + /// get a block + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send; + + /// Append data to the core + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send; + + /// Append a batch of data to the core + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send; +} + +impl CoreMethods for SharedCore { + fn has(&self, index: u64) -> impl Future + Send { + async move { + let core = self.0.lock().await; + core.has(index) + } + } + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.get(index).await?) + } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append(data).await?) + } + } + + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append_batch(batch).await?) + } + } +} + +impl CoreMethods for Hypercore { + fn has(&self, index: u64) -> impl Future + Send { + async move { self.has(index) } + } + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send { + async move { Ok(self.get(index).await?) } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send { + async move { Ok(self.append(data).await?) } + } + + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send { + async move { Ok(self.append_batch(batch).await?) } + } +}