Skip to content

Commit

Permalink
fix lint.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Sep 15, 2019
1 parent d91012a commit 3ca0074
Show file tree
Hide file tree
Showing 28 changed files with 190 additions and 192 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ description = "rsocket-rust is an implementation of the RSocket protocol in Rust
[dependencies]
log = "0.4.8"
bytes = "0.4.12"
futures = "0.1.28"
futures = "0.1.29"
tokio = "0.1.22"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/core/callers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl Future for RequestCaller {
type Error = RSocketError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.rx.poll().map_err(|e| RSocketError::from(e))
self.rx.poll().map_err(RSocketError::from)
}
}

Expand Down
40 changes: 16 additions & 24 deletions src/core/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use std::sync::{Arc, Mutex, RwLock};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;

type AcceptorGenerator = Arc<fn(SetupPayload, Box<dyn RSocket>) -> Box<dyn RSocket>>;

pub enum Acceptor {
Direct(Box<dyn RSocket>),
Generate(Arc<fn(SetupPayload, Box<dyn RSocket>) -> Box<dyn RSocket>>),
Generate(AcceptorGenerator),
Empty(),
}

Expand Down Expand Up @@ -67,11 +69,11 @@ impl Runner {
Acceptor::Generate(x) => (Responder::new(), Acceptor::Generate(x)),
};
Runner {
tx: tx,
handlers: handlers,
acceptor: acceptor,
responder: responder,
socket: socket,
tx,
handlers,
acceptor,
responder,
socket,
}
}

Expand Down Expand Up @@ -172,8 +174,8 @@ impl Runner {
}

