Skip to content

Commit

Permalink
feat(cq): add work completion status / opcode / flags
Browse files Browse the repository at this point in the history
Introduce IBV_WC_STANDARD_FLAGS from verbs.h as default flag for creating
extended CQ.

Signed-off-by: Luke Yue <[email protected]>
  • Loading branch information
dragonJACson committed Oct 12, 2024
1 parent 6b5ddde commit 5e6e783
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ Cargo.lock
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Tests
proptest-regressions/
wip/
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,11 @@ serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
trybuild = "1.0"
rstest = "0.22"
rstest = "0.23"
clap = { version = "4.5", features = ["derive"] }
rand = "0.8"
postcard = { version = "1.0", features = ["alloc"] }
quanta = "0.12"
byte-unit = "5.1"
ouroboros = "0.18"
proptest = "1.5"
246 changes: 241 additions & 5 deletions src/verbs/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,155 @@ use std::ptr;
use std::ptr::NonNull;
use std::{marker::PhantomData, mem::MaybeUninit};

use bitmask_enum::bitmask;

use super::device_context::DeviceContext;
use rdma_mummy_sys::{
ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_cq_init_attr_ex, ibv_create_comp_channel, ibv_create_cq, ibv_create_cq_ex,
ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq, ibv_poll_cq_attr,
ibv_start_poll, ibv_wc, ibv_wc_read_byte_len, ibv_wc_read_completion_ts, ibv_wc_read_opcode,
ibv_wc_read_vendor_err,
ibv_create_cq_wc_flags, ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq,
ibv_poll_cq_attr, ibv_start_poll, ibv_wc, ibv_wc_opcode, ibv_wc_read_byte_len, ibv_wc_read_completion_ts,
ibv_wc_read_opcode, ibv_wc_read_vendor_err, ibv_wc_status,
};

#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkCompletionStatus {
Success = ibv_wc_status::IBV_WC_SUCCESS,
LocalLengthError = ibv_wc_status::IBV_WC_LOC_LEN_ERR,
LocalQueuePairOperationError = ibv_wc_status::IBV_WC_LOC_QP_OP_ERR,
LocalEndToEndContextOperationError = ibv_wc_status::IBV_WC_LOC_EEC_OP_ERR,
LocalProtectionError = ibv_wc_status::IBV_WC_LOC_PROT_ERR,
WorkRequestFlushedError = ibv_wc_status::IBV_WC_WR_FLUSH_ERR,
MemoryWindowBindError = ibv_wc_status::IBV_WC_MW_BIND_ERR,
BadResponseError = ibv_wc_status::IBV_WC_BAD_RESP_ERR,
LocalAccessError = ibv_wc_status::IBV_WC_LOC_ACCESS_ERR,
RemoteInvalidRequestError = ibv_wc_status::IBV_WC_REM_INV_REQ_ERR,
RemoteAccessError = ibv_wc_status::IBV_WC_REM_ACCESS_ERR,
RemoteOperationError = ibv_wc_status::IBV_WC_REM_OP_ERR,
RetryCounterExceededError = ibv_wc_status::IBV_WC_RETRY_EXC_ERR,
ResponderNotReadyRetryCounterExceededError = ibv_wc_status::IBV_WC_RNR_RETRY_EXC_ERR,
LocalReliableDatagramDomainViolationError = ibv_wc_status::IBV_WC_LOC_RDD_VIOL_ERR,
RemoteInvalidReliableDatagramRequest = ibv_wc_status::IBV_WC_REM_INV_RD_REQ_ERR,
RemoteAbortedError = ibv_wc_status::IBV_WC_REM_ABORT_ERR,
InvalidEndToEndContextNumberError = ibv_wc_status::IBV_WC_INV_EECN_ERR,
InvalidEndToEndContextStateError = ibv_wc_status::IBV_WC_INV_EEC_STATE_ERR,
FatalError = ibv_wc_status::IBV_WC_FATAL_ERR,
ResponseTimeoutError = ibv_wc_status::IBV_WC_RESP_TIMEOUT_ERR,
GeneralError = ibv_wc_status::IBV_WC_GENERAL_ERR,
TagMatchingError = ibv_wc_status::IBV_WC_TM_ERR,
TagMatchingRendezvousIncomplete = ibv_wc_status::IBV_WC_TM_RNDV_INCOMPLETE,
}

