From 60166b092b1a836b1c880de3c9d2d6b4974ee01c Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 8 Apr 2020 12:30:48 +0200 Subject: [PATCH] Dynamic dispatch for Storage, no generics in Feed This removes the generic argument from the Feed struct, making it simpler to work with. Instead, the Storage is internally put into a Box, where DynStorage is an async trait with the public Storage functions. --- Cargo.toml | 1 + examples/async.rs | 15 +--- src/feed.rs | 68 ++++++++--------- src/feed_builder.rs | 19 ++--- src/lib.rs | 6 +- src/storage/mod.rs | 165 +++++++++++++++++++++++++++++------------ src/storage/persist.rs | 4 +- 7 files changed, 161 insertions(+), 117 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bd9ec01..801cbb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ tree-index = "0.6.0" bitfield-rle = "0.1.1" futures = "0.3.4" async-std = "1.5.0" +async-trait = "0.1.30" [dev-dependencies] quickcheck = "0.9.2" diff --git a/examples/async.rs b/examples/async.rs index a052bfe..5c8e20c 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -1,27 +1,18 @@ use async_std::task; use hypercore::Feed; -use random_access_storage::RandomAccess; -use std::fmt::Debug; -async fn append(feed: &mut Feed, content: &[u8]) -where - T: RandomAccess> + Debug + Send, -{ +async fn append(feed: &mut Feed, content: &[u8]) { feed.append(content).await.unwrap(); } -async fn print(feed: &mut Feed) -where - T: RandomAccess> + Debug + Send, -{ +async fn print(feed: &mut Feed) { println!("{:?}", feed.get(0).await); println!("{:?}", feed.get(1).await); } fn main() { task::block_on(task::spawn(async { - let mut feed = Feed::default(); - + let mut feed = Feed::open_in_memory().await.unwrap(); append(&mut feed, b"hello").await; append(&mut feed, b"world").await; print(&mut feed).await; diff --git a/src/feed.rs b/src/feed.rs index c14b13a..31e33dc 100644 --- a/src/feed.rs +++ b/src/feed.rs @@ -2,7 +2,9 @@ use crate::feed_builder::FeedBuilder; use crate::replicate::{Message, Peer}; -pub use crate::storage::{Node, NodeTrait, Storage, Store}; +pub use crate::storage::{ + storage_disk, storage_memory, BoxStorage, Node, NodeTrait, Storage, Store, +}; use crate::audit::Audit; use crate::bitfield::Bitfield; @@ -12,12 +14,8 @@ use anyhow::{bail, ensure, Result}; use ed25519_dalek::{PublicKey, SecretKey, Signature}; use flat_tree as flat; use pretty_hash::fmt as pretty_fmt; -use random_access_disk::RandomAccessDisk; -use random_access_memory::RandomAccessMemory; -use random_access_storage::RandomAccess; use tree_index::TreeIndex; -use std::borrow::Borrow; use std::cmp; use std::fmt::{self, Debug, Display}; use std::ops::Range; @@ -26,15 +24,12 @@ use std::sync::Arc; /// Append-only log structure. #[derive(Debug)] -pub struct Feed -where - T: RandomAccess> + Debug, -{ +pub struct Feed { /// Merkle tree instance. pub(crate) merkle: Merkle, pub(crate) public_key: PublicKey, pub(crate) secret_key: Option, - pub(crate) storage: Storage, + pub(crate) storage: BoxStorage, /// Total length of data stored. pub(crate) byte_length: u64, /// TODO: description. Length of... roots? @@ -45,12 +40,9 @@ where pub(crate) peers: Vec, } -impl Feed -where - T: RandomAccess> + Debug + Send, -{ +impl Feed { /// Create a new instance with a custom storage backend. - pub async fn with_storage(mut storage: crate::storage::Storage) -> Result { + pub async fn with_storage(mut storage: BoxStorage) -> Result { match storage.read_partial_keypair().await { Some(partial_keypair) => { let builder = FeedBuilder::new(partial_keypair.public, storage); @@ -76,10 +68,26 @@ where } /// Starts a `FeedBuilder` with the provided `Keypair` and `Storage`. - pub fn builder(public_key: PublicKey, storage: Storage) -> FeedBuilder { + pub fn builder(public_key: PublicKey, storage: BoxStorage) -> FeedBuilder { FeedBuilder::new(public_key, storage) } + /// Create a new instance that persists to disk at the location of `dir`. + // TODO: Ensure that dir is always a directory. + // NOTE: Should we `mkdirp` here? + // NOTE: Should we call these `data.bitfield` / `data.tree`? + pub async fn open_from_disk>(path: P) -> Result { + let dir = path.as_ref().to_owned(); + let storage = storage_disk(&dir).await?; + Self::with_storage(storage).await + } + + /// Create a new in-memory instance. + pub async fn open_in_memory() -> Result { + let storage = storage_memory().await.unwrap(); + Self::with_storage(storage).await + } + /// Get the amount of entries in the feed. #[inline] pub fn len(&self) -> u64 { @@ -116,7 +124,7 @@ where let hash = Hash::from_roots(self.merkle.roots()); let index = self.length; let signature = sign(&self.public_key, key, hash.as_bytes()); - self.storage.put_signature(index, signature).await?; + self.storage.put_signature(index, &signature).await?; for node in self.merkle.nodes() { self.storage.put_node(node).await?; @@ -352,8 +360,7 @@ where } if let Some(sig) = sig { - let sig = sig.borrow(); - self.storage.put_signature(index, sig).await?; + self.storage.put_signature(index, &sig).await?; } for node in nodes { @@ -556,35 +563,18 @@ where } } -impl Feed { - /// Create a new instance that persists to disk at the location of `dir`. - // TODO: Ensure that dir is always a directory. - // NOTE: Should we `mkdirp` here? - // NOTE: Should we call these `data.bitfield` / `data.tree`? - pub async fn open>(path: P) -> Result { - let dir = path.as_ref().to_owned(); - let storage = Storage::new_disk(&dir).await?; - Self::with_storage(storage).await - } -} - /// Create a new instance with an in-memory storage backend. /// /// ## Panics /// Can panic if constructing the in-memory store fails, which is highly /// unlikely. -impl Default for Feed { +impl Default for Feed { fn default() -> Self { - async_std::task::block_on(async { - let storage = Storage::new_memory().await.unwrap(); - Self::with_storage(storage).await.unwrap() - }) + async_std::task::block_on(async { Self::open_in_memory().await.unwrap() }) } } -impl> + Debug + Send> Display - for Feed -{ +impl Display for Feed { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: yay, we should find a way to convert this .unwrap() to an error // type that's accepted by `fmt::Result<(), fmt::Error>`. diff --git a/src/feed_builder.rs b/src/feed_builder.rs index 26ed91a..9325038 100644 --- a/src/feed_builder.rs +++ b/src/feed_builder.rs @@ -2,8 +2,7 @@ use ed25519_dalek::{PublicKey, SecretKey}; use crate::bitfield::Bitfield; use crate::crypto::Merkle; -use crate::storage::Storage; -use random_access_storage::RandomAccess; +use crate::storage::BoxStorage; use std::fmt::Debug; use tree_index::TreeIndex; @@ -14,22 +13,16 @@ use anyhow::Result; // TODO: make this an actual builder pattern. // https://deterministic.space/elegant-apis-in-rust.html#builder-pattern #[derive(Debug)] -pub struct FeedBuilder -where - T: RandomAccess + Debug, -{ - storage: Storage, +pub struct FeedBuilder { + storage: BoxStorage, public_key: PublicKey, secret_key: Option, } -impl FeedBuilder -where - T: RandomAccess> + Debug, -{ +impl FeedBuilder { /// Create a new instance. #[inline] - pub fn new(public_key: PublicKey, storage: Storage) -> Self { + pub fn new(public_key: PublicKey, storage: BoxStorage) -> Self { Self { storage, public_key, @@ -45,7 +38,7 @@ where /// Finalize the builder. #[inline] - pub fn build(self) -> Result> { + pub fn build(self) -> Result { Ok(Feed { merkle: Merkle::new(), byte_length: 0, diff --git a/src/lib.rs b/src/lib.rs index 0c99939..9549f8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,8 +54,6 @@ pub use ed25519_dalek::{PublicKey, SecretKey}; use std::path::Path; /// Create a new Hypercore `Feed`. -pub async fn open>( - path: P, -) -> anyhow::Result> { - Feed::open(path).await +pub async fn open>(path: P) -> anyhow::Result { + Feed::open_from_disk(path).await } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 713d6f6..8315b73 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,6 +8,7 @@ pub use self::persist::Persist; pub use merkle_tree_stream::Node as NodeTrait; use anyhow::{anyhow, ensure, Result}; +use async_trait::async_trait; use ed25519_dalek::{PublicKey, SecretKey, Signature, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH}; use flat_tree as flat; use futures::future::FutureExt; @@ -28,6 +29,99 @@ pub struct PartialKeypair { pub secret: Option, } +pub type BoxStorage = Box; + +/// Create a new instance backed by a `RandomAccessMemory` instance. +pub async fn storage_memory() -> Result> { + let create = |_| async { Ok(RandomAccessMemory::default()) }.boxed(); + Ok(Storage::new(create).await?) +} + +/// Create a new instance backed by a `RandomAccessDisk` instance. +pub async fn storage_disk(dir: &PathBuf) -> Result> { + let storage = |storage: Store| { + let name = match storage { + Store::Tree => "tree", + Store::Data => "data", + Store::Bitfield => "bitfield", + Store::Signatures => "signatures", + Store::Keypair => "key", + }; + RandomAccessDisk::open(dir.as_path().join(name)).boxed() + }; + Ok(Storage::new(storage).await?) +} + +#[async_trait] +pub trait DynStorage: Debug + Send { + /// Write data to the feed. + async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()>; + + /// Write a byte vector to a data storage (random-access instance) at the + /// position of `index`. + /// + /// NOTE: Meant to be called from the `.put()` feed method. Probably used to + /// insert data as-is after receiving it from the network (need to confirm + /// with mafintosh). + /// TODO: Ensure the signature size is correct. + /// NOTE: Should we create a `Data` entry type? + async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()>; + + /// Get data from disk that the user has written to it. This is stored + /// unencrypted, so there's no decryption needed. + // FIXME: data_offset always reads out index 0, length 0 + async fn get_data(&mut self, index: u64) -> Result>; + + /// Search the signature stores for a `Signature`, starting at `index`. + fn next_signature<'a>( + &'a mut self, + index: u64, + ) -> futures::future::BoxFuture<'a, Result>; + + /// Get a `Signature` from the store. + async fn get_signature(&mut self, index: u64) -> Result; + + /// Write a `Signature` to `self.Signatures`. + /// TODO: Ensure the signature size is correct. + /// NOTE: Should we create a `Signature` entry type? + async fn put_signature(&mut self, index: u64, signature: &Signature) -> Result<()>; + + /// TODO(yw) docs + /// Get the offset for the data, return `(offset, size)`. + /// + /// ## Panics + /// A panic can occur if no maximum value is found. + async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result>; + + /// Get a `Node` from the `tree` storage. + async fn get_node(&mut self, index: u64) -> Result; + + /// Write a `Node` to the `tree` storage. + /// TODO: prevent extra allocs here. Implement a method on node that can reuse + /// a buffer. + async fn put_node(&mut self, node: &Node) -> Result<()>; + + /// Write data to the internal bitfield module. + /// TODO: Ensure the chunk size is correct. + /// NOTE: Should we create a bitfield entry type? + async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()>; + + /// Read a public key from storage + async fn read_public_key(&mut self) -> Result; + + /// Read a secret key from storage + async fn read_secret_key(&mut self) -> Result; + + /// Write a public key to the storage + async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()>; + + /// Write a secret key to the storage + async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()>; + + /// Tries to read a partial keypair (ie: with an optional secret_key) from the storage + async fn read_partial_keypair(&mut self) -> Option; +} + /// The types of stores that can be created. #[derive(Debug)] pub enum Store { @@ -58,13 +152,13 @@ where impl Storage where - T: RandomAccess> + Debug + Send, + T: RandomAccess> + Debug + Send + 'static, { /// Create a new instance. Takes a keypair and a callback to create new /// storage instances. // Named `.open()` in the JS version. Replaces the `.openKey()` method too by // requiring a key pair to be initialized before creating a new instance. - pub async fn new(create: Cb) -> Result + pub async fn new(create: Cb) -> Result> where Cb: Fn(Store) -> std::pin::Pin> + Send>>, { @@ -97,12 +191,18 @@ where .await .map_err(|e| anyhow!(e))?; - Ok(instance) + Ok(Box::new(instance)) } +} +#[async_trait] +impl DynStorage for Storage +where + T: RandomAccess> + Debug + Send, +{ /// Write data to the feed. #[inline] - pub async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()> { + async fn write_data(&mut self, offset: u64, data: &[u8]) -> Result<()> { self.data.write(offset, &data).await.map_err(|e| anyhow!(e)) } @@ -114,7 +214,7 @@ where /// with mafintosh). /// TODO: Ensure the signature size is correct. /// NOTE: Should we create a `Data` entry type? - pub async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()> { + async fn put_data(&mut self, index: u64, data: &[u8], nodes: &[Node]) -> Result<()> { if data.is_empty() { return Ok(()); } @@ -136,7 +236,7 @@ where /// unencrypted, so there's no decryption needed. // FIXME: data_offset always reads out index 0, length 0 #[inline] - pub async fn get_data(&mut self, index: u64) -> Result> { + async fn get_data(&mut self, index: u64) -> Result> { let cached_nodes = Vec::new(); // TODO: reuse allocation. let range = self.data_offset(index, &cached_nodes).await?; self.data @@ -146,7 +246,7 @@ where } /// Search the signature stores for a `Signature`, starting at `index`. - pub fn next_signature<'a>( + fn next_signature<'a>( &'a mut self, index: u64, ) -> futures::future::BoxFuture<'a, Result> { @@ -169,7 +269,7 @@ where /// Get a `Signature` from the store. #[inline] - pub async fn get_signature(&mut self, index: u64) -> Result { + async fn get_signature(&mut self, index: u64) -> Result { let bytes = self .signatures .read(HEADER_OFFSET + 64 * index, 64) @@ -183,11 +283,7 @@ where /// TODO: Ensure the signature size is correct. /// NOTE: Should we create a `Signature` entry type? #[inline] - pub async fn put_signature( - &mut self, - index: u64, - signature: impl Borrow, - ) -> Result<()> { + async fn put_signature(&mut self, index: u64, signature: &Signature) -> Result<()> { let signature = signature.borrow(); self.signatures .write(HEADER_OFFSET + 64 * index, &signature.to_bytes()) @@ -200,7 +296,7 @@ where /// /// ## Panics /// A panic can occur if no maximum value is found. - pub async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result> { + async fn data_offset(&mut self, index: u64, cached_nodes: &[Node]) -> Result> { let mut roots = Vec::new(); // TODO: reuse alloc flat::full_roots(tree_index(index), &mut roots); @@ -247,7 +343,7 @@ where /// Get a `Node` from the `tree` storage. #[inline] - pub async fn get_node(&mut self, index: u64) -> Result { + async fn get_node(&mut self, index: u64) -> Result { let buf = self .tree .read(HEADER_OFFSET + 40 * index, 40) @@ -261,7 +357,7 @@ where /// TODO: prevent extra allocs here. Implement a method on node that can reuse /// a buffer. #[inline] - pub async fn put_node(&mut self, node: &Node) -> Result<()> { + async fn put_node(&mut self, node: &Node) -> Result<()> { let index = node.index(); let buf = node.to_bytes()?; self.tree @@ -274,7 +370,7 @@ where /// TODO: Ensure the chunk size is correct. /// NOTE: Should we create a bitfield entry type? #[inline] - pub async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()> { + async fn put_bitfield(&mut self, offset: u64, data: &[u8]) -> Result<()> { self.bitfield .write(HEADER_OFFSET + offset, data) .await @@ -282,7 +378,7 @@ where } /// Read a public key from storage - pub async fn read_public_key(&mut self) -> Result { + async fn read_public_key(&mut self) -> Result { let buf = self .keypair .read(0, PUBLIC_KEY_LENGTH as u64) @@ -293,7 +389,7 @@ where } /// Read a secret key from storage - pub async fn read_secret_key(&mut self) -> Result { + async fn read_secret_key(&mut self) -> Result { let buf = self .keypair .read(PUBLIC_KEY_LENGTH as u64, SECRET_KEY_LENGTH as u64) @@ -304,13 +400,13 @@ where } /// Write a public key to the storage - pub async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()> { + async fn write_public_key(&mut self, public_key: &PublicKey) -> Result<()> { let buf: [u8; PUBLIC_KEY_LENGTH] = public_key.to_bytes(); self.keypair.write(0, &buf).await.map_err(|e| anyhow!(e)) } /// Write a secret key to the storage - pub async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()> { + async fn write_secret_key(&mut self, secret_key: &SecretKey) -> Result<()> { let buf: [u8; SECRET_KEY_LENGTH] = secret_key.to_bytes(); self.keypair .write(PUBLIC_KEY_LENGTH as u64, &buf) @@ -319,7 +415,7 @@ where } /// Tries to read a partial keypair (ie: with an optional secret_key) from the storage - pub async fn read_partial_keypair(&mut self) -> Option { + async fn read_partial_keypair(&mut self) -> Option { match self.read_public_key().await { Ok(public) => match self.read_secret_key().await { Ok(secret) => Some(PartialKeypair { @@ -336,31 +432,6 @@ where } } -impl Storage { - /// Create a new instance backed by a `RandomAccessMemory` instance. - pub async fn new_memory() -> Result { - let create = |_| async { Ok(RandomAccessMemory::default()) }.boxed(); - Ok(Self::new(create).await?) - } -} - -impl Storage { - /// Create a new instance backed by a `RandomAccessDisk` instance. - pub async fn new_disk(dir: &PathBuf) -> Result { - let storage = |storage: Store| { - let name = match storage { - Store::Tree => "tree", - Store::Data => "data", - Store::Bitfield => "bitfield", - Store::Signatures => "signatures", - Store::Keypair => "key", - }; - RandomAccessDisk::open(dir.as_path().join(name)).boxed() - }; - Ok(Self::new(storage).await?) - } -} - /// Get a node from a vector of nodes. #[inline] fn find_node(nodes: &[Node], index: u64) -> Option<&Node> { diff --git a/src/storage/persist.rs b/src/storage/persist.rs index 70a3ec0..b514d15 100644 --- a/src/storage/persist.rs +++ b/src/storage/persist.rs @@ -1,4 +1,4 @@ -use super::Storage; +use super::BoxStorage; use anyhow::Result; use random_access_storage::RandomAccess; use std::fmt::Debug; @@ -15,5 +15,5 @@ where fn to_vec(&self) -> Result>; /// Persist into a storage backend. - fn store(&self, index: u64, store: Storage) -> Result<()>; + fn store(&self, index: u64, store: BoxStorage) -> Result<()>; }