Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging and handler for eof on server side #25

Merged
merged 2 commits into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions memcrab-protocol/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
let (kind, payload_len) = self.parser.decode_header(&header)?;

let payload = if payload_len > 0 {
read_chunk(&mut self.stream, payload_len as usize).await?
read_chunk_exact(&mut self.stream, payload_len as usize).await?
} else {
vec![]
};
Expand All @@ -51,7 +51,7 @@ where
}
}

async fn read_chunk<S: AsyncRead + Unpin>(
async fn read_chunk_exact<S: AsyncRead + Unpin>(
stream: &mut S,
size: usize,
) -> Result<Vec<u8>, io::Error> {
Expand Down
4 changes: 4 additions & 0 deletions memcrab-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ lru = "0.12.1"
thiserror.workspace = true
typed-builder = "0.18.1"
async-trait = "0.1.77"
tracing = "0.1.40"

[dev-dependencies]
tracing-subscriber = "0.3.18"
13 changes: 13 additions & 0 deletions memcrab-server/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# memcrab-server

## Usage

### Tcp

```rs
use memcrab_server::{serve, Cache};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

let gb = 2_usize.pow(30);
let cache = Cache::builder()
.segments(10)
Expand All @@ -17,3 +23,10 @@ async fn main() {
serve(listener, cache).await.unwrap();
}
```

## Examples

Start TCP server on "127.0.0.1:9900"
```sh
cargo run --example start
```
17 changes: 17 additions & 0 deletions memcrab-server/examples/start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use memcrab_server::{serve, Cache};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

let one_gb = 2usize.pow(30);
let cache = Cache::builder()
.segments(10)
.max_bytesize(one_gb)
.build()
.into();

let listener = TcpListener::bind("127.0.0.1:9900").await.unwrap();
serve(listener, cache).await.unwrap();
}
7 changes: 5 additions & 2 deletions memcrab-server/src/serve/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io;
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tracing::info;

#[async_trait::async_trait]
pub trait AcceptConnection {
Expand All @@ -13,7 +14,8 @@ impl AcceptConnection for TcpListener {
type Stream = TcpStream;

async fn accept_connection(&self) -> io::Result<Self::Stream> {
let (socket, _) = self.accept().await?;
let (socket, addr) = self.accept().await?;
info!("accepted tcp connection, addr: {:?}", addr);
Ok(socket)
}
}
Expand All @@ -23,7 +25,8 @@ impl AcceptConnection for UnixListener {
type Stream = UnixStream;

async fn accept_connection(&self) -> io::Result<Self::Stream> {
let (socket, _) = self.accept().await?;
let (socket, addr) = self.accept().await?;
info!("accepted unix connection, addr: {:?}", addr);
Ok(socket)
}
}
21 changes: 18 additions & 3 deletions memcrab-server/src/serve/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use memcrab_protocol::{AsyncRead, AsyncWrite, Request, Response};
use core::panic;
use memcrab_protocol::{AsyncRead, AsyncWrite, Error as ProtocolError, Request, Response};
use std::{io, num::NonZeroU32, sync::Arc};
use tracing::info;

use super::{listener::AcceptConnection, socket::ServerSocket};
use crate::cache::Cache;
use crate::{cache::Cache, serve::err::ServerSideError};

pub(super) async fn start_server<S>(
listener: impl AcceptConnection<Stream = S>,
Expand All @@ -11,6 +13,7 @@ pub(super) async fn start_server<S>(
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
info!("memcrab server started...");
let cache = Arc::new(cache);
loop {
let stream = listener.accept_connection().await?;
Expand All @@ -26,8 +29,20 @@ where
{
let cache = cache.as_ref();
loop {
let request = socket.recv().await.unwrap();
let request = match socket.recv().await {
Ok(req) => req,
Err(ServerSideError::Protocol(ProtocolError::IO(err))) => match err.kind() {
io::ErrorKind::UnexpectedEof => {
info!("eof, close connection");
return;
}
_ => panic!("{:?}", err),
},
err => panic!("{:?}", err),
};
info!("received request: {:?}", &request);
let response = response_to(request, cache);
info!("sending response: {:?}", &response);
socket.send(response).await.unwrap();
}
}
Expand Down
5 changes: 3 additions & 2 deletions memcrab/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

`memcrab` client.

## Examples
## Usage

### RawClient

#### Tcp
```rust
use memcrab::{RawClient, connections::Tcp, Rpc as _, Error};
use memcrab::{RawClient, connections::Tcp, Error};

#[tokio::main]
async fn main() -> Result<(), Error> {
Expand Down
8 changes: 7 additions & 1 deletion memcrab/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
/*!
# memcrab

`memcrab` client.

## Usage

### RawClient

#### Tcp

```no_run
use memcrab::{RawClient, connections::Tcp, Rpc as _, Error};
use memcrab::{RawClient, connections::Tcp, Error};

#[tokio::main]
async fn main() -> Result<(), Error> {
Expand Down
1 change: 1 addition & 0 deletions memcrab/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod readme;
15 changes: 15 additions & 0 deletions memcrab/tests/readme.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use memcrab::{connections::Tcp, Error, RawClient};

#[allow(dead_code)]
async fn tcp_raw_client_readme() -> Result<(), Error> {
let addr = "127.0.0.1:80".parse().unwrap();
let mut client = RawClient::<Tcp>::connect(addr).await?;

client.set("date", vec![2, 3, 24]).await?;
let name = client.get("name").await?;
match name {
Some(val) => println!("got {:?} from cache", val),
None => println!("cache miss for name"),
}
Ok(())
}
Loading