Skip to content

Commit

Permalink
Protocol Draft (#17)
Browse files Browse the repository at this point in the history
* fix string size in bytes

* protocol draft
  • Loading branch information
cospectrum authored Jan 16, 2024
1 parent 91758e2 commit dd06ef2
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 101 deletions.
2 changes: 1 addition & 1 deletion memcrab-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Cache {
self.inner.is_empty()
}
pub fn size_of(key: &String, val: &Vec<u8>) -> usize {
key.capacity() + val.capacity()
key.as_bytes().len() + val.capacity()
}
pub fn get(&mut self, key: &str) -> Option<&Vec<u8>> {
self.inner.get(key)
Expand Down
6 changes: 3 additions & 3 deletions memcrab-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ pub(crate) mod mapping;

pub mod io;

use mapping::alias::Version;

pub use err::{ClientSideError, ParsingError, ServerSideError};
pub use transport::{ClientSocket, ErrorResponse, Request, Response, ServerSocket};

type ProtocolVersion = u16;

pub const PROTOCOL_VERSION: ProtocolVersion = 0;
pub const PROTOCOL_VERSION: Version = 0;
5 changes: 5 additions & 0 deletions memcrab-protocol/src/mapping/alias.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub type Version = u16;
pub type ErrMsgLen = u64;
pub type KeyLen = u64;
pub type ValueLen = u64;
pub type Expiration = u32;
1 change: 1 addition & 0 deletions memcrab-protocol/src/mapping/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub(crate) mod alias;
pub(crate) mod flags;
pub(crate) mod tokens;
37 changes: 18 additions & 19 deletions memcrab-protocol/src/mapping/tokens.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::alias::{ErrMsgLen, Expiration, KeyLen, ValueLen, Version};
use std::mem::size_of;

#[derive(Debug)]
Expand All @@ -9,12 +10,6 @@ pub enum Payload {
ErrMsg(String),
}

pub type Version = u16;
pub type ErrMsgLen = u64;
pub type KeyLen = u64;
pub type ValueLen = u64;
pub type Expiration = u32;

#[derive(Debug, Clone, Copy)]
pub enum RequestHeader {
Version(Version),
Expand All @@ -34,6 +29,17 @@ pub enum RequestHeader {
}

impl RequestHeader {
pub const VERSION_SIZE: usize = size_of::<Version>();
pub const KLEN_SIZE: usize = size_of::<KeyLen>();
pub const VLEN_SIZE: usize = size_of::<ValueLen>();
pub const EXP_SIZE: usize = size_of::<Expiration>();

// Max size of the request header.
pub const SIZE: usize = {
let set_size = Self::KLEN_SIZE + Self::VLEN_SIZE + Self::EXP_SIZE;
1 + set_size
};

pub fn payload_len(self) -> usize {
match self {
Self::Get { klen } => klen as usize,
Expand All @@ -46,16 +52,6 @@ impl RequestHeader {
_ => 0,
}
}
pub const VERSION_SIZE: usize = size_of::<Version>();
pub const KLEN_SIZE: usize = size_of::<KeyLen>();
pub const VLEN_SIZE: usize = size_of::<ValueLen>();
pub const EXP_SIZE: usize = size_of::<Expiration>();

// Max size of the request header.
pub const SIZE: usize = {
let set_size = Self::KLEN_SIZE + Self::VLEN_SIZE + Self::EXP_SIZE;
1 + set_size
};
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -68,15 +64,16 @@ pub enum ResponseHeader {
}

impl ResponseHeader {
pub const VLEN_SIZE: usize = size_of::<ValueLen>();
pub const SIZE: usize = { 1 + ErrorHeader::SIZE };

pub fn payload_len(self) -> usize {
match self {
Self::Error(e) => e.errmsg_len() as usize,
Self::Value { vlen } => vlen as usize,
_ => 0,
}
}
pub const VLEN_SIZE: usize = size_of::<ValueLen>();
pub const SIZE: usize = { 1 + ErrorHeader::SIZE };
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -86,13 +83,15 @@ pub enum ErrorHeader {
}

impl ErrorHeader {
pub const MSG_LEN_SIZE: usize = size_of::<ErrMsgLen>();
pub const SIZE: usize = { 1 + size_of::<ErrMsgLen>() };

pub const fn errmsg_len(self) -> ErrMsgLen {
match self {
Self::Validation { len } => len,
Self::Internal { len } => len,
}
}
pub const SIZE: usize = { 1 + size_of::<ErrMsgLen>() };
}

#[cfg(test)]
Expand Down
159 changes: 130 additions & 29 deletions memcrab-protocol/src/transport/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{
io::{AsyncReader, AsyncWriter},
mapping::{
flags::ResponseFlag,
tokens::{ErrorHeader, Payload, ResponseHeader},
alias::{ErrMsgLen, Expiration, KeyLen, ValueLen},
flags::{RequestFlag, ResponseFlag},
tokens::{ErrorHeader, Payload, RequestHeader, ResponseHeader},
},
ClientSideError, ErrorResponse, ParsingError, Request, Response,
};
Expand All @@ -26,58 +27,158 @@ where
let header_chunk = self.stream.read_chunk(ResponseHeader::SIZE).await?;
let header = self.decode_response_header(&header_chunk)?;

let payload_chunk = self.stream.read_chunk(header.payload_len()).await?;
let payload = self.decode_response_payload(header, &payload_chunk)?;
let payload_chunk = if header.payload_len() > 0 {
self.stream.read_chunk(header.payload_len()).await?
} else {
vec![]
};
let payload = self.decode_response_payload(header, payload_chunk)?;

let resp = self.glue(header, payload);
let resp = self.construct_response(header, payload);
Ok(resp)
}
// TODO: maybe return Result to avoid panics
fn glue(&self, header: ResponseHeader, payload: Payload) -> Response {

fn construct_response(&self, header: ResponseHeader, payload: Payload) -> Response {
match header {
ResponseHeader::Ok => Response::Ok,
ResponseHeader::Pong => Response::Pong,
ResponseHeader::KeyNotFound => Response::KeyNotFound,
ResponseHeader::Value { .. } => {
match payload {
Payload::Value(v) => Response::Value(v),
_ => unreachable!(), // TODO
}
}
ResponseHeader::Value { .. } => match payload {
Payload::Zero => Response::Value(vec![]),
Payload::Value(v) => Response::Value(v),
p => panic!("invalid value, payload={:?}", p),
},
ResponseHeader::Error(err) => {
let msg = match payload {
Payload::ErrMsg(msg) => msg,
Payload::Zero => "".to_owned(),
_ => panic!("invalid payload"), // TODO
Payload::ErrMsg(msg) => msg,
p => panic!("invalid error msg, payload={:?}", p),
};
match err {
ErrorHeader::Internal { .. } => Response::Error(ErrorResponse::Internal(msg)),
ErrorHeader::Validation { .. } => {
Response::Error(ErrorResponse::Validation(msg))
}
}
let inner = match err {
ErrorHeader::Internal { .. } => ErrorResponse::Internal(msg),
ErrorHeader::Validation { .. } => ErrorResponse::Validation(msg),
};
Response::Error(inner)
}
}
}
fn encode_request(&self, request: &Request) -> Vec<u8> {
todo!()
let mut bytes = vec![0; RequestHeader::SIZE];
match request {
Request::Ping => {
bytes[0] = RequestFlag::Ping.into();
}
Request::Clear => {
bytes[0] = RequestFlag::Clear.into();
}
Request::Version(v) => {
bytes[0] = RequestFlag::Version.into();
let [a, b] = v.to_be_bytes();
bytes[1] = a;
bytes[2] = b;
}
Request::Get(key) => {
bytes[0] = RequestFlag::Get.into();
let key = key.as_bytes();
let klen: KeyLen = key.len().try_into().unwrap();

for (dst, src) in bytes[1..].iter_mut().zip(klen.to_be_bytes()) {
*dst = src;
}
bytes.extend_from_slice(key);
}
Request::Delete(key) => {
bytes[0] = RequestFlag::Delete.into();
let key = key.as_bytes();
let klen: KeyLen = key.len().try_into().unwrap();

for (dst, src) in bytes[1..].iter_mut().zip(klen.to_be_bytes()) {
*dst = src;
}
bytes.extend_from_slice(key);
}
Request::Set {
key,
value,
expiration,
} => {
bytes[0] = RequestFlag::Set.into();
let key = key.as_bytes();
let klen: KeyLen = key.len().try_into().unwrap();
let vlen: ValueLen = value.len().try_into().unwrap();

let klen_bytes = klen.to_be_bytes();
let vlen_bytes = vlen.to_be_bytes();
let exp_bytes = expiration.to_be_bytes();

let tail = klen_bytes
.iter()
.chain(vlen_bytes.iter())
.chain(exp_bytes.iter());
for (dst, &src) in bytes[1..].iter_mut().zip(tail) {
*dst = src;
}
bytes.extend_from_slice(key);
bytes.extend_from_slice(value);
}
}
bytes
}
fn decode_response_header(&self, h: &[u8]) -> Result<ResponseHeader, ParsingError> {
let flag = ResponseFlag::try_from(h[0]).map_err(|_| ParsingError::Header)?;
fn decode_response_header(&self, header_chunk: &[u8]) -> Result<ResponseHeader, ParsingError> {
let flag = ResponseFlag::try_from(header_chunk[0]).map_err(|_| ParsingError::Header)?;
match flag {
ResponseFlag::Pong => Ok(ResponseHeader::Pong),
ResponseFlag::Ok => Ok(ResponseHeader::Ok),
ResponseFlag::KeyNotFound => Ok(ResponseHeader::KeyNotFound),
ResponseFlag::Value => todo!(),
ResponseFlag::InternalErr => todo!(),
ResponseFlag::ValidationErr => todo!(),
ResponseFlag::Value => {
let vlen_bytes = &header_chunk[1..1 + ResponseHeader::VLEN_SIZE];
let vlen = KeyLen::from_be_bytes(
vlen_bytes
.try_into()
.expect("vlen_bytes.len() should be equal to VLEN_SIZE"),
);
Ok(ResponseHeader::Value { vlen })
}
ResponseFlag::InternalErr => {
let msg_len_bytes = &header_chunk[1..1 + ErrorHeader::MSG_LEN_SIZE];
let msg_len = ErrMsgLen::from_be_bytes(
msg_len_bytes
.try_into()
.expect("msg_len_bytes.len() should be equal to MSG_LEN_SIZE"),
);
let err = ErrorHeader::Internal { len: msg_len };
Ok(ResponseHeader::Error(err))
}
ResponseFlag::ValidationErr => {
let msg_len_bytes = &header_chunk[1..1 + ErrorHeader::MSG_LEN_SIZE];
let msg_len = ErrMsgLen::from_be_bytes(
msg_len_bytes
.try_into()
.expect("msg_len_bytes.len() should be equal to MSG_LEN_SIZE"),
);
let err = ErrorHeader::Validation { len: msg_len };
Ok(ResponseHeader::Error(err))
}
}
}
fn decode_response_payload(
&self,
header: ResponseHeader,
payload_chunk: &[u8],
payload_chunk: Vec<u8>,
) -> Result<Payload, ParsingError> {
todo!()
use ResponseHeader as H;

match header {
H::Pong | H::Ok | H::KeyNotFound => Ok(Payload::Zero),
H::Value { vlen } => {
assert_eq!(vlen, payload_chunk.len() as u64);
Ok(Payload::Value(payload_chunk))
}
H::Error(inner) => {
assert_eq!(inner.errmsg_len(), payload_chunk.len() as u64);
let msg = String::from_utf8(payload_chunk).map_err(|_| ParsingError::Payload)?;
Ok(Payload::ErrMsg(msg))
}
}
}
}
2 changes: 1 addition & 1 deletion memcrab-protocol/src/transport/schemas.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::mapping::tokens::{Expiration, Version};
use crate::mapping::alias::{Expiration, Version};
use thiserror::Error;

#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit dd06ef2

Please sign in to comment.