diff --git a/.gitignore b/.gitignore index 6dee292..99f3cf8 100644 --- a/.gitignore +++ b/.gitignore @@ -201,4 +201,5 @@ Cargo.lock !memcrab !memcrab-cli !memcrab-cache -!memcrab-server \ No newline at end of file +!memcrab-server +!memcrab-protocol diff --git a/memcrab-protocol/Cargo.toml b/memcrab-protocol/Cargo.toml new file mode 100644 index 0000000..1e3864d --- /dev/null +++ b/memcrab-protocol/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "memcrab-protocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.77" +tokio = { version = "1.35.1", features = ["full"] } diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs new file mode 100644 index 0000000..2a205e2 --- /dev/null +++ b/memcrab-protocol/src/lib.rs @@ -0,0 +1,132 @@ +use std::io; +use tokio::io::AsyncReadExt; +use tokio::net::tcp::OwnedReadHalf; + +type Bytes = Vec; + +fn four_bytes_to_usize(bytes: &[u8]) -> usize { + assert_eq!(bytes.len(), 4); + + (bytes[0] as usize) << 24 + | (bytes[1] as usize) << 16 + | (bytes[2] as usize) << 8 + | (bytes[3] as usize) +} + +#[async_trait::async_trait] +pub trait AsyncReader { + async fn read_exact(&mut self, buf: &mut [u8]) -> io::Result; +} + +#[async_trait::async_trait] +impl AsyncReader for OwnedReadHalf { + async fn read_exact(&mut self, buf: &mut [u8]) -> io::Result { + AsyncReadExt::read_exact(self, buf).await + } +} + +#[derive(Debug, PartialEq)] +pub enum Msg { + Get(Bytes), + Set(Bytes, Bytes), +} + +#[derive(Debug)] +pub enum Error { + IO(std::io::Error), + Incomplete, +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::IO(value) + } +} + +pub struct Producer { + reader: R, +} + +impl Producer { + pub fn new(reader: R) -> Self { + Producer { + reader, + } + } + async fn next_chunk(&mut self, size: usize) -> Result { + let mut buf = vec![0u8; size]; + self.reader.read_exact(&mut buf).await?; + Ok(buf) + } + pub async fn next_msg(&mut self) -> Result { + let msg_type = self.next_chunk(1).await?; + let key_len = self.next_chunk(1).await?; + let key = self.next_chunk(key_len[0].into()).await?; + if msg_type[0] == 0 { + return Ok(Msg::Get(key)); + } + let data_len = self.next_chunk(4).await?; + let data = self + .next_chunk(four_bytes_to_usize(&data_len)) + .await?; + Ok(Msg::Set(key, data)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct MockReader { + parts: Vec>, + remainder: Vec, + } + + impl MockReader { + fn new(parts: Vec>) -> Self { + MockReader { + parts, + remainder: vec![], + } + } + } + + #[async_trait::async_trait] + impl AsyncReader for MockReader { + async fn read_exact(&mut self, _buf: &mut [u8]) -> io::Result { + println!("{:?} {:?}", self.parts, self.remainder); + todo!(); + } + } + + #[tokio::test] + async fn test_get() { + let mock_reader = MockReader::new( + vec![vec![0, 2, 1, 1]], + ); + let mut producer = Producer::new(mock_reader); + + let msg = producer.next_msg().await; + assert_eq!(msg.unwrap(), Msg::Get(vec![1, 1])) + } + + #[tokio::test] + async fn test_get_with_partitions() { + let mock_reader = MockReader::new(vec![vec![0], vec![3, 1], vec![2, 3]]); + let mut producer = Producer::new(mock_reader); + + let msg = producer.next_msg().await; + assert_eq!(msg.unwrap(), Msg::Get(vec![1, 2, 3])) + } + + #[tokio::test] + async fn test_set() { + let mock_reader = MockReader::new( + vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]], + ); + let mut producer = Producer::new(mock_reader); + + let msg = producer.next_msg().await; + assert_eq!(msg.unwrap(), Msg::Set(vec![1], vec![8, 8, 8,])) + } +}