diff --git a/Cargo.toml b/Cargo.toml index 080cffe..58fdb0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ os_socketaddr = "0.2" bitmask-enum = "2.2" lazy_static = "1.5.0" serde = { version = "1.0", features = ["derive"] } +thiserror = "1.0.64" [dev-dependencies] trybuild = "1.0" @@ -33,3 +34,15 @@ quanta = "0.12" byte-unit = "5.1" ouroboros = "0.18" proptest = "1.5" +anyhow = "1.0" + +[features] +debug = [] + +[[example]] +name = "rc_pingpong" +required-features = ["debug"] + +[[example]] +name = "rc_pingpong_split" +required-features = ["debug"] diff --git a/examples/rc_pingpong.rs b/examples/rc_pingpong.rs index a2d8207..d44c721 100644 --- a/examples/rc_pingpong.rs +++ b/examples/rc_pingpong.rs @@ -115,7 +115,7 @@ struct TimeStamps { } #[allow(clippy::while_let_on_iterator)] -fn main() -> Result<(), Box> { +fn main() -> anyhow::Result<()> { let args = Args::parse(); let mut scnt: u32 = 0; let mut rcnt: u32 = 0; @@ -192,7 +192,7 @@ fn main() -> Result<(), Box> { .setup_pkey_index(0) .setup_port(args.ib_port) .setup_access_flags(AccessFlags::LocalWrite | AccessFlags::RemoteWrite); - qp.modify(&attr).unwrap(); + qp.modify(&attr)?; for _i in 0..rx_depth { let mut guard = qp.start_post_recv(); @@ -278,7 +278,7 @@ fn main() -> Result<(), Box> { .setup_grh_dest_gid(&remote_context.gid) .setup_grh_hop_limit(1); attr.setup_address_vector(&ah_attr); - qp.modify(&attr).unwrap(); + qp.modify(&attr)?; let mut attr = QueuePairAttribute::new(); attr.setup_state(QueuePairState::ReadyToSend) @@ -288,7 +288,7 @@ fn main() -> Result<(), Box> { .setup_rnr_retry(7) .setup_max_read_atomic(0); - qp.modify(&attr).unwrap(); + qp.modify(&attr)?; let clock = quanta::Clock::new(); let start_time = clock.now(); @@ -303,7 +303,7 @@ fn main() -> Result<(), Box> { send_handle.setup_sge(send_mr.lkey(), send_mr.buf.data.as_ptr() as _, send_mr.buf.len as _); } - guard.post().unwrap(); + guard.post()?; outstanding_send = true; } // poll for the completion @@ -388,7 +388,7 @@ fn main() -> Result<(), Box> { send_mr.buf.len as _, ); } - guard.post().unwrap(); + guard.post()?; outstanding_send = true; } } diff --git a/examples/rc_pingpong_split.rs b/examples/rc_pingpong_split.rs index a3cb3e6..3de06b9 100644 --- a/examples/rc_pingpong_split.rs +++ b/examples/rc_pingpong_split.rs @@ -39,8 +39,8 @@ use sideway::verbs::device_context::{DeviceContext, Mtu}; use sideway::verbs::memory_region::MemoryRegion; use sideway::verbs::protection_domain::ProtectionDomain; use sideway::verbs::queue_pair::{ - ExtendedQueuePair, PostSendGuard, QueuePair, QueuePairAttribute, QueuePairState, SetScatterGatherEntry, - WorkRequestFlags, + ExtendedQueuePair, PostSendError, PostSendGuard, QueuePair, QueuePairAttribute, QueuePairState, + SetScatterGatherEntry, WorkRequestFlags, }; use sideway::verbs::AccessFlags; @@ -140,7 +140,7 @@ struct PingPongContext { } impl PingPongContext { - fn build(device: &Device, size: u32, rx_depth: u32, ib_port: u8, use_ts: bool) -> Result { + fn build(device: &Device, size: u32, rx_depth: u32, ib_port: u8, use_ts: bool) -> anyhow::Result { let context = device .open() .unwrap_or_else(|_| panic!("Couldn't get context for {}", device.name().unwrap())); @@ -232,7 +232,7 @@ impl PingPongContext { Ok(()) } - fn post_send(&mut self) -> Result<(), String> { + fn post_send(&mut self) -> Result<(), PostSendError> { let (mut guard, lkey, ptr, size) = self.with_mut(|fields| { ( fields.qp.start_post_send(), @@ -253,7 +253,7 @@ impl PingPongContext { fn connect( &mut self, remote_context: &PingPongDestination, ib_port: u8, psn: u32, mtu: Mtu, sl: u8, gid_idx: u8, - ) -> Result<(), String> { + ) -> anyhow::Result<()> { let mut attr = QueuePairAttribute::new(); attr.setup_state(QueuePairState::ReadyToReceive) .setup_path_mtu(mtu) @@ -362,7 +362,7 @@ struct TimeStamps { } #[allow(clippy::while_let_on_iterator)] -fn main() -> Result<(), Box> { +fn main() -> anyhow::Result<()> { let args = Args::parse(); let mut scnt: u32 = 0; let mut rcnt: u32 = 0; @@ -386,7 +386,7 @@ fn main() -> Result<(), Box> { None => device_list.iter().next().expect("No IB device found"), }; - let mut ctx = PingPongContext::build(&device, args.size, rx_depth, args.ib_port, args.ts).unwrap(); + let mut ctx = PingPongContext::build(&device, args.size, rx_depth, args.ib_port, args.ts)?; let gid = ctx.borrow_ctx().query_gid(args.ib_port, args.gid_idx.into()).unwrap(); let psn = rand::random::() & 0xFFFFFF; @@ -448,15 +448,14 @@ fn main() -> Result<(), Box> { remote_context.qp_number, remote_context.packet_seq_number, remote_context.gid ); - ctx.connect(&remote_context, args.ib_port, psn, args.mtu.0, args.sl, args.gid_idx) - .unwrap(); + ctx.connect(&remote_context, args.ib_port, psn, args.mtu.0, args.sl, args.gid_idx)?; let clock = quanta::Clock::new(); let start_time = clock.now(); let mut outstanding_send = false; if args.server_ip.is_some() { - ctx.post_send().unwrap(); + ctx.post_send()?; outstanding_send = true; } // poll for the completion @@ -502,7 +501,7 @@ fn main() -> Result<(), Box> { } if need_post_send { - ctx.post_send().unwrap(); + ctx.post_send()?; } // Check if we're done diff --git a/src/verbs/queue_pair.rs b/src/verbs/queue_pair.rs index cb209ad..443ad7f 100644 --- a/src/verbs/queue_pair.rs +++ b/src/verbs/queue_pair.rs @@ -1,5 +1,4 @@ use bitmask_enum::bitmask; -use lazy_static::lazy_static; use rdma_mummy_sys::{ ibv_create_qp, ibv_create_qp_ex, ibv_data_buf, ibv_destroy_qp, ibv_modify_qp, ibv_post_recv, ibv_post_send, ibv_qp, ibv_qp_attr, ibv_qp_attr_mask, ibv_qp_cap, ibv_qp_create_send_ops_flags, ibv_qp_ex, ibv_qp_init_attr, @@ -20,6 +19,62 @@ use super::{ protection_domain::ProtectionDomain, AccessFlags, }; +#[cfg(feature = "debug")] +use crate::verbs::address::Gid; + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum ModifyQueuePairError { + #[error("modify queue pair failed")] + GenericError(#[from] io::Error), + #[cfg(feature = "debug")] + #[error("invalid transition from {cur_state:?} to {next_state:?}")] + InvalidTransition { + cur_state: QueuePairState, + next_state: QueuePairState, + source: io::Error, + }, + #[cfg(feature = "debug")] + #[error("invalid transition from {cur_state:?} to {next_state:?}, possible invalid masks {invalid:?}, possible needed masks {needed:?}")] + InvalidAttributeMask { + cur_state: QueuePairState, + next_state: QueuePairState, + invalid: QueuePairAttributeMask, + needed: QueuePairAttributeMask, + source: io::Error, + }, + #[cfg(feature = "debug")] + #[error("resolve route timed out, source gid index: {sgid_index}, destination gid: {gid}")] + ResolveRouteTimedout { + sgid_index: u8, + gid: Gid, + source: io::Error, + }, + #[cfg(feature = "debug")] + #[error("network unreachable, source gid index: {sgid_index}, destination gid: {gid}")] + NetworkUnreachable { + sgid_index: u8, + gid: Gid, + source: io::Error, + }, +} + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum PostSendError { + #[error("post send failed")] + GenericError(#[from] io::Error), + #[cfg(feature = "debug")] + #[error("invalid value provided in work request")] + InvalidWorkRequest(#[source] io::Error), + #[cfg(feature = "debug")] + #[error("invalid value provided in queue pair")] + InvalidQueuePair(#[source] io::Error), + #[cfg(feature = "debug")] + #[error("send queue is full or not enough resources to complete this operation")] + NotEnoughResources(#[source] io::Error), +} + #[repr(u32)] #[derive(Debug, Clone, Copy)] pub enum QueuePairType { @@ -155,7 +210,7 @@ pub trait QueuePair { /// # Returns /// /// * `Ok(())` if the modification was successful. - /// * `Err(String)` if the modification failed, containing an error message. + /// * `Err(ModifyQueuePairError)` if the modification failed, containing an error message. /// /// # Examples /// @@ -164,22 +219,44 @@ pub trait QueuePair { /// attr.setup_state(QueuePairState::ReadyToSend); /// generic_queue_pair.modify(&attr)?; /// ``` - fn modify(&mut self, attr: &QueuePairAttribute) -> Result<(), String> { + fn modify(&mut self, attr: &QueuePairAttribute) -> Result<(), ModifyQueuePairError> { // ibv_qp_attr does not impl Clone trait, so we use struct update syntax here let mut qp_attr = ibv_qp_attr { ..attr.attr }; let ret = unsafe { ibv_modify_qp(self.qp().as_ptr(), &mut qp_attr as *mut _, attr.attr_mask.bits) }; if ret == 0 { Ok(()) } else { - // User doesn't pass in a mask with IBV_QP_STATE, we just assume user doesn't - // want to change the state, pass self.state() as next_state - if attr.attr_mask.contains(QueuePairAttributeMask::State) { - attr_mask_check(attr.attr_mask, self.state(), attr.attr.qp_state.into()).unwrap(); - } else { - attr_mask_check(attr.attr_mask, self.state(), self.state()).unwrap(); + match ret { + #[cfg(feature = "debug")] + libc::EINVAL => { + // User doesn't pass in a mask with IBV_QP_STATE, we just assume user doesn't + // want to change the state, pass self.state() as next_state + let err = if attr.attr_mask.contains(QueuePairAttributeMask::State) { + attr_mask_check(attr.attr_mask, self.state(), attr.attr.qp_state.into()) + } else { + attr_mask_check(attr.attr_mask, self.state(), self.state()) + }; + match err { + Ok(()) => Err(ModifyQueuePairError::GenericError(io::Error::from_raw_os_error( + libc::EINVAL, + ))), + Err(err) => Err(err), + } + }, + #[cfg(feature = "debug")] + libc::ETIMEDOUT => Err(ModifyQueuePairError::ResolveRouteTimedout { + sgid_index: attr.attr.ah_attr.grh.sgid_index, + gid: attr.attr.ah_attr.grh.dgid.into(), + source: io::Error::from_raw_os_error(libc::ETIMEDOUT), + }), + #[cfg(feature = "debug")] + libc::ENETUNREACH => Err(ModifyQueuePairError::NetworkUnreachable { + sgid_index: attr.attr.ah_attr.grh.sgid_index, + gid: attr.attr.ah_attr.grh.dgid.into(), + source: io::Error::from_raw_os_error(libc::ENETUNREACH), + }), + err => Err(ModifyQueuePairError::GenericError(io::Error::from_raw_os_error(err))), } - - Err(format!("ibv_modify_qp failed, err={ret}")) } } @@ -272,7 +349,7 @@ pub trait PostSendGuard: private_traits::PostSendGuard { // every qp should hold only one WorkRequestHandle at the same time fn construct_wr(&mut self, wr_id: u64, wr_flags: WorkRequestFlags) -> WorkRequestHandle<'_, Self>; - fn post(self) -> Result<(), String>; + fn post(self) -> Result<(), PostSendError>; } // According to C standard, enums should be int, but Rust just uses whatever @@ -318,6 +395,7 @@ pub enum QueuePairAttributeMask { // // We should consider using `std::mem::variant_count` here, after it stablized. // +#[cfg(feature = "debug")] #[derive(Debug, Copy, Clone)] struct QueuePairStateTableEntry { // whether this state transition is valid. @@ -326,6 +404,10 @@ struct QueuePairStateTableEntry { optional_mask: QueuePairAttributeMask, } +#[cfg(feature = "debug")] +use lazy_static::lazy_static; + +#[cfg(feature = "debug")] lazy_static! { static ref RC_QP_STATE_TABLE: [[QueuePairStateTableEntry; QueuePairState::Error as usize + 1]; QueuePairState::Error as usize + 1] = { @@ -768,11 +850,13 @@ impl QueuePairAttribute { // TODO(zhp): trait for QueuePair +#[cfg(feature = "debug")] #[inline] fn get_needed_mask(cur_mask: QueuePairAttributeMask, required_mask: QueuePairAttributeMask) -> QueuePairAttributeMask { required_mask.and(required_mask.xor(cur_mask)) } +#[cfg(feature = "debug")] #[inline] fn get_invalid_mask( cur_mask: QueuePairAttributeMask, required_mask: QueuePairAttributeMask, optional_mask: QueuePairAttributeMask, @@ -780,11 +864,16 @@ fn get_invalid_mask( cur_mask.and(required_mask.or(optional_mask).not()) } +#[cfg(feature = "debug")] fn attr_mask_check( attr_mask: QueuePairAttributeMask, cur_state: QueuePairState, next_state: QueuePairState, -) -> Result<(), String> { +) -> Result<(), ModifyQueuePairError> { if !RC_QP_STATE_TABLE[cur_state as usize][next_state as usize].valid { - return Err(format!("Invalid transition from {cur_state:?} to {next_state:?}")); + return Err(ModifyQueuePairError::InvalidTransition { + cur_state, + next_state, + source: io::Error::from_raw_os_error(libc::EINVAL), + }); } let required = RC_QP_STATE_TABLE[cur_state as usize][next_state as usize].required_mask; @@ -794,7 +883,13 @@ fn attr_mask_check( if invalid.bits == 0 && needed.bits == 0 { Ok(()) } else { - Err(format!("Invalid transition from {cur_state:?} to {next_state:?}, possible invalid masks {invalid:?}, possible needed masks {needed:?}")) + Err(ModifyQueuePairError::InvalidAttributeMask { + cur_state, + next_state, + invalid, + needed, + source: io::Error::from_raw_os_error(libc::EINVAL), + }) } } @@ -881,7 +976,7 @@ impl PostSendGuard for BasicPostSendGuard<'_> { WorkRequestHandle { guard: self } } - fn post(mut self) -> Result<(), String> { + fn post(mut self) -> Result<(), PostSendError> { let mut sge_index = 0; for i in 0..self.wrs.len() { @@ -903,7 +998,19 @@ impl PostSendGuard for BasicPostSendGuard<'_> { let ret = unsafe { ibv_post_send(self.qp.as_ptr(), self.wrs.as_mut_ptr(), &mut bad_wr) }; match ret { 0 => Ok(()), - err => Err(format!("ibv_post_send failed, ret={err}")), + #[cfg(feature = "debug")] + libc::EINVAL => Err(PostSendError::InvalidWorkRequest(io::Error::from_raw_os_error( + libc::EINVAL, + ))), + #[cfg(feature = "debug")] + libc::ENOMEM => Err(PostSendError::NotEnoughResources(io::Error::from_raw_os_error( + libc::ENOMEM, + ))), + #[cfg(feature = "debug")] + libc::EFAULT => Err(PostSendError::InvalidQueuePair(io::Error::from_raw_os_error( + libc::EFAULT, + ))), + err => Err(PostSendError::GenericError(io::Error::from_raw_os_error(err))), } } } @@ -987,14 +1094,26 @@ impl PostSendGuard for ExtendedPostSendGuard<'_> { WorkRequestHandle { guard: self } } - fn post(mut self) -> Result<(), String> { + fn post(mut self) -> Result<(), PostSendError> { let ret: i32 = unsafe { ibv_wr_complete(self.qp_ex.unwrap_unchecked().as_ptr()) }; self.qp_ex = None; match ret { 0 => Ok(()), - err => Err(format!("failed to ibv_wr_complete: ret {err}")), + #[cfg(feature = "debug")] + libc::EINVAL => Err(PostSendError::InvalidWorkRequest(io::Error::from_raw_os_error( + libc::EINVAL, + ))), + #[cfg(feature = "debug")] + libc::ENOMEM => Err(PostSendError::NotEnoughResources(io::Error::from_raw_os_error( + libc::ENOMEM, + ))), + #[cfg(feature = "debug")] + libc::EFAULT => Err(PostSendError::InvalidQueuePair(io::Error::from_raw_os_error( + libc::EFAULT, + ))), + err => Err(PostSendError::GenericError(io::Error::from_raw_os_error(err))), } } } @@ -1139,7 +1258,7 @@ impl QueuePair for GenericQueuePair<'_> { } } - fn modify(&mut self, attr: &QueuePairAttribute) -> Result<(), String> { + fn modify(&mut self, attr: &QueuePairAttribute) -> Result<(), ModifyQueuePairError> { match self { GenericQueuePair::Basic(qp) => qp.modify(attr), GenericQueuePair::Extended(qp) => qp.modify(attr), @@ -1182,7 +1301,7 @@ impl<'g> PostSendGuard for GenericPostSendGuard<'g> { } } - fn post(self) -> Result<(), String> { + fn post(self) -> Result<(), PostSendError> { match self { GenericPostSendGuard::Basic(guard) => guard.post(), GenericPostSendGuard::Extended(guard) => guard.post(),