#[inline]
fn to_future(self, rx: mpsc::Receiver<Frame>) -> impl Future<Item = (), Error = ()> + Send {
let task = rx.for_each(move |f| {
fn gen_task(self, rx: mpsc::Receiver<Frame>) -> impl Future<Item = (), Error = ()> + Send {
rx.for_each(move |f| {
let sid = f.get_stream_id();
let flag = f.get_flag();
debug!("incoming frame#{}", sid);
Expand Down Expand Up @@ -203,7 +205,6 @@ impl Runner {
match handler {
Handler::Request(sender) => {
tx1 = Some(sender);
()
}
Handler::Stream(sender) => {
if flag & frame::FLAG_NEXT != 0 {
Expand All @@ -212,7 +213,6 @@ impl Runner {
if flag & frame::FLAG_COMPLETE == 0 {
senders.insert(sid, Handler::Stream(sender));
}
()
}
};

Expand Down Expand Up @@ -247,9 +247,7 @@ impl Runner {
_ => unimplemented!(),
};
Ok(())
});

task
})
}
}

Expand Down Expand Up @@ -280,11 +278,11 @@ impl DuplexSocket {

let sk = DuplexSocket {
tx: tx.clone(),
handlers: handlers,
handlers,
seq: StreamID::from(first_stream_id),
};

let task = Runner::new(tx, handlers2, responder, sk.clone()).to_future(rx);
let task = Runner::new(tx, handlers2, responder, sk.clone()).gen_task(rx);
let fu = lazy(move || {
tokio::spawn(task0);
task
Expand Down Expand Up @@ -339,10 +337,7 @@ impl RSocket for DuplexSocket {
}
let sending = bu.build();
let tx = self.tx.clone();
let fu = tx
.send(sending)
.map(|_| ())
.map_err(|e| RSocketError::from(e));
let fu = tx.send(sending).map(|_| ()).map_err(RSocketError::from);
Box::new(fu)
}

Expand All @@ -358,10 +353,7 @@ impl RSocket for DuplexSocket {
}
let sending = bu.build();
let tx = self.tx.clone();
let fu = tx
.send(sending)
.map(|_| ())
.map_err(|e| RSocketError::from(e));
let fu = tx.send(sending).map(|_| ()).map_err(RSocketError::from);
Box::new(fu)
}

Expand Down Expand Up @@ -428,7 +420,7 @@ impl DuplexSocketBuilder {
self
}

pub fn from_socket(
pub fn with_socket(
self,
socket: TcpStream,
) -> (DuplexSocket, impl Future<Item = (), Error = ()>) {
Expand Down
2 changes: 1 addition & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl fmt::Display for RSocketError {

impl From<ErrorKind> for RSocketError {
fn from(kind: ErrorKind) -> RSocketError {
RSocketError { kind: kind }
RSocketError { kind }
}
}

Expand Down
17 changes: 14 additions & 3 deletions src/frame/cancel.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
use super::{Body, Frame};

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Cancel {}

pub struct CancelBuilder {
stream_id: u32,
flag: u16,
}

impl CancelBuilder {
pub fn build(self) -> Frame {
Frame::new(self.stream_id, Body::Cancel(), self.flag)
}
}

impl Cancel {
pub fn new(stream_id: u32, flag: u16) -> Frame {
Frame::new(stream_id, Body::Cancel(), flag)
pub fn builder(stream_id: u32, flag: u16) -> CancelBuilder {
CancelBuilder { stream_id, flag }
}
}
6 changes: 3 additions & 3 deletions src/frame/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{Body, Frame, Writeable};
use crate::result::RSocketResult;
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Error {
code: u32,
data: Option<Bytes>,
Expand Down Expand Up @@ -77,9 +77,9 @@ impl Writeable for Error {
}
}

fn len(&self) -> u32 {
fn len(&self) -> usize {
4 + match &self.data {
Some(v) => v.len() as u32,
Some(v) => v.len(),
None => 0,
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/frame/keepalive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{Body, Frame, Writeable};
use crate::result::RSocketResult;
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Keepalive {
last_received_position: u64,
data: Option<Bytes>,
Expand Down Expand Up @@ -69,10 +69,9 @@ impl Keepalive {
&self.data
}

pub fn split(self) -> (Option<Bytes>,Option<Bytes>){
(self.data,None)
pub fn split(self) -> (Option<Bytes>, Option<Bytes>) {
(self.data, None)
}

}

impl Writeable for Keepalive {
Expand All @@ -84,9 +83,9 @@ impl Writeable for Keepalive {
}
}

fn len(&self) -> u32 {
fn len(&self) -> usize {
8 + match &self.data {
Some(v) => v.len() as u32,
Some(v) => v.len(),
None => 0,
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/frame/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{Body, Frame, Writeable, FLAG_METADATA};
use crate::result::RSocketResult;
use bytes::{BigEndian, BufMut, ByteOrder, Bytes, BytesMut};

#[derive(Debug, Clone)]
#[derive(Debug, PartialEq)]
pub struct Lease {
ttl: u32,
number_of_requests: u32,
Expand Down Expand Up @@ -47,7 +47,7 @@ impl LeaseBuilder {
}

pub fn build(self) -> Frame {
Frame::new(self.stream_id, Body::Lease(self.value.clone()), self.flag)
Frame::new(self.stream_id, Body::Lease(self.value), self.flag)
}
}

Expand Down Expand Up @@ -96,9 +96,9 @@ impl Writeable for Lease {
}
}

fn len(&self) -> u32 {
fn len(&self) -> usize {
8 + match &self.metadata {
Some(v) => v.len() as u32,
Some(v) => v.len(),
None => 0,
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/frame/metadata_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{Body, Frame, Writeable};
use crate::result::RSocketResult;
use bytes::{BufMut, Bytes, BytesMut};

#[derive(Debug, Clone)]
#[derive(Debug, PartialEq)]
pub struct MetadataPush {
metadata: Option<Bytes>,
}
Expand Down Expand Up @@ -48,10 +48,9 @@ impl MetadataPush {
&self.metadata
}

pub fn split(self) -> (Option<Bytes>,Option<Bytes>){
(None,self.metadata)
pub fn split(self) -> (Option<Bytes>, Option<Bytes>) {
(None, self.metadata)
}

}

impl Writeable for MetadataPush {
Expand All @@ -62,9 +61,9 @@ impl Writeable for MetadataPush {
}
}

fn len(&self) -> u32 {
fn len(&self) -> usize {
match &self.metadata {
Some(v) => v.len() as u32,
Some(v) => v.len(),
None => 0,
}
}
Expand Down
68 changes: 36 additions & 32 deletions src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,30 @@ pub const TYPE_METADATA_PUSH: u16 = 0x0C;
pub const TYPE_RESUME: u16 = 0x0D;
pub const TYPE_RESUME_OK: u16 = 0x0E;

pub const ERR_INVALID_SETUP: u32 = 0x00000001;
pub const ERR_UNSUPPORTED_SETUP: u32 = 0x00000002;
pub const ERR_REJECT_SETUP: u32 = 0x00000003;
pub const ERR_REJECT_RESUME: u32 = 0x00000004;
pub const ERR_CONN_FAILED: u32 = 0x00000101;
pub const ERR_CONN_CLOSED: u32 = 0x00000102;
pub const ERR_APPLICATION: u32 = 0x00000201;
pub const ERR_REJECTED: u32 = 0x00000202;
pub const ERR_CANCELED: u32 = 0x00000203;
pub const ERR_INVALID: u32 = 0x00000204;

pub const REQUEST_MAX: u32 = 2147483647;
const LEN_HEADER: u32 = 6;
pub const ERR_INVALID_SETUP: u32 = 0x0000_0001;
pub const ERR_UNSUPPORTED_SETUP: u32 = 0x0000_0002;
pub const ERR_REJECT_SETUP: u32 = 0x0000_0003;
pub const ERR_REJECT_RESUME: u32 = 0x0000_0004;
pub const ERR_CONN_FAILED: u32 = 0x0000_0101;
pub const ERR_CONN_CLOSED: u32 = 0x0000_0102;
pub const ERR_APPLICATION: u32 = 0x0000_0201;
pub const ERR_REJECTED: u32 = 0x0000_0202;
pub const ERR_CANCELED: u32 = 0x0000_0203;
pub const ERR_INVALID: u32 = 0x0000_0204;

pub const REQUEST_MAX: u32 = 0x7FFF_FFFF; // 2147483647

const LEN_HEADER: usize = 6;

pub trait Writeable {
fn write_to(&self, bf: &mut BytesMut);
fn len(&self) -> u32;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum Body {
Setup(Setup),
Lease(Lease),
Expand All @@ -99,7 +103,7 @@ pub enum Body {
ResumeOK(ResumeOK),
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Frame {
stream_id: u32,
body: Body,
Expand Down Expand Up @@ -128,7 +132,7 @@ impl Writeable for Frame {
}
}

fn len(&self) -> u32 {
fn len(&self) -> usize {
// header len
LEN_HEADER
+ match &self.body {
Expand Down Expand Up @@ -167,20 +171,20 @@ impl Frame {
b.advance(2);
let (flag, kind) = (n & 0x03FF, (n & 0xFC00) >> 10);
let body = match kind {
TYPE_SETUP => Setup::decode(flag, b).map(|it| Body::Setup(it)),
TYPE_REQUEST_RESPONSE => RequestResponse::decode(flag, b).map(|it| Body::RequestResponse(it)),
TYPE_REQUEST_STREAM => RequestStream::decode(flag, b).map(|it| Body::RequestStream(it)),
TYPE_REQUEST_CHANNEL => RequestChannel::decode(flag, b).map(|it| Body::RequestChannel(it)),
TYPE_REQUEST_FNF => RequestFNF::decode(flag, b).map(|it| Body::RequestFNF(it)),
TYPE_REQUEST_N => RequestN::decode(flag, b).map(|it| Body::RequestN(it)),
TYPE_METADATA_PUSH => MetadataPush::decode(flag, b).map(|it| Body::MetadataPush(it)),
TYPE_KEEPALIVE => Keepalive::decode(flag, b).map(|it| Body::Keepalive(it)),
TYPE_PAYLOAD => Payload::decode(flag, b).map(|it| Body::Payload(it)),
TYPE_LEASE => Lease::decode(flag, b).map(|it| Body::Lease(it)),
TYPE_SETUP => Setup::decode(flag, b).map(Body::Setup),
TYPE_REQUEST_RESPONSE => RequestResponse::decode(flag, b).map(Body::RequestResponse),
TYPE_REQUEST_STREAM => RequestStream::decode(flag, b).map(Body::RequestStream),
TYPE_REQUEST_CHANNEL => RequestChannel::decode(flag, b).map(Body::RequestChannel),
TYPE_REQUEST_FNF => RequestFNF::decode(flag, b).map(Body::RequestFNF),
TYPE_REQUEST_N => RequestN::decode(flag, b).map(Body::RequestN),
TYPE_METADATA_PUSH => MetadataPush::decode(flag, b).map(Body::MetadataPush),
TYPE_KEEPALIVE => Keepalive::decode(flag, b).map(Body::Keepalive),
TYPE_PAYLOAD => Payload::decode(flag, b).map(Body::Payload),
TYPE_LEASE => Lease::decode(flag, b).map(Body::Lease),
TYPE_CANCEL => Ok(Body::Cancel()),
TYPE_ERROR => Error::decode(flag, b).map(|it| Body::Error(it)),
TYPE_RESUME_OK => ResumeOK::decode(flag, b).map(|it| Body::ResumeOK(it)),
TYPE_RESUME => Resume::decode(flag, b).map(|it| Body::Resume(it)),
TYPE_ERROR => Error::decode(flag, b).map(Body::Error),
TYPE_RESUME_OK => ResumeOK::decode(flag, b).map(Body::ResumeOK),
TYPE_RESUME => Resume::decode(flag, b).map(Body::Resume),
_ => Err(RSocketError::from("illegal frame type")),
};
body.map(|it| Frame::new(sid, it, flag))
Expand Down Expand Up @@ -212,7 +216,7 @@ impl Frame {
}

fn to_frame_type(body: &Body) -> u16 {
return match body {
match body {
Body::Setup(_) => TYPE_SETUP,
Body::Lease(_) => TYPE_LEASE,
Body::Keepalive(_) => TYPE_KEEPALIVE,
Expand All @@ -227,5 +231,5 @@ fn to_frame_type(body: &Body) -> u16 {
Body::MetadataPush(_) => TYPE_METADATA_PUSH,
Body::Resume(_) => TYPE_RESUME,
Body::ResumeOK(_) => TYPE_RESUME_OK,
};
}
}
Loading

0 comments on commit 3ca0074

Please sign in to comment.