Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Hypercore with generic parameters removed #1

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ tracing = "0.1"
pretty-hash = "0.4"
futures-timer = "3"
futures-lite = "1"
hypercore = { version = "0.12", default-features = false }
hypercore = { version = "0.13.0", default-features = false }
sha2 = "0.10"
curve25519-dalek = "4"
crypto_secretstream = "0.2"
Expand All @@ -46,9 +46,6 @@ async-std = { version = "1.12.0", features = ["attributes", "unstable"] }
async-compat = "0.2.1"
tokio = { version = "1.27.0", features = ["macros", "net", "process", "rt", "rt-multi-thread", "sync", "time"] }
env_logger = "0.7.1"
random-access-storage = "5.0.0"
random-access-disk = { version = "3.0.0", default-features = false }
random-access-memory = "3.0.0"
anyhow = "1.0.28"
instant = "0.1"
criterion = { version = "0.4", features = ["async_std"] }
Expand All @@ -67,8 +64,8 @@ wasm-bindgen = [
]
sparse = ["hypercore/sparse"]
cache = ["hypercore/cache"]
tokio = ["hypercore/tokio", "random-access-disk/tokio"]
async-std = ["hypercore/async-std", "random-access-disk/async-std"]
tokio = ["hypercore/tokio"]
async-std = ["hypercore/async-std"]
# Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore
# to verify that this crate works. To run them, use:
# cargo test --features js_interop_tests
Expand Down
54 changes: 16 additions & 38 deletions examples/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use hypercore::{
VerifyingKey,
};
use log::*;
use random_access_memory::RandomAccessMemory;
use random_access_storage::RandomAccess;
use std::collections::HashMap;
use std::convert::TryInto;
use std::env;
Expand All @@ -38,7 +36,7 @@ fn main() {
});

