From 39a88f1c8cb16f18f24047d8a2f86151cc62eefc Mon Sep 17 00:00:00 2001 From: Alex Severin Date: Mon, 22 Jan 2024 20:13:01 +0300 Subject: [PATCH 1/2] add server-mvp --- Cargo.toml | 2 +- memcrab-cache/Cargo.toml | 9 -- memcrab-cache/src/lib.rs | 3 - memcrab-cache/src/mem_lru/builder.rs | 73 --------------- memcrab-cache/src/mem_lru/mod.rs | 6 -- memcrab-cache/tests/memlru.rs | 22 ----- memcrab-cli/Cargo.toml | 1 - memcrab-protocol/README.md | 19 ++-- memcrab-protocol/src/alias.rs | 1 - memcrab-protocol/src/kind.rs | 11 ++- memcrab-protocol/src/lib.rs | 2 - memcrab-protocol/src/msg.rs | 3 +- memcrab-protocol/src/parser.rs | 7 +- memcrab-protocol/src/socket.rs | 5 -- memcrab-server/Cargo.toml | 13 +++ memcrab-server/README.md | 18 ++++ memcrab-server/src/cache/api/cfg.rs | 48 ++++++++++ memcrab-server/src/cache/api/mod.rs | 87 ++++++++++++++++++ memcrab-server/src/cache/api/value.rs | 50 +++++++++++ .../src/cache}/bytesized.rs | 0 memcrab-server/src/cache/map.rs | 84 +++++++++++++++++ .../src/cache/mem_lru.rs | 35 ++++++-- memcrab-server/src/cache/mod.rs | 10 +++ memcrab-server/src/lib.rs | 90 +++++++++++++++++++ memcrab-server/src/serve/err.rs | 11 +++ memcrab-server/src/serve/mod.rs | 7 ++ memcrab-server/src/serve/server.rs | 76 ++++++++++++++++ memcrab-server/src/serve/socket.rs | 30 +++++++ 28 files changed, 565 insertions(+), 158 deletions(-) delete mode 100644 memcrab-cache/Cargo.toml delete mode 100644 memcrab-cache/src/lib.rs delete mode 100644 memcrab-cache/src/mem_lru/builder.rs delete mode 100644 memcrab-cache/src/mem_lru/mod.rs delete mode 100644 memcrab-cache/tests/memlru.rs create mode 100644 memcrab-server/Cargo.toml create mode 100644 memcrab-server/README.md create mode 100644 memcrab-server/src/cache/api/cfg.rs create mode 100644 memcrab-server/src/cache/api/mod.rs create mode 100644 memcrab-server/src/cache/api/value.rs rename {memcrab-cache/src/mem_lru => memcrab-server/src/cache}/bytesized.rs (100%) create mode 100644 memcrab-server/src/cache/map.rs rename memcrab-cache/src/mem_lru/container.rs => memcrab-server/src/cache/mem_lru.rs (74%) create mode 100644 memcrab-server/src/cache/mod.rs create mode 100644 memcrab-server/src/lib.rs create mode 100644 memcrab-server/src/serve/err.rs create mode 100644 memcrab-server/src/serve/mod.rs create mode 100644 memcrab-server/src/serve/server.rs create mode 100644 memcrab-server/src/serve/socket.rs diff --git a/Cargo.toml b/Cargo.toml index 5a2a2ea..7e6c50c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = [ "memcrab", - "memcrab-cache", + "memcrab-server", "memcrab-protocol", "memcrab-cli", ] diff --git a/memcrab-cache/Cargo.toml b/memcrab-cache/Cargo.toml deleted file mode 100644 index 3f23e30..0000000 --- a/memcrab-cache/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "memcrab-cache" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -lru = "0.12.1" diff --git a/memcrab-cache/src/lib.rs b/memcrab-cache/src/lib.rs deleted file mode 100644 index ef30ffa..0000000 --- a/memcrab-cache/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod mem_lru; - -pub use mem_lru::{ByteSized, MemLru}; diff --git a/memcrab-cache/src/mem_lru/builder.rs b/memcrab-cache/src/mem_lru/builder.rs deleted file mode 100644 index 6bbf1da..0000000 --- a/memcrab-cache/src/mem_lru/builder.rs +++ /dev/null @@ -1,73 +0,0 @@ -use super::ByteSized; -use crate::MemLru; -use core::{hash::Hash, marker::PhantomData, num::NonZeroUsize}; -use lru::LruCache; - -#[derive(Clone)] -pub struct UnboundedLen; -#[derive(Clone)] -pub struct NoMaxByteSize; - -pub type DefaultBuilder = MemLruBuilder; - -#[derive(Clone)] -pub struct MemLruBuilder { - max_len: MaxLen, - max_bytesize: MaxByteSize, - phantom: PhantomData<(K, V)>, -} - -impl DefaultBuilder { - pub(crate) fn new() -> Self { - Self { - max_len: UnboundedLen, - max_bytesize: NoMaxByteSize, - phantom: PhantomData, - } - } -} - -impl MemLruBuilder { - pub fn max_len(self, max_len: usize) -> MemLruBuilder { - assert!(max_len > 0); - MemLruBuilder { - max_len, - max_bytesize: self.max_bytesize, - phantom: PhantomData, - } - } -} - -impl MemLruBuilder { - pub fn max_bytesize(self, max_bytesize: usize) -> MemLruBuilder { - MemLruBuilder { - max_len: self.max_len, - max_bytesize, - phantom: PhantomData, - } - } -} - -impl MemLruBuilder -where - K: Hash + Eq + ByteSized, - V: ByteSized, -{ - pub fn build(self) -> MemLru { - assert!(self.max_len > 0); - let cap = NonZeroUsize::new(self.max_len).unwrap(); - let lru = LruCache::new(cap); - MemLru::new(lru, self.max_bytesize) - } -} - -impl MemLruBuilder -where - K: Hash + Eq + ByteSized, - V: ByteSized, -{ - pub fn build(self) -> MemLru { - let lru = LruCache::unbounded(); - MemLru::new(lru, self.max_bytesize) - } -} diff --git a/memcrab-cache/src/mem_lru/mod.rs b/memcrab-cache/src/mem_lru/mod.rs deleted file mode 100644 index 23c2d43..0000000 --- a/memcrab-cache/src/mem_lru/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod builder; -mod bytesized; -mod container; - -pub use bytesized::ByteSized; -pub use container::MemLru; diff --git a/memcrab-cache/tests/memlru.rs b/memcrab-cache/tests/memlru.rs deleted file mode 100644 index 829063c..0000000 --- a/memcrab-cache/tests/memlru.rs +++ /dev/null @@ -1,22 +0,0 @@ -use memcrab_cache::MemLru; - -#[test] -fn builder() { - let max_len = 10; - let max_bytesize = 50; - - let memlru = MemLru::>::builder() - .max_len(max_len) - .max_bytesize(max_bytesize) - .build(); - - assert_eq!(memlru.max_bytesize(), max_bytesize); - assert_eq!(memlru.max_len(), max_len); - - let memlru = MemLru::>::builder() - .max_bytesize(max_bytesize) - .build(); - - assert_eq!(memlru.max_bytesize(), max_bytesize); - assert_eq!(memlru.max_len(), usize::MAX); -} diff --git a/memcrab-cli/Cargo.toml b/memcrab-cli/Cargo.toml index 96ce84c..b20752f 100644 --- a/memcrab-cli/Cargo.toml +++ b/memcrab-cli/Cargo.toml @@ -14,6 +14,5 @@ repository = "https://github.com/cospectrum/memcrab" anyhow = "1.0.79" clap = { version = "4.4.12", features = ["derive"] } memcrab = { version = "0.1.0", path = "../memcrab" } -memcrab-cache = { version = "0.1.0", path = "../memcrab-cache" } rustyline = "13.0.0" tokio = { version = "1.35.1", features = ["full"] } diff --git a/memcrab-protocol/README.md b/memcrab-protocol/README.md index 860dd86..f19206a 100644 --- a/memcrab-protocol/README.md +++ b/memcrab-protocol/README.md @@ -41,7 +41,6 @@ Clients should only send request messages and understand responses messages howe ```rs type PayloadLen = u64; // number of bytes in payload -type Version = u16; // protocol-version type KeyLen = u64; // number of bytes in the encoded utf8 string key type Expirtaion = u32; // expiration in seconds @@ -54,12 +53,11 @@ type Key = String; // utf-8 #### Requests (first byte < 128) | Message kind | first byte | remaining 8 bytes in header | payload | --- | --- | --- | --- -| Version | 0 | PayloadLen | Version -| Ping | 1 | zeros | none -| Get | 2 | PayloadLen | Key -| Set | 3 | PayloadLen | KeyLen, Expirtaion, Key, Value -| Delete | 4 | PayloadLen | Key -| Clear | 5 | zeros | none +| Ping | 0 | zeros | none +| Get | 1 | PayloadLen | Key +| Set | 2 | PayloadLen | KeyLen, Expirtaion, Key, Value +| Delete | 3 | PayloadLen | Key +| Clear | 4 | zeros | none #### Responses (first byte >= 128) | Message kind | first byte | remaining 8 bytes in header | payload @@ -70,10 +68,3 @@ type Key = String; // utf-8 | KeyNotFound | 131 | zeros | none | Error | 255 | PayloadLen | String (utf-8 encoded) -### Versioning -Protocol is versioned by a number and are not backwards compatible. - -The current version is `0`. - -The clients must send `Version` message as their first message. -The server must close the connection if the version is not compatible. diff --git a/memcrab-protocol/src/alias.rs b/memcrab-protocol/src/alias.rs index 8776659..315140e 100644 --- a/memcrab-protocol/src/alias.rs +++ b/memcrab-protocol/src/alias.rs @@ -1,5 +1,4 @@ pub type PayloadLen = u64; -pub type Version = u16; pub type KeyLen = u64; pub type Expiration = u32; diff --git a/memcrab-protocol/src/kind.rs b/memcrab-protocol/src/kind.rs index fc43d8b..764ddc9 100644 --- a/memcrab-protocol/src/kind.rs +++ b/memcrab-protocol/src/kind.rs @@ -33,12 +33,11 @@ impl TryFrom for MsgKind { #[repr(u8)] #[derive(Debug, Clone, Copy, TryFromPrimitive, IntoPrimitive, PartialEq, Eq)] pub enum RequestKind { - Version = 0, - Ping = 1, - Get = 2, - Set = 3, - Delete = 4, - Clear = 5, + Ping = 0, + Get = 1, + Set = 2, + Delete = 3, + Clear = 4, } #[repr(u8)] diff --git a/memcrab-protocol/src/lib.rs b/memcrab-protocol/src/lib.rs index 1e82263..466f5fc 100644 --- a/memcrab-protocol/src/lib.rs +++ b/memcrab-protocol/src/lib.rs @@ -8,7 +8,6 @@ mod socket; use std::mem::size_of; -use alias::Version; use parser::Parser; pub use err::{Error, ParseError}; @@ -17,7 +16,6 @@ pub use msg::{Msg, Request, Response}; pub use socket::Socket; const HEADER_SIZE: usize = size_of::() + size_of::(); -pub const VERSION: Version = 0; #[cfg(test)] mod tests { diff --git a/memcrab-protocol/src/msg.rs b/memcrab-protocol/src/msg.rs index a988ca2..eeac8a4 100644 --- a/memcrab-protocol/src/msg.rs +++ b/memcrab-protocol/src/msg.rs @@ -1,4 +1,4 @@ -use crate::alias::{Expiration, Version}; +use crate::alias::Expiration; #[derive(Debug, Clone, PartialEq)] pub enum Msg { @@ -8,7 +8,6 @@ pub enum Msg { #[derive(Debug, Clone, PartialEq)] pub enum Request { - Version(Version), Get(String), Set { key: String, diff --git a/memcrab-protocol/src/parser.rs b/memcrab-protocol/src/parser.rs index 520975b..c6197e5 100644 --- a/memcrab-protocol/src/parser.rs +++ b/memcrab-protocol/src/parser.rs @@ -1,5 +1,5 @@ use crate::{ - alias::{Expiration, KeyLen, PayloadLen, Version}, + alias::{Expiration, KeyLen, PayloadLen}, kind::{MsgKind, RequestKind, ResponseKind}, Msg, ParseError, Request, Response, HEADER_SIZE, }; @@ -32,10 +32,6 @@ impl Parser { fn decode_request(&self, kind: RequestKind, payload: Payload) -> Result { use RequestKind as Kind; Ok(match kind { - Kind::Version => { - let version = Version::from_be_bytes(payload.as_slice().try_into()?); - Request::Version(version) - } Kind::Ping => Request::Ping, Kind::Get => Request::Get(utf8(payload)?), Kind::Set => { @@ -92,7 +88,6 @@ impl Parser { fn encode_request(&self, req: Request) -> (RequestKind, Payload) { match req { - Request::Version(version) => (RequestKind::Version, version.to_be_bytes().to_vec()), Request::Ping => (RequestKind::Ping, vec![]), Request::Clear => (RequestKind::Clear, vec![]), Request::Get(key) => (RequestKind::Get, key.into()), diff --git a/memcrab-protocol/src/socket.rs b/memcrab-protocol/src/socket.rs index 3ee839e..e6838b5 100644 --- a/memcrab-protocol/src/socket.rs +++ b/memcrab-protocol/src/socket.rs @@ -101,11 +101,6 @@ mod test { // boilerplate somehow // TODO!: tests for encoding - let mut data = vec![MsgKind::Request(RequestKind::Version).into()]; - data.extend(2u64.to_be_bytes()); - data.extend([0, 1]); - assert_parsed(data, Msg::Request(Request::Version(1))).await; - let mut data = vec![MsgKind::Request(RequestKind::Ping).into()]; data.extend(zero_u64_bytes); assert_parsed(data, Msg::Request(Request::Ping)).await; diff --git a/memcrab-server/Cargo.toml b/memcrab-server/Cargo.toml new file mode 100644 index 0000000..2c7f8a7 --- /dev/null +++ b/memcrab-server/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "memcrab-server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { workspace = true, features = ["full"] } +memcrab-protocol = { version = "0.1.0", path = "../memcrab-protocol" } +lru = "0.12.1" +thiserror.workspace = true +typed-builder = "0.18.1" diff --git a/memcrab-server/README.md b/memcrab-server/README.md new file mode 100644 index 0000000..fc6c4ed --- /dev/null +++ b/memcrab-server/README.md @@ -0,0 +1,18 @@ +# memcrab-server + +```rs +use memcrab_server::{Server, CacheCfg}; + +#[tokio::main] +async fn main() { + let gb = 2_usize.pow(30); + let cfg = CacheCfg::builder() + .segments(10) + .max_bytesize(gb) + .build(); + + let addr = "127.0.0.1:9900".parse().unwrap(); + let server = Server::from(cfg); + server.start(addr).await.unwrap(); +} +``` diff --git a/memcrab-server/src/cache/api/cfg.rs b/memcrab-server/src/cache/api/cfg.rs new file mode 100644 index 0000000..706a7e1 --- /dev/null +++ b/memcrab-server/src/cache/api/cfg.rs @@ -0,0 +1,48 @@ +use std::sync::Mutex; +use typed_builder::TypedBuilder; + +use crate::cache::map::Map; + +use super::{MemLru, Value}; + +#[derive(TypedBuilder)] +pub struct CacheCfg { + segments: usize, + #[builder(default=None, setter(strip_option))] + max_len: Option, + max_bytesize: usize, +} + +impl CacheCfg { + pub(super) fn map(self) -> Map { + assert!(self.segments > 0); + + let new_segment = |max_bytesize: usize, max_len: Option| { + Mutex::new(match max_len { + Some(max_len) => { + if max_len == 0 { + MemLru::with_max_bytesize(max_bytesize) + } else { + MemLru::with_max_bytesize_and_max_len(max_bytesize, max_len) + } + } + None => MemLru::with_max_bytesize(max_bytesize), + }) + }; + + let mut segments = Vec::with_capacity(self.segments); + let segment = new_segment( + self.max_bytesize / self.segments + self.max_bytesize % self.segments, + self.max_len.map(|l| l / self.segments + l % self.segments), + ); + segments.push(segment); + for _ in 1..self.segments { + let segment = new_segment( + self.max_bytesize / self.segments, + self.max_len.map(|l| l / self.segments), + ); + segments.push(segment); + } + Map::from_segments(segments) + } +} diff --git a/memcrab-server/src/cache/api/mod.rs b/memcrab-server/src/cache/api/mod.rs new file mode 100644 index 0000000..25509bd --- /dev/null +++ b/memcrab-server/src/cache/api/mod.rs @@ -0,0 +1,87 @@ +mod cfg; +mod value; + +use std::num::NonZeroU32; + +use super::{ByteSized, Map, MemLru}; +use value::Value; + +pub use cfg::CacheCfg; + +pub struct Cache { + inner: Map, +} + +impl Cache { + pub fn new(inner: Map) -> Self { + Cache { inner } + } +} + +impl From for Cache { + fn from(cfg: CacheCfg) -> Self { + Self::new(cfg.map()) + } +} + +impl Cache { + pub fn set(&self, key: String, value: Vec) { + self._set(key, Value::new(value)) + } + pub fn set_with_expiration(&self, key: String, value: Vec, exp: NonZeroU32) { + self._set(key, Value::with_expiration(value, exp)) + } + pub fn get(&self, key: &str) -> Option> { + self._get(key) + } + pub fn remove(&self, key: &str) -> Option> { + match self.inner.remove(key) { + Some(val) => { + if val.expired() { + None + } else { + Some(val.into_vec()) + } + } + None => None, + } + } + pub fn clear(&self) { + self.inner.clear() + } +} + +impl Cache { + fn _set(&self, key: String, value: Value) { + self.inner.set(key, value); + } + fn _get(&self, key: &str) -> Option> { + // Don't forget that the mutex is locked until the function returns. + // Accessing the same key inside will result in a deadlock. + let f = |opt: Option<&Value>| { + if let Some(val) = opt { + if val.expired() { + LazyVal::Expired + } else { + LazyVal::Val(val.clone().into_vec()) + } + } else { + LazyVal::NotFound + } + }; + match self.inner.get_and_then(key, f) { + LazyVal::Val(val) => Some(val), + LazyVal::Expired => { + self.inner.remove(key); + None + } + LazyVal::NotFound => None, + } + } +} + +enum LazyVal { + Expired, + NotFound, + Val(Vec), +} diff --git a/memcrab-server/src/cache/api/value.rs b/memcrab-server/src/cache/api/value.rs new file mode 100644 index 0000000..ce5838d --- /dev/null +++ b/memcrab-server/src/cache/api/value.rs @@ -0,0 +1,50 @@ +use super::ByteSized; +use std::{num::NonZeroU32, time::Instant}; + +#[derive(Clone, Debug)] +struct Clock { + start: Instant, + expiration: u32, +} + +impl Clock { + fn start_timing(expiration: NonZeroU32) -> Self { + let start = Instant::now(); + Self { + start, + expiration: expiration.into(), + } + } + fn expired(&self) -> bool { + let passed = self.start.elapsed().as_secs() as u32; + passed > self.expiration + } +} + +#[derive(Clone, Debug)] +pub struct Value { + inner: Vec, + clock: Option, +} + +impl Value { + pub fn new(inner: Vec) -> Self { + Self { inner, clock: None } + } + pub fn with_expiration(inner: Vec, expiration: NonZeroU32) -> Self { + let clock = Some(Clock::start_timing(expiration)); + Self { inner, clock } + } + pub fn expired(&self) -> bool { + self.clock.as_ref().map(|c| c.expired()).unwrap_or(false) + } + pub fn into_vec(self) -> Vec { + self.inner + } +} + +impl ByteSized for Value { + fn bytesize(&self) -> usize { + self.inner.bytesize() + } +} diff --git a/memcrab-cache/src/mem_lru/bytesized.rs b/memcrab-server/src/cache/bytesized.rs similarity index 100% rename from memcrab-cache/src/mem_lru/bytesized.rs rename to memcrab-server/src/cache/bytesized.rs diff --git a/memcrab-server/src/cache/map.rs b/memcrab-server/src/cache/map.rs new file mode 100644 index 0000000..8809fd6 --- /dev/null +++ b/memcrab-server/src/cache/map.rs @@ -0,0 +1,84 @@ +use super::{ByteSized, MemLru}; +use core::{borrow::Borrow, hash::Hash}; +use std::{ + collections::hash_map::RandomState, + sync::{Mutex, MutexGuard}, +}; + +type Segment = Mutex>; +type Segments = Vec>; + +pub struct Map +where + K: Hash + Eq, +{ + segments: Segments, + hasher: H, +} + +impl Map +where + K: Hash + Eq + ByteSized, + V: ByteSized + Clone, +{ + pub fn from_segments(segments: Segments) -> Self { + Self { + segments, + hasher: RandomState::default(), + } + } + pub fn set(&self, key: K, val: V) -> Option { + let mut segment = self.lock_segment_for_key(&key); + segment.set(key, val) + } + #[allow(unused)] + pub fn get(&self, key: &Q) -> Option + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + let f = |opt: Option<&V>| opt.cloned(); + self.get_and_then(key, f) + } + pub fn remove(&self, key: &Q) -> Option + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + let mut segment = self.lock_segment_for_key(&key); + segment.remove(key) + } + pub fn clear(&self) { + self.segments.iter().for_each(|seg| { + let mut seg = seg.lock().unwrap(); + seg.clear(); + }) + } + pub fn get_and_then(&self, key: &Q, f: F) -> T + where + K: Borrow, + Q: Hash + Eq + ?Sized, + F: FnOnce(Option<&V>) -> T, + { + let mut segment = self.lock_segment_for_key(&key); + let opt = segment.get(key); + f(opt) + } + + fn lock_segment_for_key(&self, key: &T) -> MutexGuard> { + let at = self.determine_segment(key); + self.segments[at].lock().unwrap() + } + fn determine_segment(&self, key: &T) -> usize { + let hash = self.hash(key); + let idx = hash % self.segments.len() as u64; + idx as usize + } + fn hash(&self, item: &T) -> u64 { + use core::hash::{BuildHasher, Hasher}; + + let mut hasher = self.hasher.build_hasher(); + item.hash(&mut hasher); + hasher.finish() + } +} diff --git a/memcrab-cache/src/mem_lru/container.rs b/memcrab-server/src/cache/mem_lru.rs similarity index 74% rename from memcrab-cache/src/mem_lru/container.rs rename to memcrab-server/src/cache/mem_lru.rs index fe9b99f..5709a2b 100644 --- a/memcrab-cache/src/mem_lru/container.rs +++ b/memcrab-server/src/cache/mem_lru.rs @@ -1,4 +1,4 @@ -use super::{builder::DefaultBuilder, ByteSized}; +use super::ByteSized; use core::{borrow::Borrow, hash::Hash}; use lru::LruCache; @@ -26,12 +26,20 @@ where K: ByteSized + Hash + Eq, V: ByteSized, { - pub fn builder() -> DefaultBuilder { - DefaultBuilder::new() - } - pub(crate) fn new(lru: LruCache, max_bytesize: usize) -> Self { + pub fn with_max_bytesize(max_bytesize: usize) -> Self { + let inner = LruCache::unbounded(); + Self::new(inner, max_bytesize) + } + pub fn with_max_bytesize_and_max_len(max_bytesize: usize, max_len: usize) -> Self { + use core::num::NonZeroUsize; + assert!(max_len > 0, "max_len should be > 0"); + let max_len = NonZeroUsize::new(max_len).unwrap(); + let inner = LruCache::new(max_len); + Self::new(inner, max_bytesize) + } + pub(crate) fn new(inner: LruCache, max_bytesize: usize) -> Self { Self { - inner: lru, + inner, max_bytesize, bytesize: 0, } @@ -39,12 +47,15 @@ where pub fn max_bytesize(&self) -> usize { self.max_bytesize } + #[allow(unused)] pub fn max_len(&self) -> usize { self.inner.cap().into() } + #[allow(unused)] pub fn len(&self) -> usize { self.inner.len() } + #[allow(unused)] pub fn is_empty(&self) -> bool { self.inner.is_empty() } @@ -72,6 +83,17 @@ where assert!(self.bytesize() <= self.max_bytesize()); result } + pub fn remove(&mut self, key: &Q) -> Option + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + self.pop(key) + } + pub fn clear(&mut self) { + self.inner.clear(); + self.bytesize = 0; + } fn make_room_for(&mut self, item_size: usize) { assert!(item_size <= self.max_bytesize()); @@ -79,7 +101,6 @@ where self.pop_lru(); } } - fn pop(&mut self, key: &Q) -> Option where K: Borrow, diff --git a/memcrab-server/src/cache/mod.rs b/memcrab-server/src/cache/mod.rs new file mode 100644 index 0000000..71bbb2e --- /dev/null +++ b/memcrab-server/src/cache/mod.rs @@ -0,0 +1,10 @@ +mod api; +mod bytesized; +mod map; +mod mem_lru; + +use bytesized::ByteSized; +use map::Map; +use mem_lru::MemLru; + +pub use api::{Cache, CacheCfg}; diff --git a/memcrab-server/src/lib.rs b/memcrab-server/src/lib.rs new file mode 100644 index 0000000..d234fdc --- /dev/null +++ b/memcrab-server/src/lib.rs @@ -0,0 +1,90 @@ +/*! +# memcrab-server + +## Usage + +```no_run +use memcrab_server::{Server, CacheCfg}; + +#[tokio::main] +async fn main() { + let gb = 2_usize.pow(30); + let cfg = CacheCfg::builder() + .segments(10) + .max_bytesize(gb) + .build(); + + let addr = "127.0.0.1:9900".parse().unwrap(); + let server = Server::from(cfg); + server.start(addr).await.unwrap(); +} +``` +*/ + +mod cache; +mod serve; + +use cache::Cache; + +pub use cache::CacheCfg; +pub use serve::Server; + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use memcrab_protocol::{Msg, Request, Response, Socket}; + use tokio::net::TcpStream; + + use super::*; + + #[tokio::test] + async fn test_init() { + let gb = 2_usize.pow(30); + let cfg = CacheCfg::builder().segments(10).max_bytesize(gb).build(); + + let addr = "127.0.0.1:9900".parse().unwrap(); + + let server = Server::from(cfg); + tokio::spawn(async move { + server.start(addr).await.unwrap(); + }); + tokio::time::sleep(Duration::from_secs_f32(0.5)).await; + + let stream = tokio::time::timeout(Duration::from_secs_f32(0.1), TcpStream::connect(addr)) + .await + .expect("connect") + .unwrap(); + let mut socket = Socket::new(stream); + + socket.send(Msg::Request(Request::Ping)).await.unwrap(); + let msg = socket.recv().await.unwrap(); + assert_eq!(msg, Msg::Response(Response::Pong)); + + let key = "some-key".to_string(); + socket + .send(Msg::Request(Request::Get(key.clone()))) + .await + .unwrap(); + let msg = socket.recv().await.unwrap(); + assert_eq!(msg, Msg::Response(Response::KeyNotFound)); + + socket + .send(Msg::Request(Request::Set { + key: key.clone(), + value: vec![3, 4], + expiration: 0, + })) + .await + .unwrap(); + let msg = socket.recv().await.unwrap(); + assert_eq!(msg, Msg::Response(Response::Ok)); + + socket + .send(Msg::Request(Request::Get(key.clone()))) + .await + .unwrap(); + let msg = socket.recv().await.unwrap(); + assert_eq!(msg, Msg::Response(Response::Value(vec![3, 4]))); + } +} diff --git a/memcrab-server/src/serve/err.rs b/memcrab-server/src/serve/err.rs new file mode 100644 index 0000000..4f2b36c --- /dev/null +++ b/memcrab-server/src/serve/err.rs @@ -0,0 +1,11 @@ +use memcrab_protocol::Error as ProtocolError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ServerSideError { + #[error("protocol error")] + Protocol(#[from] ProtocolError), + + #[error("invalid msg")] + InvalidMsg, +} diff --git a/memcrab-server/src/serve/mod.rs b/memcrab-server/src/serve/mod.rs new file mode 100644 index 0000000..1ffef02 --- /dev/null +++ b/memcrab-server/src/serve/mod.rs @@ -0,0 +1,7 @@ +mod err; +mod server; +mod socket; + +use err::ServerSideError; + +pub use server::Server; diff --git a/memcrab-server/src/serve/server.rs b/memcrab-server/src/serve/server.rs new file mode 100644 index 0000000..0da6ef0 --- /dev/null +++ b/memcrab-server/src/serve/server.rs @@ -0,0 +1,76 @@ +use memcrab_protocol::{Request, Response, Socket}; +use std::{net::SocketAddr, num::NonZeroU32, sync::Arc}; +use tokio::net::{TcpListener, TcpStream}; + +use crate::{Cache, CacheCfg}; + +use super::socket::ServerSocket; + +pub struct Server { + cache: Cache, +} + +impl From for Server { + fn from(cfg: CacheCfg) -> Self { + Self::new(Cache::from(cfg)) + } +} + +async fn start_server(server: Server, addr: SocketAddr) -> Result<(), std::io::Error> { + let listener = TcpListener::bind(addr).await?; + let server = Arc::new(server); + loop { + let server = server.clone(); + let (socket, client_addr) = listener.accept().await?; + tokio::spawn(async move { server.handle(socket, client_addr).await }); + } +} + +// TODO: logging +impl Server { + fn new(cache: Cache) -> Self { + Self { cache } + } + pub async fn start(self, addr: SocketAddr) -> Result<(), std::io::Error> { + start_server(self, addr).await + } + // TODO: error handling + async fn handle(&self, stream: TcpStream, _: SocketAddr) { + let mut socket: ServerSocket<_> = Socket::new(stream).into(); + loop { + let request = socket.recv().await.unwrap(); + let response = self.response_to(request); + socket.send(response).await.unwrap(); + } + } + fn response_to(&self, request: Request) -> Response { + match request { + Request::Ping => Response::Pong, + Request::Get(ref key) => match self.cache.get(key) { + Some(val) => Response::Value(val), + None => Response::KeyNotFound, + }, + Request::Delete(ref key) => match self.cache.remove(key) { + Some(_) => Response::Ok, + None => Response::KeyNotFound, + }, + Request::Set { + key, + value, + expiration, + } => { + if expiration == 0 { + self.cache.set(key, value) + } else { + let exp = NonZeroU32::new(expiration).unwrap(); + self.cache.set_with_expiration(key, value, exp); + } + Response::Ok + } + Request::Clear => { + self.cache.clear(); + Response::Ok + } + } + } +} diff --git a/memcrab-server/src/serve/socket.rs b/memcrab-server/src/serve/socket.rs new file mode 100644 index 0000000..09e7c7f --- /dev/null +++ b/memcrab-server/src/serve/socket.rs @@ -0,0 +1,30 @@ +use super::ServerSideError; +use memcrab_protocol::{AsyncReader, AsyncWriter, Msg, Request, Response, Socket}; + +#[derive(Debug, Clone)] +pub struct ServerSocket { + inner: Socket, +} + +impl From> for ServerSocket { + fn from(inner: Socket) -> Self { + Self { inner } + } +} + +impl ServerSocket +where + S: AsyncReader + AsyncWriter + Send, +{ + pub async fn recv(&mut self) -> Result { + let msg = self.inner.recv().await?; + match msg { + Msg::Request(req) => Ok(req), + Msg::Response(_) => Err(ServerSideError::InvalidMsg), + } + } + pub async fn send(&mut self, response: Response) -> Result<(), ServerSideError> { + self.inner.send(Msg::Response(response)).await?; + Ok(()) + } +} From ad98773522b1556953b3589d45480e3c841541f6 Mon Sep 17 00:00:00 2001 From: Alex Severin Date: Mon, 22 Jan 2024 20:21:02 +0300 Subject: [PATCH 2/2] try to fix clippy --- memcrab-server/src/cache/map.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/memcrab-server/src/cache/map.rs b/memcrab-server/src/cache/map.rs index 8809fd6..be81eb2 100644 --- a/memcrab-server/src/cache/map.rs +++ b/memcrab-server/src/cache/map.rs @@ -75,10 +75,7 @@ where idx as usize } fn hash(&self, item: &T) -> u64 { - use core::hash::{BuildHasher, Hasher}; - - let mut hasher = self.hasher.build_hasher(); - item.hash(&mut hasher); - hasher.finish() + use core::hash::BuildHasher; + self.hasher.hash_one(item) } }