Skip to content

Commit

Permalink
Merge pull request #4 from rsocket/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
jjeffcaii authored Dec 24, 2019
2 parents 777d554 + 5c79d97 commit ecae88d
Show file tree
Hide file tree
Showing 43 changed files with 2,077 additions and 1,793 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rsocket_rust"
version = "0.3.0"
version = "0.4.0"
authors = ["Jeffsky <[email protected]>"]
edition = "2018"
license = "Apache-2.0"
Expand All @@ -12,13 +12,14 @@ description = "rsocket-rust is an implementation of the RSocket protocol in Rust
[dependencies]
matches = "0.1.8"
log = "0.4.8"
bytes = "0.5.2"
bytes = "0.5.3"
futures = "0.3.1"
lazy_static = "1.4.0"
url = "2.1.0"
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}

[dependencies.tokio]
version = "0.2.2"
version = "0.2.6"
default-features = false
features = ["full"]

Expand All @@ -36,6 +37,6 @@ rand = "0.7.2"
name = "echo"
path = "examples/echo/main.rs"

# [[example]]
# name = "proxy"
# path = "examples/proxy/main.rs"
[[example]]
name = "proxy"
path = "examples/proxy/main.rs"
21 changes: 16 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
[![GitHub Release](https://img.shields.io/github/release-pre/rsocket/rsocket-rust.svg)](https://github.com/rsocket/rsocket-rust/releases)

> rsocket-rust is an implementation of the RSocket protocol in Rust(1.39+).
It's an **alpha** version and still under active development. **Do not use it in a production environment!**
It's an **alpha** version and still under active development.
**Do not use it in a production environment!**

## Example

Expand All @@ -26,13 +27,15 @@ use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::builder().init();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());
let addr = env::args().nth(1).unwrap_or("tcp://127.0.0.1:7878".to_string());

RSocketFactory::receive()
.transport(URI::Tcp(addr))
.transport(&addr)
.acceptor(|setup, _socket| {
info!("accept setup: {:?}", setup);
Box::new(EchoRSocket)
Ok(Box::new(EchoRSocket))
// Or you can reject setup
// Err(From::from("SETUP_NOT_ALLOW"))
})
.serve()
.await
Expand All @@ -51,7 +54,7 @@ use rsocket_rust::prelude::*;
async fn test() {
let cli = RSocketFactory::connect()
.acceptor(|| Box::new(EchoRSocket))
.transport(URI::Tcp("127.0.0.1:7878".to_string()))
.transport("tcp://127.0.0.1:7878")
.setup(Payload::from("READY!"))
.mime_type("text/plain", "text/plain")
.start()
Expand Down Expand Up @@ -81,6 +84,14 @@ async fn test() {
- [x] REQUEST_RESPONSE
- [x] REQUEST_STREAM
- [x] REQUEST_CHANNEL
- More Operations
- [ ] Error
- [ ] Cancel
- [ ] Fragmentation
- [ ] Resume
- QoS
- [ ] RequestN
- [ ] Lease
- Transport
- [x] TCP
- [ ] Websocket
Expand Down
16 changes: 10 additions & 6 deletions examples/echo/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::builder().init();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());

async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
env_logger::builder().format_timestamp_millis().init();
let addr = env::args()
.nth(1)
.unwrap_or("tcp://127.0.0.1:7878".to_string());
RSocketFactory::receive()
.transport(URI::Tcp(addr))
.transport(&addr)
.acceptor(|setup, _socket| {
info!("accept setup: {:?}", setup);
Box::new(EchoRSocket)
Ok(Box::new(EchoRSocket))
// Or you can reject setup
// Err(From::from("SETUP_NOT_ALLOW"))
})
.on_start(|| info!("+++++++ echo server started! +++++++"))
.serve()
.await
}
32 changes: 32 additions & 0 deletions examples/proxy/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate futures;
extern crate rsocket_rust;
extern crate tokio;

use futures::executor::block_on;
use rsocket_rust::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
env_logger::builder().format_timestamp_millis().init();

RSocketFactory::receive()
.acceptor(|setup, _sending_socket| {
info!("incoming socket: setup={:?}", setup);
Ok(Box::new(block_on(async move {
RSocketFactory::connect()
.acceptor(|| Box::new(EchoRSocket))
.setup(Payload::from("I'm Rust!"))
.transport("tcp://127.0.0.1:7878")
.start()
.await
.unwrap()
})))
})
.transport("tcp://127.0.0.1:7979")
.serve()
.await
}
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
edition = "2018"
86 changes: 53 additions & 33 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,77 @@ use std::error::Error;
use std::fmt;
use std::io;