impl From<u32> for WorkCompletionStatus {
fn from(status: u32) -> Self {
match status {
ibv_wc_status::IBV_WC_SUCCESS => WorkCompletionStatus::Success,
ibv_wc_status::IBV_WC_LOC_LEN_ERR => WorkCompletionStatus::LocalLengthError,
ibv_wc_status::IBV_WC_LOC_QP_OP_ERR => WorkCompletionStatus::LocalQueuePairOperationError,
ibv_wc_status::IBV_WC_LOC_EEC_OP_ERR => WorkCompletionStatus::LocalEndToEndContextOperationError,
ibv_wc_status::IBV_WC_LOC_PROT_ERR => WorkCompletionStatus::LocalProtectionError,
ibv_wc_status::IBV_WC_WR_FLUSH_ERR => WorkCompletionStatus::WorkRequestFlushedError,
ibv_wc_status::IBV_WC_MW_BIND_ERR => WorkCompletionStatus::MemoryWindowBindError,
ibv_wc_status::IBV_WC_BAD_RESP_ERR => WorkCompletionStatus::BadResponseError,
ibv_wc_status::IBV_WC_LOC_ACCESS_ERR => WorkCompletionStatus::LocalAccessError,
ibv_wc_status::IBV_WC_REM_INV_REQ_ERR => WorkCompletionStatus::RemoteInvalidRequestError,
ibv_wc_status::IBV_WC_REM_ACCESS_ERR => WorkCompletionStatus::RemoteAccessError,
ibv_wc_status::IBV_WC_REM_OP_ERR => WorkCompletionStatus::RemoteOperationError,
ibv_wc_status::IBV_WC_RETRY_EXC_ERR => WorkCompletionStatus::RetryCounterExceededError,
ibv_wc_status::IBV_WC_RNR_RETRY_EXC_ERR => WorkCompletionStatus::ResponderNotReadyRetryCounterExceededError,
ibv_wc_status::IBV_WC_LOC_RDD_VIOL_ERR => WorkCompletionStatus::LocalReliableDatagramDomainViolationError,
ibv_wc_status::IBV_WC_REM_INV_RD_REQ_ERR => WorkCompletionStatus::RemoteInvalidReliableDatagramRequest,
ibv_wc_status::IBV_WC_REM_ABORT_ERR => WorkCompletionStatus::RemoteAbortedError,
ibv_wc_status::IBV_WC_INV_EECN_ERR => WorkCompletionStatus::InvalidEndToEndContextNumberError,
ibv_wc_status::IBV_WC_INV_EEC_STATE_ERR => WorkCompletionStatus::InvalidEndToEndContextStateError,
ibv_wc_status::IBV_WC_FATAL_ERR => WorkCompletionStatus::FatalError,
ibv_wc_status::IBV_WC_RESP_TIMEOUT_ERR => WorkCompletionStatus::ResponseTimeoutError,
ibv_wc_status::IBV_WC_GENERAL_ERR => WorkCompletionStatus::GeneralError,
ibv_wc_status::IBV_WC_TM_ERR => WorkCompletionStatus::TagMatchingError,
ibv_wc_status::IBV_WC_TM_RNDV_INCOMPLETE => WorkCompletionStatus::TagMatchingRendezvousIncomplete,
_ => panic!("Unknown work completion status: {status}"),
}
}
}

#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkCompletionOperationType {
Send = ibv_wc_opcode::IBV_WC_SEND,
Write = ibv_wc_opcode::IBV_WC_RDMA_WRITE,
Read = ibv_wc_opcode::IBV_WC_RDMA_READ,
CompareAndSwap = ibv_wc_opcode::IBV_WC_COMP_SWAP,
FetchAndAdd = ibv_wc_opcode::IBV_WC_FETCH_ADD,
BindMemoryWindow = ibv_wc_opcode::IBV_WC_BIND_MW,
LocalInvalidate = ibv_wc_opcode::IBV_WC_LOCAL_INV,
TcpSegmentationOffload = ibv_wc_opcode::IBV_WC_TSO,
Flush = ibv_wc_opcode::IBV_WC_FLUSH,
AtomicWrite = ibv_wc_opcode::IBV_WC_ATOMIC_WRITE,
Receive = ibv_wc_opcode::IBV_WC_RECV,
ReceiveWithImmediate = ibv_wc_opcode::IBV_WC_RECV_RDMA_WITH_IMM,
TagMatchingAdd = ibv_wc_opcode::IBV_WC_TM_ADD,
TagMatchingDelete = ibv_wc_opcode::IBV_WC_TM_DEL,
TagMatchingSync = ibv_wc_opcode::IBV_WC_TM_SYNC,
TagMatchingReceive = ibv_wc_opcode::IBV_WC_TM_RECV,
TagMatchingNoTag = ibv_wc_opcode::IBV_WC_TM_NO_TAG,
Driver1 = ibv_wc_opcode::IBV_WC_DRIVER1,
Driver2 = ibv_wc_opcode::IBV_WC_DRIVER2,
Driver3 = ibv_wc_opcode::IBV_WC_DRIVER3,
}

