From 6115a661a87874cc0f7ea32b7ee602313a79aec9 Mon Sep 17 00:00:00 2001 From: immanelg <119798691+immanelg@users.noreply.github.com> Date: Sat, 6 Jan 2024 15:08:03 +0300 Subject: [PATCH 1/5] init protocol package --- .gitignore | 3 ++- memcrab-protocol/Cargo.toml | 8 ++++++++ memcrab-protocol/src/lib.rs | 14 ++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 memcrab-protocol/Cargo.toml create mode 100644 memcrab-protocol/src/lib.rs diff --git a/.gitignore b/.gitignore index 6dee292..99f3cf8 100644 --- a/.gitignore +++ b/.gitignore @@ -201,4 +201,5 @@ Cargo.lock !memcrab !memcrab-cli !memcrab-cache -!memcrab-server \ No newline at end of file +!memcrab-server +!memcrab-protocol diff --git a/memcrab-protocol/Cargo.toml b/memcrab-protocol/Cargo.toml new file mode 100644 index 0000000..0731793 --- /dev/null +++ b/memcrab-protocol/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "memcrab-protocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs new file mode 100644 index 0000000..7d12d9a --- /dev/null +++ b/memcrab-protocol/src/lib.rs @@ -0,0 +1,14 @@ +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} From 86851c5e44dcd9ad2eef3a88060062932fcb0ff3 Mon Sep 17 00:00:00 2001 From: immanelg <119798691+immanelg@users.noreply.github.com> Date: Sat, 6 Jan 2024 17:01:15 +0300 Subject: [PATCH 2/5] draft implementation --- memcrab-protocol/Cargo.toml | 2 + memcrab-protocol/src/lib.rs | 149 ++++++++++++++++++++++++++++++++++-- 2 files changed, 145 insertions(+), 6 deletions(-) diff --git a/memcrab-protocol/Cargo.toml b/memcrab-protocol/Cargo.toml index 0731793..1e3864d 100644 --- a/memcrab-protocol/Cargo.toml +++ b/memcrab-protocol/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.77" +tokio = { version = "1.35.1", features = ["full"] } diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs index 7d12d9a..ed3d482 100644 --- a/memcrab-protocol/src/lib.rs +++ b/memcrab-protocol/src/lib.rs @@ -1,14 +1,151 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right +use std::io; +use tokio::net::tcp::OwnedReadHalf; + +type Bytes = Vec; + +fn four_bytes_to_usize(bytes: &Bytes) -> usize { + debug_assert_eq!(bytes.len(), 4, "Are you stupid?"); + + (bytes[0] as usize) << 24 + | (bytes[1] as usize) << 16 + | (bytes[2] as usize) << 8 + | (bytes[3] as usize) +} + +use tokio::io::Result; + +#[async_trait::async_trait] +pub trait AsyncReader { + async fn read(&mut self, buf: &mut [u8]) -> Result; +} + +#[async_trait::async_trait] +impl AsyncReader for OwnedReadHalf { + async fn read(&mut self, buf: &mut [u8]) -> Result { + self.readable().await?; + self.try_read(buf) + } +} + +pub struct Consumer { + buffer: Bytes, + len: usize, +} + +impl Consumer { + fn new() -> Consumer { + Consumer { + buffer: Vec::new(), + len: 0, + } + } + + fn set_len(&mut self, length: usize) { + self.len = length; + } + + fn build(&mut self) -> Option { + if self.buffer.len() >= self.len { + Some(self.buffer.drain(0..self.len).collect()) + } else { + None + } + } + + fn consume(&mut self, bytes: &mut [u8]) { + self.buffer.append(&mut bytes.to_vec()); + } +} + +#[derive(Debug, PartialEq)] +pub enum Msg { + Get(Bytes), + Set(Bytes, Bytes), +} + +pub struct Producer { + reader: R, + consumer: Consumer, +} + +impl Producer { + pub fn new(reader: R) -> Self { + Producer { + reader, + consumer: Consumer::new(), + } + } + async fn next_chunk(&mut self, size: usize) -> io::Result> { + self.consumer.set_len(size); + loop { + let mut buf = [0; 128]; + + match self.reader.read(&mut buf).await { + Ok(0) => return Ok(self.consumer.build()), + Ok(n) => { + self.consumer.consume(&mut buf[0..n]); + + if let Some(bytes) = self.consumer.build() { + println!("new chunk produced: {:?}", bytes); + return Ok(Some(bytes)); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + pub async fn next_msg(&mut self) -> io::Result> { + let msg_type = self.next_chunk(1).await?.unwrap(); + let key_len = self.next_chunk(1).await?.unwrap(); + let key = self.next_chunk(key_len[0].into()).await?.unwrap(); + if msg_type[0] == 0 { + return Ok(Some(Msg::Get(key))); + } + let data_len = self.next_chunk(4).await?.unwrap(); + let data = self + .next_chunk(four_bytes_to_usize(&data_len)) + .await? + .unwrap(); + Ok(Some(Msg::Set(key, data))) + } } #[cfg(test)] mod tests { use super::*; - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); + struct MockReader { + data: Vec>, + } + + #[async_trait::async_trait] + impl AsyncReader for MockReader { + async fn read(&mut self, buf: &mut [u8]) -> Result { + if !self.data.is_empty() { + let part = self.data.remove(0); + for (i, b) in part.iter().enumerate() { + buf[i] = *b; + } + Ok(100) + } else { + Ok(0) + } + } + } + + #[tokio::test] + async fn test_set() { + let mock_reader = MockReader { + data: vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]].into(), + }; + let mut producer = Producer::new(mock_reader); + + let msg = producer.next_msg().await; + assert_eq!(msg.unwrap().unwrap(), Msg::Set(vec![1], vec![8, 8, 8,])) } } From c94e19af56a7875862d43e5443f10332e2fa679b Mon Sep 17 00:00:00 2001 From: immanelg <119798691+immanelg@users.noreply.github.com> Date: Sat, 6 Jan 2024 18:59:57 +0300 Subject: [PATCH 3/5] add test for get + use read() from AsyncReadExt --- memcrab-protocol/src/lib.rs | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs index ed3d482..c987a37 100644 --- a/memcrab-protocol/src/lib.rs +++ b/memcrab-protocol/src/lib.rs @@ -1,5 +1,6 @@ use std::io; use tokio::net::tcp::OwnedReadHalf; +use tokio::io::AsyncReadExt; type Bytes = Vec; @@ -22,8 +23,7 @@ pub trait AsyncReader { #[async_trait::async_trait] impl AsyncReader for OwnedReadHalf { async fn read(&mut self, buf: &mut [u8]) -> Result { - self.readable().await?; - self.try_read(buf) + AsyncReadExt::read(self, buf).await } } @@ -120,28 +120,50 @@ mod tests { use super::*; struct MockReader { - data: Vec>, + parts: Vec>, } #[async_trait::async_trait] impl AsyncReader for MockReader { async fn read(&mut self, buf: &mut [u8]) -> Result { - if !self.data.is_empty() { - let part = self.data.remove(0); + if !self.parts.is_empty() { + let part = self.parts.remove(0); for (i, b) in part.iter().enumerate() { buf[i] = *b; } - Ok(100) + Ok(part.len()) } else { Ok(0) } } } + #[tokio::test] + async fn test_get() { + let mock_reader = MockReader { + parts: vec![vec![0, 2, 1, 1].into()], + }; + let mut producer = Producer::new(mock_reader); + + let msg = producer.next_msg().await; + assert_eq!(msg.unwrap().unwrap(), Msg::Get(vec![1, 1])) + } + + #[tokio::test] + async fn test_get_with_partitions() { + let mock_reader = MockReader { + parts: vec![vec![0].into(), vec![3, 1].into(), vec![2, 3].into()], + }; + let mut producer = Producer::new(mock_reader); + + let msg = producer.next_msg().await; + assert_eq!(msg.unwrap().unwrap(), Msg::Get(vec![1, 2, 3])) + } + #[tokio::test] async fn test_set() { let mock_reader = MockReader { - data: vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]].into(), + parts: vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]].into(), }; let mut producer = Producer::new(mock_reader); From eb5343d7c41395f972510a2de0150c0e76d14664 Mon Sep 17 00:00:00 2001 From: immanelg <119798691+immanelg@users.noreply.github.com> Date: Sat, 6 Jan 2024 20:41:56 +0300 Subject: [PATCH 4/5] use read_exact --- memcrab-protocol/src/lib.rs | 143 +++++++++++++----------------------- 1 file changed, 51 insertions(+), 92 deletions(-) diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs index c987a37..c113c0d 100644 --- a/memcrab-protocol/src/lib.rs +++ b/memcrab-protocol/src/lib.rs @@ -1,6 +1,6 @@ use std::io; -use tokio::net::tcp::OwnedReadHalf; use tokio::io::AsyncReadExt; +use tokio::net::tcp::OwnedReadHalf; type Bytes = Vec; @@ -13,47 +13,15 @@ fn four_bytes_to_usize(bytes: &Bytes) -> usize { | (bytes[3] as usize) } -use tokio::io::Result; - #[async_trait::async_trait] pub trait AsyncReader { - async fn read(&mut self, buf: &mut [u8]) -> Result; + async fn read_exact(&mut self, buf: &mut [u8]) -> io::Result; } #[async_trait::async_trait] impl AsyncReader for OwnedReadHalf { - async fn read(&mut self, buf: &mut [u8]) -> Result { - AsyncReadExt::read(self, buf).await - } -} - -pub struct Consumer { - buffer: Bytes, - len: usize, -} - -impl Consumer { - fn new() -> Consumer { - Consumer { - buffer: Vec::new(), - len: 0, - } - } - - fn set_len(&mut self, length: usize) { - self.len = length; - } - - fn build(&mut self) -> Option { - if self.buffer.len() >= self.len { - Some(self.buffer.drain(0..self.len).collect()) - } else { - None - } - } - - fn consume(&mut self, bytes: &mut [u8]) { - self.buffer.append(&mut bytes.to_vec()); + async fn read_exact(&mut self, buf: &mut [u8]) -> io::Result { + AsyncReadExt::read_exact(self, buf).await } } @@ -63,55 +31,45 @@ pub enum Msg { Set(Bytes, Bytes), } +#[derive(Debug)] +pub enum Error { + IO(std::io::Error), + Incomplete, +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::IO(value) + } +} + pub struct Producer { reader: R, - consumer: Consumer, } impl Producer { pub fn new(reader: R) -> Self { Producer { reader, - consumer: Consumer::new(), } } - async fn next_chunk(&mut self, size: usize) -> io::Result> { - self.consumer.set_len(size); - loop { - let mut buf = [0; 128]; - - match self.reader.read(&mut buf).await { - Ok(0) => return Ok(self.consumer.build()), - Ok(n) => { - self.consumer.consume(&mut buf[0..n]); - - if let Some(bytes) = self.consumer.build() { - println!("new chunk produced: {:?}", bytes); - return Ok(Some(bytes)); - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(e); - } - } - } + async fn next_chunk(&mut self, size: usize) -> Result { + let mut buf = vec![0u8; size]; + self.reader.read_exact(&mut buf).await?; + Ok(buf) } - pub async fn next_msg(&mut self) -> io::Result> { - let msg_type = self.next_chunk(1).await?.unwrap(); - let key_len = self.next_chunk(1).await?.unwrap(); - let key = self.next_chunk(key_len[0].into()).await?.unwrap(); + pub async fn next_msg(&mut self) -> Result { + let msg_type = self.next_chunk(1).await?; + let key_len = self.next_chunk(1).await?; + let key = self.next_chunk(key_len[0].into()).await?; if msg_type[0] == 0 { - return Ok(Some(Msg::Get(key))); + return Ok(Msg::Get(key)); } - let data_len = self.next_chunk(4).await?.unwrap(); + let data_len = self.next_chunk(4).await?; let data = self .next_chunk(four_bytes_to_usize(&data_len)) - .await? - .unwrap(); - Ok(Some(Msg::Set(key, data))) + .await?; + Ok(Msg::Set(key, data)) } } @@ -121,53 +79,54 @@ mod tests { struct MockReader { parts: Vec>, + remainder: Vec, + } + + impl MockReader { + fn new(parts: Vec>) -> Self { + MockReader { + parts, + remainder: vec![], + } + } } #[async_trait::async_trait] impl AsyncReader for MockReader { - async fn read(&mut self, buf: &mut [u8]) -> Result { - if !self.parts.is_empty() { - let part = self.parts.remove(0); - for (i, b) in part.iter().enumerate() { - buf[i] = *b; - } - Ok(part.len()) - } else { - Ok(0) - } + async fn read_exact(&mut self, _buf: &mut [u8]) -> io::Result { + println!("{:?} {:?}", self.parts, self.remainder); + todo!(); } } #[tokio::test] async fn test_get() { - let mock_reader = MockReader { - parts: vec![vec![0, 2, 1, 1].into()], - }; + let mock_reader = MockReader::new( + vec![vec![0, 2, 1, 1].into()], + ); let mut producer = Producer::new(mock_reader); let msg = producer.next_msg().await; - assert_eq!(msg.unwrap().unwrap(), Msg::Get(vec![1, 1])) + assert_eq!(msg.unwrap(), Msg::Get(vec![1, 1])) } #[tokio::test] async fn test_get_with_partitions() { - let mock_reader = MockReader { - parts: vec![vec![0].into(), vec![3, 1].into(), vec![2, 3].into()], - }; + let mock_reader = MockReader::new(vec![vec![0].into(), vec![3, 1].into(), vec![2, 3].into()]); let mut producer = Producer::new(mock_reader); let msg = producer.next_msg().await; - assert_eq!(msg.unwrap().unwrap(), Msg::Get(vec![1, 2, 3])) + assert_eq!(msg.unwrap(), Msg::Get(vec![1, 2, 3])) } #[tokio::test] async fn test_set() { - let mock_reader = MockReader { - parts: vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]].into(), - }; + let mock_reader = MockReader::new( + vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]].into(), + ); let mut producer = Producer::new(mock_reader); let msg = producer.next_msg().await; - assert_eq!(msg.unwrap().unwrap(), Msg::Set(vec![1], vec![8, 8, 8,])) + assert_eq!(msg.unwrap(), Msg::Set(vec![1], vec![8, 8, 8,])) } } From af7d1fdce62a5ddb109550569a291d3e286a2fba Mon Sep 17 00:00:00 2001 From: cospectrum Date: Sat, 6 Jan 2024 21:44:49 +0300 Subject: [PATCH 5/5] refactor --- memcrab-protocol/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs index c113c0d..2a205e2 100644 --- a/memcrab-protocol/src/lib.rs +++ b/memcrab-protocol/src/lib.rs @@ -4,8 +4,8 @@ use tokio::net::tcp::OwnedReadHalf; type Bytes = Vec; -fn four_bytes_to_usize(bytes: &Bytes) -> usize { - debug_assert_eq!(bytes.len(), 4, "Are you stupid?"); +fn four_bytes_to_usize(bytes: &[u8]) -> usize { + assert_eq!(bytes.len(), 4); (bytes[0] as usize) << 24 | (bytes[1] as usize) << 16 @@ -102,7 +102,7 @@ mod tests { #[tokio::test] async fn test_get() { let mock_reader = MockReader::new( - vec![vec![0, 2, 1, 1].into()], + vec![vec![0, 2, 1, 1]], ); let mut producer = Producer::new(mock_reader); @@ -112,7 +112,7 @@ mod tests { #[tokio::test] async fn test_get_with_partitions() { - let mock_reader = MockReader::new(vec![vec![0].into(), vec![3, 1].into(), vec![2, 3].into()]); + let mock_reader = MockReader::new(vec![vec![0], vec![3, 1], vec![2, 3]]); let mut producer = Producer::new(mock_reader); let msg = producer.next_msg().await; @@ -122,7 +122,7 @@ mod tests { #[tokio::test] async fn test_set() { let mock_reader = MockReader::new( - vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]].into(), + vec![vec![1, 1, 1, 0, 0, 0, 3, 8, 8, 8]], ); let mut producer = Producer::new(mock_reader);