From c02325175745b9cc7075694d29f3eb5030a262ca Mon Sep 17 00:00:00 2001 From: Alex Severin Date: Wed, 3 Jan 2024 06:31:16 +0300 Subject: [PATCH] create server --- examples/protogen/Makefile.toml | 19 ++ examples/protogen/README.md | 6 +- .../proto/{grpc.proto => cache_grpc.proto} | 9 +- examples/protogen/src/main.rs | 2 +- memcrab-server/Cargo.toml | 3 + memcrab-server/README.md | 18 ++ memcrab-server/src/lib.rs | 16 ++ memcrab-server/src/pb.rs | 179 +++--------------- memcrab-server/src/service.rs | 54 ++++++ memcrab-server/tests/run.rs | 25 +++ 10 files changed, 167 insertions(+), 164 deletions(-) create mode 100644 examples/protogen/Makefile.toml rename examples/protogen/proto/{grpc.proto => cache_grpc.proto} (80%) create mode 100644 memcrab-server/README.md create mode 100644 memcrab-server/tests/run.rs diff --git a/examples/protogen/Makefile.toml b/examples/protogen/Makefile.toml new file mode 100644 index 0000000..ff1ea0c --- /dev/null +++ b/examples/protogen/Makefile.toml @@ -0,0 +1,19 @@ +[tasks.clean-server] +command = "rm" +args = ["-rf", "server"] + +[tasks.clean-client] +command = "rm" +args = ["-rf", "client"] + + +[tasks.clean] +dependencies = [ + "clean-server", + "clean-client", +] + +[tasks.all] +command = "cargo" +args = ["run"] +dependencies = ["clean"] diff --git a/examples/protogen/README.md b/examples/protogen/README.md index ef2d950..af585e1 100644 --- a/examples/protogen/README.md +++ b/examples/protogen/README.md @@ -1,5 +1,9 @@ # protogen ```sh -cargo run proto/.proto +cargo run +``` +or +```sh +cargo make all ``` diff --git a/examples/protogen/proto/grpc.proto b/examples/protogen/proto/cache_grpc.proto similarity index 80% rename from examples/protogen/proto/grpc.proto rename to examples/protogen/proto/cache_grpc.proto index aed2c92..864b00e 100644 --- a/examples/protogen/proto/grpc.proto +++ b/examples/protogen/proto/cache_grpc.proto @@ -1,12 +1,8 @@ syntax = "proto3"; +package cache_grpc; -package rpc; - -service Getter { +service CacheRPC { rpc Get(GetRequest) returns (GetResponse); -} - -service Setter { rpc Set(SetRequest) returns (SetResponse); } @@ -24,5 +20,4 @@ message SetRequest { } message SetResponse { - bool success = 1; } diff --git a/examples/protogen/src/main.rs b/examples/protogen/src/main.rs index daf27b3..56129b5 100644 --- a/examples/protogen/src/main.rs +++ b/examples/protogen/src/main.rs @@ -2,7 +2,7 @@ use std::{fs, path::Path}; fn main() { let args: Vec = std::env::args().collect(); - let default = "proto/grpc.proto".to_string(); + let default = "proto/cache_grpc.proto".to_string(); let relative_path = args.get(1).unwrap_or(&default); let curr_dir = std::env::current_dir().unwrap(); diff --git a/memcrab-server/Cargo.toml b/memcrab-server/Cargo.toml index 080d8a7..49c3c69 100644 --- a/memcrab-server/Cargo.toml +++ b/memcrab-server/Cargo.toml @@ -9,3 +9,6 @@ edition = "2021" memcrab-cache = { version = "0.1.0", path = "../memcrab-cache" } prost = { workspace = true } tonic = { workspace = true } + +[dev-dependencies] +tokio = { version = "1.35.1", features = ["full"] } diff --git a/memcrab-server/README.md b/memcrab-server/README.md new file mode 100644 index 0000000..3a3a571 --- /dev/null +++ b/memcrab-server/README.md @@ -0,0 +1,18 @@ +# memcrab-server + +```rust +use core::num::NonZeroUsize; +use memcrab_server::start_grpc_server; +use memcrab_cache::Cache; + +#[tokio::main] +async fn main() { + let addr = "[::1]:50051".parse().unwrap(); + + let maxbytes = 100_000; + let maxlen = NonZeroUsize::new(110).unwrap(); + let cache = Cache::new(maxlen, maxbytes); + + start_grpc_server(addr, cache).await.unwrap(); +} +``` diff --git a/memcrab-server/src/lib.rs b/memcrab-server/src/lib.rs index 0529330..f99f8b6 100644 --- a/memcrab-server/src/lib.rs +++ b/memcrab-server/src/lib.rs @@ -3,3 +3,19 @@ mod pb; mod service; + +use pb::cache_rpc_server::CacheRpcServer; +use service::Service; +use std::net::SocketAddr; +use tonic::transport::Server; + +pub async fn start_grpc_server( + addr: SocketAddr, + cache: memcrab_cache::Cache, +) -> Result<(), tonic::transport::Error> { + let cache_srvice = Service::new(cache); + Server::builder() + .add_service(CacheRpcServer::new(cache_srvice)) + .serve(addr) + .await +} diff --git a/memcrab-server/src/pb.rs b/memcrab-server/src/pb.rs index b2a5f7a..8479e5b 100644 --- a/memcrab-server/src/pb.rs +++ b/memcrab-server/src/pb.rs @@ -20,24 +20,25 @@ pub struct SetRequest { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct SetResponse { - #[prost(bool, tag = "1")] - pub success: bool, -} +pub struct SetResponse {} /// Generated server implementations. -pub mod getter_server { +pub mod cache_rpc_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with GetterServer. + /// Generated trait containing gRPC methods that should be implemented for use with CacheRpcServer. #[async_trait] - pub trait Getter: Send + Sync + 'static { + pub trait CacheRpc: Send + Sync + 'static { async fn get( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn set( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] - pub struct GetterServer { + pub struct CacheRpcServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, @@ -45,7 +46,7 @@ pub mod getter_server { max_encoding_message_size: Option, } struct _Inner(Arc); - impl GetterServer { + impl CacheRpcServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -97,9 +98,9 @@ pub mod getter_server { self } } - impl tonic::codegen::Service> for GetterServer + impl tonic::codegen::Service> for CacheRpcServer where - T: Getter, + T: CacheRpc, B: Body + Send + 'static, B::Error: Into + Send + 'static, { @@ -115,10 +116,10 @@ pub mod getter_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/rpc.Getter/Get" => { + "/cache_grpc.CacheRPC/Get" => { #[allow(non_camel_case_types)] - struct GetSvc(pub Arc); - impl tonic::server::UnaryService + struct GetSvc(pub Arc); + impl tonic::server::UnaryService for GetSvc { type Response = super::GetResponse; type Future = BoxFuture< @@ -131,7 +132,7 @@ pub mod getter_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get(&inner, request).await + ::get(&inner, request).await }; Box::pin(fut) } @@ -159,142 +160,10 @@ pub mod getter_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } - } - } - } - impl Clone for GetterServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - max_decoding_message_size: self.max_decoding_message_size, - max_encoding_message_size: self.max_encoding_message_size, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService for GetterServer { - const NAME: &'static str = "rpc.Getter"; - } -} -/// Generated server implementations. -pub mod setter_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with SetterServer. - #[async_trait] - pub trait Setter: Send + Sync + 'static { - async fn set( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - } - #[derive(Debug)] - pub struct SetterServer { - inner: _Inner, - accept_compression_encodings: EnabledCompressionEncodings, - send_compression_encodings: EnabledCompressionEncodings, - max_decoding_message_size: Option, - max_encoding_message_size: Option, - } - struct _Inner(Arc); - impl SetterServer { - pub fn new(inner: T) -> Self { - Self::from_arc(Arc::new(inner)) - } - pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - max_decoding_message_size: None, - max_encoding_message_size: None, - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - /// Enable decompressing requests with the given encoding. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.accept_compression_encodings.enable(encoding); - self - } - /// Compress responses with the given encoding, if the client supports it. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.send_compression_encodings.enable(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.max_decoding_message_size = Some(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.max_encoding_message_size = Some(limit); - self - } - } - impl tonic::codegen::Service> for SetterServer - where - T: Setter, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = std::convert::Infallible; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/rpc.Setter/Set" => { + "/cache_grpc.CacheRPC/Set" => { #[allow(non_camel_case_types)] - struct SetSvc(pub Arc); - impl tonic::server::UnaryService + struct SetSvc(pub Arc); + impl tonic::server::UnaryService for SetSvc { type Response = super::SetResponse; type Future = BoxFuture< @@ -307,7 +176,7 @@ pub mod setter_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::set(&inner, request).await + ::set(&inner, request).await }; Box::pin(fut) } @@ -350,7 +219,7 @@ pub mod setter_server { } } } - impl Clone for SetterServer { + impl Clone for CacheRpcServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -362,7 +231,7 @@ pub mod setter_server { } } } - impl Clone for _Inner { + impl Clone for _Inner { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } @@ -372,7 +241,7 @@ pub mod setter_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService for SetterServer { - const NAME: &'static str = "rpc.Setter"; + impl tonic::server::NamedService for CacheRpcServer { + const NAME: &'static str = "cache_grpc.CacheRPC"; } } diff --git a/memcrab-server/src/service.rs b/memcrab-server/src/service.rs index 8b13789..3c3bad3 100644 --- a/memcrab-server/src/service.rs +++ b/memcrab-server/src/service.rs @@ -1 +1,55 @@ +use std::sync::Mutex; +use memcrab_cache::Cache; +use tonic::{Code, Request, Response, Status}; + +use crate::pb::{cache_rpc_server::CacheRpc, GetRequest, GetResponse, SetRequest, SetResponse}; + +#[derive(Debug)] +pub struct Service { + cache: Mutex, + maxbytes: usize, +} + +impl Service { + pub fn new(cache: Cache) -> Self { + let maxbytes = cache.maxbytes(); + Self { + cache: Mutex::new(cache), + maxbytes, + } + } +} + +#[tonic::async_trait] +impl CacheRpc for Service { + async fn get(&self, req: Request) -> Result, Status> { + let req = req.into_inner(); + let key = req.key; + + let mut cache = self.cache.lock().unwrap(); + let option = cache.get(&key).cloned(); + + let reply = GetResponse { value: option }; + Ok(Response::new(reply)) + } + + async fn set(&self, req: Request) -> Result, Status> { + let req = req.into_inner(); + let key = req.key; + let val = req.value; + let sizeof_item = Cache::size_of(&key, &val); + if sizeof_item > self.maxbytes { + let msg = format!( + "Item is too large: {} bytes, expected size <= {} bytes.", + sizeof_item, self.maxbytes + ); + return Err(Status::new(Code::InvalidArgument, msg)); + } + let mut cache = self.cache.lock().unwrap(); + cache.set(key, val); + + let reply = SetResponse {}; + Ok(Response::new(reply)) + } +} diff --git a/memcrab-server/tests/run.rs b/memcrab-server/tests/run.rs new file mode 100644 index 0000000..41cb2ce --- /dev/null +++ b/memcrab-server/tests/run.rs @@ -0,0 +1,25 @@ +use core::num::NonZeroUsize; +use memcrab_cache::Cache; +use memcrab_server::start_grpc_server; +use std::time::Duration; +use tokio::task; + +#[allow(clippy::single_match)] +#[tokio::test] +async fn test_run() { + let addr = "[::1]:50051".parse().unwrap(); + + let maxbytes = 100_000; + let maxlen = NonZeroUsize::new(110).unwrap(); + let cache = Cache::new(maxlen, maxbytes); + + let join = task::spawn(async move { + start_grpc_server(addr, cache).await.unwrap(); + }); + + let done = tokio::time::timeout(Duration::from_secs(15), join).await; + match done { + Ok(_) => panic!("grpc server stopped"), + Err(_) => {} + } +}