impl From<u32> for WorkCompletionOperationType {
fn from(opcode: u32) -> Self {
match opcode {
ibv_wc_opcode::IBV_WC_SEND => WorkCompletionOperationType::Send,
ibv_wc_opcode::IBV_WC_RDMA_WRITE => WorkCompletionOperationType::Write,
ibv_wc_opcode::IBV_WC_RDMA_READ => WorkCompletionOperationType::Read,
ibv_wc_opcode::IBV_WC_COMP_SWAP => WorkCompletionOperationType::CompareAndSwap,
ibv_wc_opcode::IBV_WC_FETCH_ADD => WorkCompletionOperationType::FetchAndAdd,
ibv_wc_opcode::IBV_WC_BIND_MW => WorkCompletionOperationType::BindMemoryWindow,
ibv_wc_opcode::IBV_WC_LOCAL_INV => WorkCompletionOperationType::LocalInvalidate,
ibv_wc_opcode::IBV_WC_TSO => WorkCompletionOperationType::TcpSegmentationOffload,
ibv_wc_opcode::IBV_WC_FLUSH => WorkCompletionOperationType::Flush,
ibv_wc_opcode::IBV_WC_ATOMIC_WRITE => WorkCompletionOperationType::AtomicWrite,
ibv_wc_opcode::IBV_WC_RECV => WorkCompletionOperationType::Receive,
ibv_wc_opcode::IBV_WC_RECV_RDMA_WITH_IMM => WorkCompletionOperationType::ReceiveWithImmediate,
ibv_wc_opcode::IBV_WC_TM_ADD => WorkCompletionOperationType::TagMatchingAdd,
ibv_wc_opcode::IBV_WC_TM_DEL => WorkCompletionOperationType::TagMatchingDelete,
ibv_wc_opcode::IBV_WC_TM_SYNC => WorkCompletionOperationType::TagMatchingSync,
ibv_wc_opcode::IBV_WC_TM_RECV => WorkCompletionOperationType::TagMatchingReceive,
ibv_wc_opcode::IBV_WC_TM_NO_TAG => WorkCompletionOperationType::TagMatchingNoTag,
ibv_wc_opcode::IBV_WC_DRIVER1 => WorkCompletionOperationType::Driver1,
ibv_wc_opcode::IBV_WC_DRIVER2 => WorkCompletionOperationType::Driver2,
ibv_wc_opcode::IBV_WC_DRIVER3 => WorkCompletionOperationType::Driver3,
_ => panic!("Unknown work completion opcode: {opcode}"),
}
}
}

#[bitmask(u64)]
#[bitmask_config(vec_debug)]
pub enum CreateCompletionQueueWorkCompletionFlags {
ByteLength = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_BYTE_LEN.0 as _,
ImmediateData = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_IMM.0 as _,
QueuePairNumber = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_QP_NUM.0 as _,
SourceQueuePair = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_SRC_QP.0 as _,
SourceLocalIdentifier = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_SLID.0 as _,
ServiceLevel = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_SL.0 as _,
DestinationLocalIdentifierPathBits = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_DLID_PATH_BITS.0 as _,
CompletionTimestamp = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_COMPLETION_TIMESTAMP.0 as _,
CustomerVlan = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_CVLAN.0 as _,
FlowTag = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_FLOW_TAG.0 as _,
TagMatchingInformation = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_TM_INFO.0 as _,
CompletionTimestampWallclock = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK.0 as _,

StandardFlags = CreateCompletionQueueWorkCompletionFlags::ByteLength.bits
| CreateCompletionQueueWorkCompletionFlags::ImmediateData.bits
| CreateCompletionQueueWorkCompletionFlags::QueuePairNumber.bits
| CreateCompletionQueueWorkCompletionFlags::SourceQueuePair.bits
| CreateCompletionQueueWorkCompletionFlags::SourceLocalIdentifier.bits
| CreateCompletionQueueWorkCompletionFlags::ServiceLevel.bits
| CreateCompletionQueueWorkCompletionFlags::DestinationLocalIdentifierPathBits.bits,
}