pub const ERR_INVALID_SETUP: u32 = 0x0000_0001;
pub const ERR_UNSUPPORTED_SETUP: u32 = 0x0000_0002;
pub const ERR_REJECT_SETUP: u32 = 0x0000_0003;
pub const ERR_REJECT_RESUME: u32 = 0x0000_0004;
pub const ERR_CONN_FAILED: u32 = 0x0000_0101;
pub const ERR_CONN_CLOSED: u32 = 0x0000_0102;
pub const ERR_APPLICATION: u32 = 0x0000_0201;
pub const ERR_REJECTED: u32 = 0x0000_0202;
pub const ERR_CANCELED: u32 = 0x0000_0203;
pub const ERR_INVALID: u32 = 0x0000_0204;

#[derive(Debug)]
pub enum ErrorKind {
Internal(u32, &'static str),
WithDescription(String),
IO(io::Error),
Cancelled(),
Send(),
Internal(u32, String),
WithDescription(String),
IO(io::Error),
Cancelled(),
}

#[derive(Debug)]
pub struct RSocketError {
kind: ErrorKind,
kind: ErrorKind,
}

impl Error for RSocketError {
fn description(&self) -> &str {
"this is a rsocket error"
}

fn cause(&self) -> Option<&dyn Error> {
match &self.kind {
ErrorKind::IO(e) => Some(e),
_ => None,
}
}
}
impl Error for RSocketError {}

impl fmt::Display for RSocketError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
println!(">>>>>>>>>>> {:?}", self.kind);
unimplemented!()
}
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self.kind {
ErrorKind::Internal(c, s) => write!(f, "ERROR({}): {}", translate(c), s),
ErrorKind::WithDescription(s) => write!(f, "{}", s),
ErrorKind::IO(e) => write!(f, "{}", e),
ErrorKind::Cancelled() => write!(f, "ERROR(CANCELLED)"),
}
}
}

impl From<ErrorKind> for RSocketError {
fn from(kind: ErrorKind) -> RSocketError {
RSocketError { kind }
}
fn from(kind: ErrorKind) -> RSocketError {
RSocketError { kind }
}
}
impl From<String> for RSocketError {
fn from(e: String) -> RSocketError {
RSocketError {
kind: ErrorKind::WithDescription(e),
fn from(e: String) -> RSocketError {
RSocketError {
kind: ErrorKind::WithDescription(e),
}
}
}
}

impl From<&'static str> for RSocketError {
fn from(e: &'static str) -> RSocketError {
RSocketError {
kind: ErrorKind::WithDescription(String::from(e)),
fn from(e: &'static str) -> RSocketError {
RSocketError {
kind: ErrorKind::WithDescription(String::from(e)),
}
}
}

#[inline]
fn translate(code: &u32) -> &str {
match *code {
ERR_APPLICATION => "APPLICATION",
ERR_INVALID_SETUP => "INVALID_SETUP",
ERR_UNSUPPORTED_SETUP => "UNSUPPORTED_SETUP",
ERR_REJECT_SETUP => "REJECT_SETUP",
ERR_REJECT_RESUME => "REJECT_RESUME",
ERR_CONN_FAILED => "CONN_FAILED",
ERR_CONN_CLOSED => "CONN_CLOSED",
ERR_REJECTED => "REJECTED",
ERR_CANCELED => "CANCELED",
ERR_INVALID => "INVALID",
_ => "UNKNOWN",
}
}
}
1 change: 0 additions & 1 deletion src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ impl RoutingMetadataBuilder {
pub fn push_str(self, tag: &str) -> Self {
self.push(String::from(tag))
}

pub fn push(mut self, tag: String) -> Self {
if tag.len() > MAX_ROUTING_TAG_LEN {
panic!("exceeded maximum routing tag length!");
Expand Down
16 changes: 8 additions & 8 deletions src/frame/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ use super::{Body, Frame};
pub struct Cancel {}

pub struct CancelBuilder {
stream_id: u32,
flag: u16,
stream_id: u32,
flag: u16,
}

impl CancelBuilder {
pub fn build(self) -> Frame {
Frame::new(self.stream_id, Body::Cancel(), self.flag)
}
pub fn build(self) -> Frame {
Frame::new(self.stream_id, Body::Cancel(), self.flag)
}
}

impl Cancel {
pub fn builder(stream_id: u32, flag: u16) -> CancelBuilder {
CancelBuilder { stream_id, flag }
}
pub fn builder(stream_id: u32, flag: u16) -> CancelBuilder {
CancelBuilder { stream_id, flag }
}
}
Loading

0 comments on commit ecae88d

Please sign in to comment.