Skip to content

Commit

Permalink
Add Hypberbee::add_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
cowlicks committed Sep 26, 2024
1 parent 0724cf9 commit 2a68897
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, fmt::Debug, sync::Arc};

use derive_builder::Builder;
use futures_lite::{AsyncRead, AsyncWrite};
use hypercore::{AppendOutcome, CoreMethods, SharedCore};
use prost::{bytes::Buf, DecodeError, Message};
use replicator::{Replicate, Replicator};
Expand Down Expand Up @@ -37,6 +38,14 @@ impl BlocksBuilder {
}

impl Blocks {
pub async fn add_stream<S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static>(
&self,
stream: S,
is_initiator: bool,
) -> Result<(), HyperbeeError> {
Ok(self.replicator.add_stream(stream, is_initiator).await?)
}

/// Get a BlockEntry for the given `seq`
/// # Errors
/// when the provided `seq` is not in the Hypercore
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use replicator::ReplicatorError;
use std::{num::TryFromIntError, string::FromUtf8Error};
use thiserror::Error;

Expand Down Expand Up @@ -48,4 +49,6 @@ pub enum HyperbeeError {
NoRootError,
#[error("The tree already has a header")]
HeaderAlreadyExists,
#[error("Replication error: {0}")]
ReplicationError(#[from] ReplicatorError),
}
16 changes: 15 additions & 1 deletion src/hb.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt::Debug, path::Path, sync::Arc};

use derive_builder::Builder;
use futures_lite::Stream;
use futures_lite::{AsyncRead, AsyncWrite, Stream};
use hypercore::{AppendOutcome, Hypercore};
use tokio::sync::RwLock;

Expand All @@ -27,6 +27,20 @@ pub struct Hyperbee {
}

impl Hyperbee {
/// add replication stream
pub async fn add_stream<S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static>(
&self,
stream: S,
is_initiator: bool,
) -> Result<(), HyperbeeError> {
Ok(self
.tree
.read()
.await
.add_stream(stream, is_initiator)
.await?)
}

/// The number of blocks in the hypercore.
/// The first block is always the header block so:
/// `version` would be the `seq` of the next block
Expand Down
17 changes: 15 additions & 2 deletions src/tree.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use derive_builder::Builder;
use futures_lite::{Stream, StreamExt};
use futures_lite::{AsyncRead, AsyncWrite, Stream, StreamExt};
use hypercore::{AppendOutcome, Hypercore, HypercoreBuilder, SharedCore, Storage};
use prost::Message;
use replicator::Replicate;

use crate::{
blocks::{Blocks, BlocksBuilder},
Expand All @@ -29,6 +28,20 @@ pub struct Tree {
}

impl Tree {
/// add replication stream
pub async fn add_stream<S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static>(
&self,
stream: S,
is_initiator: bool,
) -> Result<(), HyperbeeError> {
Ok(self
.blocks
.read()
.await
.add_stream(stream, is_initiator)
.await?)
}

/// The number of blocks in the hypercore.
/// The first block is always the header block so:
/// `version` would be the `seq` of the next block
Expand Down

0 comments on commit 2a68897

Please sign in to comment.