Skip to content

Commit

Permalink
Add traits and their impls we need for replication
Browse files Browse the repository at this point in the history
  • Loading branch information
cowlicks committed Oct 21, 2024
1 parent 80d4615 commit 9ba2f1c
Showing 1 changed file with 231 additions and 0 deletions.
231 changes: 231 additions & 0 deletions src/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
//! External interface for replication
pub mod events;

use crate::{
AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair, Proof, RequestBlock,
RequestSeek, RequestUpgrade,
};

pub use events::Event;

use tokio::sync::{broadcast::Receiver, Mutex};

use std::future::Future;
use std::sync::Arc;
/// Hypercore that can have multiple owners
#[derive(Debug, Clone)]
pub struct SharedCore(pub Arc<Mutex<Hypercore>>);

impl From<Hypercore> for SharedCore {
fn from(core: Hypercore) -> Self {
SharedCore(Arc::new(Mutex::new(core)))
}
}
impl SharedCore {
/// Create a shared core from a [`Hypercore`]
pub fn from_hypercore(core: Hypercore) -> Self {
SharedCore(Arc::new(Mutex::new(core)))
}
}

/// Methods related to just this core's information
pub trait CoreInfo {
/// Get core info (see: [`Hypercore::info`]
fn info(&self) -> impl Future<Output = Info> + Send;
/// Get the key_pair (see: [`Hypercore::key_pair`]
fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send;
}

impl CoreInfo for SharedCore {
fn info(&self) -> impl Future<Output = Info> + Send {
async move {
let core = &self.0.lock().await;
core.info()
}
}

fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send {
async move {
let core = &self.0.lock().await;
core.key_pair().clone()
}
}
}

impl CoreInfo for Hypercore {
fn info(&self) -> impl Future<Output = Info> + Send {
async move { self.info() }
}

fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send {
async move { self.key_pair().clone() }
}
}

/// Error for ReplicationMethods trait
#[derive(thiserror::Error, Debug)]
pub enum ReplicationMethodsError {
/// Error from hypercore
#[error("Got a hypercore error: [{0}]")]
HypercoreError(#[from] HypercoreError),
}

/// Methods needed for replication
pub trait ReplicationMethods: CoreInfo + Send {
/// ref Core::verify_and_apply_proof
fn verify_and_apply_proof(
&self,
proof: &Proof,
) -> impl Future<Output = Result<bool, ReplicationMethodsError>> + Send;
/// ref Core::missing_nodes
fn missing_nodes(
&self,
index: u64,
) -> impl Future<Output = Result<u64, ReplicationMethodsError>> + Send;
/// ref Core::create_proof
fn create_proof(
&self,
block: Option<RequestBlock>,
hash: Option<RequestBlock>,
seek: Option<RequestSeek>,
upgrade: Option<RequestUpgrade>,
) -> impl Future<Output = Result<Option<Proof>, ReplicationMethodsError>> + Send;
/// subscribe to core events
fn event_subscribe(&self) -> impl Future<Output = Receiver<Event>>;
}

impl ReplicationMethods for SharedCore {
fn verify_and_apply_proof(
&self,
proof: &Proof,
) -> impl Future<Output = Result<bool, ReplicationMethodsError>> {
async move {
let mut core = self.0.lock().await;
Ok(core.verify_and_apply_proof(proof).await?)
}
}

fn missing_nodes(
&self,
index: u64,
) -> impl Future<Output = Result<u64, ReplicationMethodsError>> {
async move {
let mut core = self.0.lock().await;
Ok(core.missing_nodes(index).await?)
}
}

fn create_proof(
&self,
block: Option<RequestBlock>,
hash: Option<RequestBlock>,
seek: Option<RequestSeek>,
upgrade: Option<RequestUpgrade>,
) -> impl Future<Output = Result<Option<Proof>, ReplicationMethodsError>> {
async move {
let mut core = self.0.lock().await;
Ok(core.create_proof(block, hash, seek, upgrade).await?)
}
}

fn event_subscribe(&self) -> impl Future<Output = Receiver<Event>> {
async move { self.0.lock().await.event_subscribe() }
}
}

/// Error for ReplicationMethods trait
#[derive(thiserror::Error, Debug)]
pub enum CoreMethodsError {
/// Error from hypercore
#[error("Got a hypercore error [{0}]")]
HypercoreError(#[from] HypercoreError),
}

/// Trait for things that consume [`crate::Hypercore`] can instead use this trait
/// so they can use all Hypercore-like things such as [`SharedCore`].
pub trait CoreMethods: CoreInfo {
/// Check if the core has the block at the given index locally
fn has(&self, index: u64) -> impl Future<Output = bool> + Send;

/// get a block
fn get(
&self,
index: u64,
) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send;

/// Append data to the core
fn append(
&self,
data: &[u8],
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send;

/// Append a batch of data to the core
fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>(
&self,
batch: B,
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send;
}

impl CoreMethods for SharedCore {
fn has(&self, index: u64) -> impl Future<Output = bool> + Send {
async move {
let core = self.0.lock().await;
core.has(index)
}
}
fn get(
&self,
index: u64,
) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send {
async move {
let mut core = self.0.lock().await;
Ok(core.get(index).await?)
}
}

fn append(
&self,
data: &[u8],
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send {
async move {
let mut core = self.0.lock().await;
Ok(core.append(data).await?)
}
}

fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>(
&self,
batch: B,
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send {
async move {
let mut core = self.0.lock().await;
Ok(core.append_batch(batch).await?)
}
}
}

impl CoreMethods for Hypercore {
fn has(&self, index: u64) -> impl Future<Output = bool> + Send {
async move { self.has(index) }
}
fn get(
&self,
index: u64,
) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send {
async move { Ok(self.get(index).await?) }
}

fn append(
&self,
data: &[u8],
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send {
async move { Ok(self.append(data).await?) }
}

fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>(
&self,
batch: B,
) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send {
async move { Ok(self.append_batch(batch).await?) }
}
}

0 comments on commit 9ba2f1c

Please sign in to comment.