Skip to content

Commit

Permalink
Add logging and handler for eof on server side (#25)
Browse files Browse the repository at this point in the history
* add logging and handler for eof on server side (connection close probably)

* edit info log
  • Loading branch information
cospectrum authored Mar 3, 2024
1 parent 34f61cd commit b50d535
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 10 deletions.
4 changes: 2 additions & 2 deletions memcrab-protocol/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
let (kind, payload_len) = self.parser.decode_header(&header)?;

let payload = if payload_len > 0 {
read_chunk(&mut self.stream, payload_len as usize).await?
read_chunk_exact(&mut self.stream, payload_len as usize).await?
} else {
vec![]
};
Expand All @@ -51,7 +51,7 @@ where
}
}

async fn read_chunk<S: AsyncRead + Unpin>(
async fn read_chunk_exact<S: AsyncRead + Unpin>(
stream: &mut S,
size: usize,
) -> Result<Vec<u8>, io::Error> {
Expand Down
4 changes: 4 additions & 0 deletions memcrab-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ lru = "0.12.1"
thiserror.workspace = true
typed-builder = "0.18.1"
async-trait = "0.1.77"
tracing = "0.1.40"

[dev-dependencies]
tracing-subscriber = "0.3.18"
13 changes: 13 additions & 0 deletions memcrab-server/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# memcrab-server

## Usage

### Tcp

```rs
use memcrab_server::{serve, Cache};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

let gb = 2_usize.pow(30);
let cache = Cache::builder()
.segments(10)
Expand All @@ -17,3 +23,10 @@ async fn main() {
serve(listener, cache).await.unwrap();
}
```

## Examples

Start TCP server on "127.0.0.1:9900"
```sh
cargo run --example start
```
17 changes: 17 additions & 0 deletions memcrab-server/examples/start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use memcrab_server::{serve, Cache};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

let one_gb = 2usize.pow(30);
let cache = Cache::builder()
.segments(10)
.max_bytesize(one_gb)
.build()
.into();

let listener = TcpListener::bind("127.0.0.1:9900").await.unwrap();
serve(listener, cache).await.unwrap();
}
7 changes: 5 additions & 2 deletions memcrab-server/src/serve/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io;
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tracing::info;

#[async_trait::async_trait]
pub trait AcceptConnection {
Expand All @@ -13,7 +14,8 @@ impl AcceptConnection for TcpListener {
type Stream = TcpStream;

async fn accept_connection(&self) -> io::Result<Self::Stream> {
let (socket, _) = self.accept().await?;
let (socket, addr) = self.accept().await?;
info!("accepted tcp connection, addr: {:?}", addr);
Ok(socket)
}
}
Expand All @@ -23,7 +25,8 @@ impl AcceptConnection for UnixListener {
type Stream = UnixStream;

async fn accept_connection(&self) -> io::Result<Self::Stream> {
let (socket, _) = self.accept().await?;
let (socket, addr) = self.accept().await?;
info!("accepted unix connection, addr: {:?}", addr);
Ok(socket)
}
}
21 changes: 18 additions & 3 deletions memcrab-server/src/serve/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use memcrab_protocol::{AsyncRead, AsyncWrite, Request, Response};
use core::panic;
use memcrab_protocol::{AsyncRead, AsyncWrite, Error as ProtocolError, Request, Response};
use std::{io, num::NonZeroU32, sync::Arc};
use tracing::info;

use super::{listener::AcceptConnection, socket::ServerSocket};
use crate::cache::Cache;
use crate::{cache::Cache, serve::err::ServerSideError};

pub(super) async fn start_server<S>(
listener: impl AcceptConnection<Stream = S>,
Expand All @@ -11,6 +13,7 @@ pub(super) async fn start_server<S>(
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
info!("memcrab server started...");
let cache = Arc::new(cache);
loop {
let stream = listener.accept_connection().await?;
Expand All @@ -26,8 +29,20 @@ where
{
let cache = cache.as_ref();
loop {
let request = socket.recv().await.unwrap();
let request = match socket.recv().await {
Ok(req) => req,
Err(ServerSideError::Protocol(ProtocolError::IO(err))) => match err.kind() {
io::ErrorKind::UnexpectedEof => {
info!("eof, close connection");
return;
}
_ => panic!("{:?}", err),
},
err => panic!("{:?}", err),
};
info!("received request: {:?}", &request);
let response = response_to(request, cache);
info!("sending response: {:?}", &response);
socket.send(response).await.unwrap();
}
}
Expand Down
5 changes: 3 additions & 2 deletions memcrab/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

`memcrab` client.

## Examples
## Usage

### RawClient

#### Tcp
```rust
use memcrab::{RawClient, connections::Tcp, Rpc as _, Error};
use memcrab::{RawClient, connections::Tcp, Error};

#[tokio::main]
async fn main() -> Result<(), Error> {
Expand Down
8 changes: 7 additions & 1 deletion memcrab/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
/*!
# memcrab
`memcrab` client.
## Usage
### RawClient
#### Tcp
```no_run
use memcrab::{RawClient, connections::Tcp, Rpc as _, Error};
use memcrab::{RawClient, connections::Tcp, Error};
#[tokio::main]
async fn main() -> Result<(), Error> {
Expand Down
1 change: 1 addition & 0 deletions memcrab/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod readme;
15 changes: 15 additions & 0 deletions memcrab/tests/readme.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use memcrab::{connections::Tcp, Error, RawClient};

#[allow(dead_code)]
async fn tcp_raw_client_readme() -> Result<(), Error> {
let addr = "127.0.0.1:80".parse().unwrap();
let mut client = RawClient::<Tcp>::connect(addr).await?;

client.set("date", vec![2, 3, 24]).await?;
let name = client.get("name").await?;
match name {
Some(val) => println!("got {:?} from cache", val),
None => println!("cache miss for name"),
}
Ok(())
}

0 comments on commit b50d535

Please sign in to comment.