Skip to content

Commit

Permalink
add get/set raw client
Browse files Browse the repository at this point in the history
  • Loading branch information
cospectrum committed Mar 3, 2024
1 parent 0b879c1 commit db9ea36
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 31 deletions.
5 changes: 4 additions & 1 deletion memcrab-protocol/src/err.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::Msg;
use std::{array::TryFromSliceError, string::FromUtf8Error};

use thiserror::Error;

#[derive(Error, Debug)]
Expand All @@ -9,6 +9,9 @@ pub enum Error {

#[error("cannot parse message")]
Parse(#[from] ParseError),

#[error("invalid msg")]
InvalidMsg(Msg),
}

#[derive(Error, Debug)]
Expand Down
9 changes: 6 additions & 3 deletions memcrab/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ license = "MIT OR Apache-2.0"
readme = "../README.md"
repository = "https://github.com/cospectrum/memcrab"

[dev-dependencies]
anyhow.workspace = true

[dependencies]
async-trait = "0.1.77"
memcrab-protocol = { version = "0.1.0", path = "../memcrab-protocol" }
tokio = { workspace = true, features = ["net"] }

[dev-dependencies]
anyhow.workspace = true
tokio = { workspace = true, features = ["full"] }
12 changes: 5 additions & 7 deletions memcrab/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
### RawClient

```rust
use memcrab::RawClient;
use memcrab::{RawClient, connections::Tcp, Rpc as _, Error};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let addr = "127.0.0.1:80".parse()?;
let client = RawClient::connect(addr).await?;

client.set("age", vec![0, 21]).await?;
client.set("year", "2024".into()).await?;
async fn main() -> Result<(), Error> {
let addr = "127.0.0.1:80".parse().unwrap();
let mut client = RawClient::<Tcp>::connect(addr).await?;

client.set("date", vec![2, 3, 24]).await?;
let name = client.get("name").await?;
match name {
Some(val) => println!("got {:?} from cache", val),
Expand Down
49 changes: 49 additions & 0 deletions memcrab/src/connections.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use async_trait::async_trait;
use tokio::net::{TcpStream, UnixStream};

use crate::{Error, Rpc};
use memcrab_protocol::{Msg, ParseError, Request, Response, Socket};

pub struct Tcp {
inner: Socket<TcpStream>,
}

impl Tcp {
pub(crate) fn from_stream(stream: TcpStream) -> Self {
let inner = Socket::new(stream);
Self { inner }
}
}

#[async_trait]
impl Rpc for Tcp {
async fn call(&mut self, request: Request) -> Result<Response, Error> {
self.inner.send(Msg::Request(request)).await?;
match self.inner.recv().await? {
Msg::Response(resp) => Ok(resp),
_ => Err(Error::Parse(ParseError::UnknownMsgKind)),
}
}
}

pub struct Unix {
inner: Socket<UnixStream>,
}

impl Unix {
pub(crate) fn from_stream(stream: UnixStream) -> Self {
let inner = Socket::new(stream);
Self { inner }
}
}

#[async_trait]
impl Rpc for Unix {
async fn call(&mut self, request: Request) -> Result<Response, Error> {
self.inner.send(Msg::Request(request)).await?;
match self.inner.recv().await? {
Msg::Response(resp) => Ok(resp),
_ => Err(Error::Parse(ParseError::UnknownMsgKind)),
}
}
}
29 changes: 28 additions & 1 deletion memcrab/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,34 @@
/*!
# memcrab
## Usage
```no_run
use memcrab::{RawClient, connections::Tcp, Rpc as _, Error};
#[tokio::main]
async fn main() -> Result<(), Error> {
let addr = "127.0.0.1:80".parse().unwrap();
let mut client = RawClient::<Tcp>::connect(addr).await?;
client.set("date", vec![2, 3, 24]).await?;
let name = client.get("name").await?;
match name {
Some(val) => println!("got {:?} from cache", val),
None => println!("cache miss for name"),
}
Ok(())
}
```
*/

#[allow(unused_variables)]
mod raw_client;

// pub use raw_client::RawClient;
pub mod connections;

pub use memcrab_protocol::Error;
pub use raw_client::{RawClient, Rpc};

#[cfg(test)]
mod tests {}
89 changes: 70 additions & 19 deletions memcrab/src/raw_client.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,70 @@
// use std::net::SocketAddr;
//
// use memcrab_protocol::ClientSideError;
//
// pub struct RawClient {}
//
// impl RawClient {
// pub async fn connect(addr: SocketAddr) -> Result<Self, ClientSideError> {
// todo!()
// }
// pub async fn get(&self, key: impl Into<String>) -> Result<Option<Vec<u8>>, ClientSideError> {
// let key = key.into();
// todo!()
// }
// pub async fn set(&self, key: impl Into<String>, value: Vec<u8>) -> Result<(), ClientSideError> {
// let key = key.into();
// todo!()
// }
// }
use crate::{
connections::{Tcp, Unix},
Error,
};
use memcrab_protocol::{Msg, Request, Response};

use std::{net::SocketAddr, path::Path};
use tokio::net::{TcpStream, UnixStream};

#[async_trait::async_trait]
pub trait Rpc
where
Self: Sized,
{
async fn call(&mut self, request: Request) -> Result<Response, Error>;
}

pub struct RawClient<C> {
conn: C,
}

impl<C> RawClient<C> {
fn new(conn: C) -> Self {
Self { conn }
}
}

fn invalid_resp(resp: Response) -> Error {
Error::InvalidMsg(Msg::Response(resp))
}

impl<C> RawClient<C>
where
C: Rpc,
{
pub async fn get(&mut self, key: impl Into<String>) -> Result<Option<Vec<u8>>, Error> {
let key = key.into();
match self.conn.call(Request::Get(key)).await? {
Response::Value(val) => Ok(Some(val)),
Response::KeyNotFound => Ok(None),
resp => Err(invalid_resp(resp)),
}
}
pub async fn set(&mut self, key: impl Into<String>, value: Vec<u8>) -> Result<(), Error> {
let key = key.into();
let request = Request::Set {
key,
value,
expiration: 0,
};
match self.conn.call(request).await? {
Response::Ok => Ok(()),
resp => Err(invalid_resp(resp)),
}
}
}

impl RawClient<Tcp> {
pub async fn connect(addr: SocketAddr) -> Result<Self, Error> {
let stream = TcpStream::connect(addr).await?;
Ok(Self::new(Tcp::from_stream(stream)))
}
}

impl RawClient<Unix> {
pub async fn connect(path: impl AsRef<Path>) -> Result<Self, Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::new(Unix::from_stream(stream)))
}
}

0 comments on commit db9ea36

Please sign in to comment.