Skip to content

Commit

Permalink
feat(qp): introduce ModifyQueuePairError and PostSendError
Browse files Browse the repository at this point in the history
RDMA C functions basically use return value (when it could be used)
for returning an errno, or when sometimes the function should return
a pointer but return a NULL, user should read errno themselves.

Most of the time, the errno actually suggests an error from kernel,
users really need a lot of experience on RDMA to understand what's
going wrong by just read a simple 8bit error number.

RDMAmojo and rdma-core man pages provides some hints on errnos, while
that need users to search and grab. Rust provides a powerful error
handling system, why not just integrate the hints with errno together,
plus some context to build our error types?

I provide a trial implementation in this commit. Sometimes the complicated
error types introduce performance penalty, so I make it optional.

Signed-off-by: Luke Yue <[email protected]>
  • Loading branch information
dragonJACson committed Oct 23, 2024
1 parent 1b13fdc commit 78d0ac8
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 38 deletions.
13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ os_socketaddr = "0.2"
bitmask-enum = "2.2"
lazy_static = "1.5.0"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0.64"

[dev-dependencies]
trybuild = "1.0"
Expand All @@ -33,3 +34,15 @@ quanta = "0.12"
byte-unit = "5.1"
ouroboros = "0.18"
proptest = "1.5"
anyhow = "1.0"

[features]
debug = []

[[example]]
name = "rc_pingpong"
required-features = ["debug"]

[[example]]
name = "rc_pingpong_split"
required-features = ["debug"]
12 changes: 6 additions & 6 deletions examples/rc_pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ struct TimeStamps {
}

#[allow(clippy::while_let_on_iterator)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() -> anyhow::Result<()> {
let args = Args::parse();
let mut scnt: u32 = 0;
let mut rcnt: u32 = 0;
Expand Down Expand Up @@ -192,7 +192,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.setup_pkey_index(0)
.setup_port(args.ib_port)
.setup_access_flags(AccessFlags::LocalWrite | AccessFlags::RemoteWrite);
qp.modify(&attr).unwrap();
qp.modify(&attr)?;

for _i in 0..rx_depth {
let mut guard = qp.start_post_recv();
Expand Down Expand Up @@ -278,7 +278,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.setup_grh_dest_gid(&remote_context.gid)
.setup_grh_hop_limit(1);
attr.setup_address_vector(&ah_attr);
qp.modify(&attr).unwrap();
qp.modify(&attr)?;

let mut attr = QueuePairAttribute::new();
attr.setup_state(QueuePairState::ReadyToSend)
Expand All @@ -288,7 +288,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.setup_rnr_retry(7)
.setup_max_read_atomic(0);

qp.modify(&attr).unwrap();
qp.modify(&attr)?;

let clock = quanta::Clock::new();
let start_time = clock.now();
Expand All @@ -303,7 +303,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
send_handle.setup_sge(send_mr.lkey(), send_mr.buf.data.as_ptr() as _, send_mr.buf.len as _);
}

guard.post().unwrap();
guard.post()?;
outstanding_send = true;
}
// poll for the completion
Expand Down Expand Up @@ -388,7 +388,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
send_mr.buf.len as _,
);
}
guard.post().unwrap();
guard.post()?;
outstanding_send = true;
}
}
Expand Down
21 changes: 10 additions & 11 deletions examples/rc_pingpong_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use sideway::verbs::device_context::{DeviceContext, Mtu};
use sideway::verbs::memory_region::MemoryRegion;
use sideway::verbs::protection_domain::ProtectionDomain;
use sideway::verbs::queue_pair::{
ExtendedQueuePair, PostSendGuard, QueuePair, QueuePairAttribute, QueuePairState, SetScatterGatherEntry,
WorkRequestFlags,
ExtendedQueuePair, PostSendError, PostSendGuard, QueuePair, QueuePairAttribute, QueuePairState,
SetScatterGatherEntry, WorkRequestFlags,
};
use sideway::verbs::AccessFlags;

Expand Down Expand Up @@ -140,7 +140,7 @@ struct PingPongContext {
}

impl PingPongContext {
fn build(device: &Device, size: u32, rx_depth: u32, ib_port: u8, use_ts: bool) -> Result<PingPongContext, String> {
fn build(device: &Device, size: u32, rx_depth: u32, ib_port: u8, use_ts: bool) -> anyhow::Result<PingPongContext> {
let context = device
.open()
.unwrap_or_else(|_| panic!("Couldn't get context for {}", device.name().unwrap()));
Expand Down Expand Up @@ -232,7 +232,7 @@ impl PingPongContext {
Ok(())
}

fn post_send(&mut self) -> Result<(), String> {
fn post_send(&mut self) -> Result<(), PostSendError> {
let (mut guard, lkey, ptr, size) = self.with_mut(|fields| {
(
fields.qp.start_post_send(),
Expand All @@ -253,7 +253,7 @@ impl PingPongContext {

fn connect(
&mut self, remote_context: &PingPongDestination, ib_port: u8, psn: u32, mtu: Mtu, sl: u8, gid_idx: u8,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let mut attr = QueuePairAttribute::new();
attr.setup_state(QueuePairState::ReadyToReceive)
.setup_path_mtu(mtu)
Expand Down Expand Up @@ -362,7 +362,7 @@ struct TimeStamps {
}

#[allow(clippy::while_let_on_iterator)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() -> anyhow::Result<()> {
let args = Args::parse();
let mut scnt: u32 = 0;
let mut rcnt: u32 = 0;
Expand All @@ -386,7 +386,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
None => device_list.iter().next().expect("No IB device found"),
};

let mut ctx = PingPongContext::build(&device, args.size, rx_depth, args.ib_port, args.ts).unwrap();
let mut ctx = PingPongContext::build(&device, args.size, rx_depth, args.ib_port, args.ts)?;

let gid = ctx.borrow_ctx().query_gid(args.ib_port, args.gid_idx.into()).unwrap();
let psn = rand::random::<u32>() & 0xFFFFFF;
Expand Down Expand Up @@ -448,15 +448,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
remote_context.qp_number, remote_context.packet_seq_number, remote_context.gid
);

ctx.connect(&remote_context, args.ib_port, psn, args.mtu.0, args.sl, args.gid_idx)
.unwrap();
ctx.connect(&remote_context, args.ib_port, psn, args.mtu.0, args.sl, args.gid_idx)?;

let clock = quanta::Clock::new();
let start_time = clock.now();
let mut outstanding_send = false;

if args.server_ip.is_some() {
ctx.post_send().unwrap();
ctx.post_send()?;
outstanding_send = true;
}
// poll for the completion
Expand Down Expand Up @@ -502,7 +501,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}

if need_post_send {
ctx.post_send().unwrap();
ctx.post_send()?;
}

// Check if we're done
Expand Down
Loading

0 comments on commit 78d0ac8

Please sign in to comment.