#[derive(Debug)]
pub struct CompletionChannel<'res> {
pub(crate) channel: NonNull<ibv_comp_channel>,
Expand Down Expand Up @@ -169,8 +310,7 @@ impl<'res> CompletionQueueBuilder<'res> {
cq_context: ptr::null_mut::<c_void>(),
channel: ptr::null_mut::<ibv_comp_channel>(),
comp_vector: 0,
// TODO(zhp): setup default flags for CQ
wc_flags: 0,
wc_flags: CreateCompletionQueueWorkCompletionFlags::StandardFlags.bits,
comp_mask: 0,
flags: 0,
parent_domain: ptr::null_mut::<ibv_pd>(),
Expand All @@ -197,6 +337,11 @@ impl<'res> CompletionQueueBuilder<'res> {
self
}

pub fn setup_wc_flags(&mut self, wc_flags: CreateCompletionQueueWorkCompletionFlags) -> &mut Self {
self.init_attr.wc_flags = wc_flags.bits();
self
}

// TODO(fuji): set various attributes

// build extended cq
Expand Down Expand Up @@ -407,3 +552,94 @@ impl<'cq> Iterator for ExtendedPoller<'cq> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;

proptest! {
#[test]
fn test_work_completion_operation_type_conversion(opcode in 0u32..=256u32) {
if let Ok(wc_op_type) = std::panic::catch_unwind(|| WorkCompletionOperationType::from(opcode)) {
prop_assert_eq!(opcode, wc_op_type as u32);
} else {
// If it panics, it should be for an unknown opcode
prop_assert!((opcode < ibv_wc_opcode::IBV_WC_RECV && opcode > ibv_wc_opcode::IBV_WC_ATOMIC_WRITE) || (opcode > ibv_wc_opcode::IBV_WC_DRIVER3));
}
}

#[test]
fn test_work_completion_status_conversion(status in 0u32..=32u32) {
if let Ok(wc_status) = std::panic::catch_unwind(|| WorkCompletionStatus::from(status)) {
prop_assert_eq!(status, wc_status as u32);
} else {
// If it panics, it should be for an unknown status
prop_assert!(status > ibv_wc_status::IBV_WC_TM_RNDV_INCOMPLETE);
}
}
}

#[test]
fn test_all_enum_variants() {
let completion_status_variants = [
WorkCompletionStatus::Success,
WorkCompletionStatus::LocalLengthError,
WorkCompletionStatus::LocalQueuePairOperationError,
WorkCompletionStatus::LocalEndToEndContextOperationError,
WorkCompletionStatus::LocalProtectionError,
WorkCompletionStatus::WorkRequestFlushedError,
WorkCompletionStatus::MemoryWindowBindError,
WorkCompletionStatus::BadResponseError,
WorkCompletionStatus::LocalAccessError,
WorkCompletionStatus::RemoteInvalidRequestError,
WorkCompletionStatus::RemoteAccessError,
WorkCompletionStatus::RemoteOperationError,
WorkCompletionStatus::RetryCounterExceededError,
WorkCompletionStatus::ResponderNotReadyRetryCounterExceededError,
WorkCompletionStatus::LocalReliableDatagramDomainViolationError,
WorkCompletionStatus::RemoteInvalidReliableDatagramRequest,
WorkCompletionStatus::RemoteAbortedError,
WorkCompletionStatus::InvalidEndToEndContextNumberError,
WorkCompletionStatus::InvalidEndToEndContextStateError,
WorkCompletionStatus::FatalError,
WorkCompletionStatus::ResponseTimeoutError,
WorkCompletionStatus::GeneralError,
WorkCompletionStatus::TagMatchingError,
WorkCompletionStatus::TagMatchingRendezvousIncomplete,
];

for &variant in &completion_status_variants {
let status = variant as u32;
assert_eq!(WorkCompletionStatus::from(status), variant);
}

let operation_type_variants = [
WorkCompletionOperationType::Send,
WorkCompletionOperationType::Write,
WorkCompletionOperationType::Read,
WorkCompletionOperationType::CompareAndSwap,
WorkCompletionOperationType::FetchAndAdd,
WorkCompletionOperationType::BindMemoryWindow,
WorkCompletionOperationType::LocalInvalidate,
WorkCompletionOperationType::TcpSegmentationOffload,
WorkCompletionOperationType::Flush,
WorkCompletionOperationType::AtomicWrite,
WorkCompletionOperationType::Receive,
WorkCompletionOperationType::ReceiveWithImmediate,
WorkCompletionOperationType::TagMatchingAdd,
WorkCompletionOperationType::TagMatchingDelete,
WorkCompletionOperationType::TagMatchingSync,
WorkCompletionOperationType::TagMatchingReceive,
WorkCompletionOperationType::TagMatchingNoTag,
WorkCompletionOperationType::Driver1,
WorkCompletionOperationType::Driver2,
WorkCompletionOperationType::Driver3,
];

for &variant in &operation_type_variants {
let opcode = variant as u32;
assert_eq!(WorkCompletionOperationType::from(opcode), variant);
}
}
}

0 comments on commit 5e6e783

Please sign in to comment.