task::block_on(async move {
let mut hypercore_store: HypercoreStore<RandomAccessMemory> = HypercoreStore::new();
let mut hypercore_store: HypercoreStore = HypercoreStore::new();
let storage = Storage::new_memory().await.unwrap();
// Create a hypercore.
let hypercore = if let Some(key) = key {
Expand Down Expand Up @@ -86,14 +84,11 @@ fn usage() {
// or once when connected (if client).
// Unfortunately, everything that touches the hypercore_store or a hypercore has to be generic
// at the moment.
async fn onconnection<T: 'static>(
async fn onconnection(
stream: TcpStream,
is_initiator: bool,
hypercore_store: Arc<HypercoreStore<T>>,
) -> Result<()>
where
T: RandomAccess + Debug + Send,
{
hypercore_store: Arc<HypercoreStore>,
) -> Result<()> {
info!("onconnection, initiator: {}", is_initiator);
let mut protocol = ProtocolBuilder::new(is_initiator).connect(stream);
info!("protocol created, polling for next()");
Expand Down Expand Up @@ -127,58 +122,44 @@ where

/// A container for hypercores.
#[derive(Debug)]
struct HypercoreStore<T>
where
T: RandomAccess + Debug + Send,
{
hypercores: HashMap<String, Arc<HypercoreWrapper<T>>>,
struct HypercoreStore {
hypercores: HashMap<String, Arc<HypercoreWrapper>>,
}
impl<T> HypercoreStore<T>
where
T: RandomAccess + Debug + Send,
{
impl HypercoreStore {
pub fn new() -> Self {
let hypercores = HashMap::new();
Self { hypercores }
}

pub fn add(&mut self, hypercore: HypercoreWrapper<T>) {
pub fn add(&mut self, hypercore: HypercoreWrapper) {
let hdkey = hex::encode(hypercore.discovery_key);
self.hypercores.insert(hdkey, Arc::new(hypercore));
}

pub fn get(&self, discovery_key: &[u8; 32]) -> Option<&Arc<HypercoreWrapper<T>>> {
pub fn get(&self, discovery_key: &[u8; 32]) -> Option<&Arc<HypercoreWrapper>> {
let hdkey = hex::encode(discovery_key);
self.hypercores.get(&hdkey)
}
}

/// A Hypercore is a single unit of replication, an append-only log.
#[derive(Debug, Clone)]
struct HypercoreWrapper<T>
where
T: RandomAccess + Debug + Send,
{
struct HypercoreWrapper {
discovery_key: [u8; 32],
key: [u8; 32],
hypercore: Arc<Mutex<Hypercore<T>>>,
hypercore: Arc<Mutex<Hypercore>>,
}

impl HypercoreWrapper<RandomAccessMemory> {
pub fn from_memory_hypercore(hypercore: Hypercore<RandomAccessMemory>) -> Self {
impl HypercoreWrapper {
pub fn from_memory_hypercore(hypercore: Hypercore) -> Self {
let key = hypercore.key_pair().public.to_bytes();
HypercoreWrapper {
key,
discovery_key: discovery_key(&key),
hypercore: Arc::new(Mutex::new(hypercore)),
}
}
}

impl<T> HypercoreWrapper<T>
where
T: RandomAccess + Debug + Send + 'static,
{
pub fn key(&self) -> &[u8; 32] {
&self.key
}
Expand Down Expand Up @@ -263,15 +244,12 @@ impl Default for PeerState {
}
}

async fn onmessage<T>(
hypercore: &mut Arc<Mutex<Hypercore<T>>>,
async fn onmessage(
hypercore: &mut Arc<Mutex<Hypercore>>,
peer_state: &mut PeerState,
channel: &mut Channel,
message: Message,
) -> Result<()>
where
T: RandomAccess + Debug + Send,
{
) -> Result<()> {
match message {
Message::Synchronize(message) => {
println!("Got Synchronize message {message:?}");
Expand Down
54 changes: 16 additions & 38 deletions tests/js_interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use hypercore::{
VerifyingKey, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH,
};
use instant::Duration;
use random_access_disk::RandomAccessDisk;
use random_access_storage::RandomAccess;
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
Expand Down Expand Up @@ -388,7 +386,7 @@ async fn create_writer_hypercore(
data_size: usize,
data_char: char,
path: &str,
) -> Result<Hypercore<RandomAccessDisk>> {
) -> Result<Hypercore> {
let path = Path::new(path).to_owned();
let key_pair = get_test_key_pair(true);
let storage = Storage::new_disk(&path, false).await?;
Expand All @@ -403,7 +401,7 @@ async fn create_writer_hypercore(
Ok(hypercore)
}

async fn create_reader_hypercore(path: &str) -> Result<Hypercore<RandomAccessDisk>> {
async fn create_reader_hypercore(path: &str) -> Result<Hypercore> {
let path = Path::new(path).to_owned();
let key_pair = get_test_key_pair(false);
let storage = Storage::new_disk(&path, false).await?;
Expand Down Expand Up @@ -442,14 +440,11 @@ pub fn get_test_key_pair(include_secret: bool) -> PartialKeypair {
}

#[cfg(feature = "async-std")]
async fn on_replication_connection<T: 'static>(
async fn on_replication_connection(
stream: TcpStream,
is_initiator: bool,
hypercore: Arc<HypercoreWrapper<T>>,
) -> Result<()>
where
T: RandomAccess + Debug + Send,
{
hypercore: Arc<HypercoreWrapper>,
) -> Result<()> {
let mut protocol = ProtocolBuilder::new(is_initiator).connect(stream);
while let Some(event) = protocol.next().await {
let event = event?;
Expand Down Expand Up @@ -479,14 +474,11 @@ where
}

#[cfg(feature = "tokio")]
async fn on_replication_connection<T: 'static>(
async fn on_replication_connection(
stream: TcpStream,
is_initiator: bool,
hypercore: Arc<HypercoreWrapper<T>>,
) -> Result<()>
where
T: RandomAccess + Debug + Send,
{
hypercore: Arc<HypercoreWrapper>,
) -> Result<()> {
let mut protocol = ProtocolBuilder::new(is_initiator).connect(stream.compat());
while let Some(event) = protocol.next().await {
let event = event?;
Expand Down Expand Up @@ -516,21 +508,15 @@ where
}

#[derive(Debug, Clone)]
pub struct HypercoreWrapper<T>
where
T: RandomAccess + Debug + Send,
{
pub struct HypercoreWrapper {
discovery_key: [u8; 32],
key: [u8; 32],
hypercore: Arc<Mutex<Hypercore<T>>>,
hypercore: Arc<Mutex<Hypercore>>,
result_path: Option<String>,
}

impl HypercoreWrapper<RandomAccessDisk> {
pub fn from_disk_hypercore(
hypercore: Hypercore<RandomAccessDisk>,
result_path: Option<String>,
) -> Self {
impl HypercoreWrapper {
pub fn from_disk_hypercore(hypercore: Hypercore, result_path: Option<String>) -> Self {
let key = hypercore.key_pair().public.to_bytes();
HypercoreWrapper {
key,
Expand All @@ -539,12 +525,7 @@ impl HypercoreWrapper<RandomAccessDisk> {
result_path,
}
}
}

impl<T> HypercoreWrapper<T>
where
T: RandomAccess + Debug + Send + 'static,
{
pub fn key(&self) -> &[u8; 32] {
&self.key
}
Expand Down Expand Up @@ -609,16 +590,13 @@ where
}
}

async fn on_replication_message<T>(
hypercore: &mut Arc<Mutex<Hypercore<T>>>,
async fn on_replication_message(
hypercore: &mut Arc<Mutex<Hypercore>>,
peer_state: &mut PeerState,
result_path: Option<String>,
channel: &mut Channel,
message: Message,
) -> Result<bool>
where
T: RandomAccess + Debug + Send,
{
) -> Result<bool> {
match message {
Message::Synchronize(message) => {
let length_changed = message.length != peer_state.remote_length;
Expand Down Expand Up @@ -857,7 +835,7 @@ impl RustServer {
RustServer { handle: None }
}

pub async fn run(&mut self, hypercore: Arc<HypercoreWrapper<RandomAccessDisk>>, port: u32) {
pub async fn run(&mut self, hypercore: Arc<HypercoreWrapper>, port: u32) {
self.handle = Some(task::spawn(async move {
tcp_server(port, on_replication_connection, hypercore)
.await
Expand Down