From 5e6e783c957ffc1fde1d8548e5e7b1f6cfbb2c17 Mon Sep 17 00:00:00 2001
From: Luke Yue
Date: Tue, 1 Oct 2024 05:50:51 -0700
Subject: [PATCH] feat(cq): add work completion status / opcode / flags

Introduce IBV_WC_STANDARD_FLAGS from verbs.h as default flag for
creating extended CQ.

Signed-off-by: Luke Yue
---
 .gitignore              |   3 +
 Cargo.toml              |   9 +-
 src/verbs/completion.rs | 264 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 270 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index 67b6042..6592f96 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,6 @@ Cargo.lock
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
 
+# Tests
+proptest-regressions/
+wip/
diff --git a/Cargo.toml b/Cargo.toml
index 6bca236..080cffe 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,4 +25,11 @@ serde = { version = "1.0", features = ["derive"] }
 
 [dev-dependencies]
 trybuild = "1.0"
-rstest = "0.22"
+rstest = "0.23"
+clap = { version = "4.5", features = ["derive"] }
+rand = "0.8"
+postcard = { version = "1.0", features = ["alloc"] }
+quanta = "0.12"
+byte-unit = "5.1"
+ouroboros = "0.18"
+proptest = "1.5"
diff --git a/src/verbs/completion.rs b/src/verbs/completion.rs
index 2ae89b7..025235a 100644
--- a/src/verbs/completion.rs
+++ b/src/verbs/completion.rs
@@ -4,14 +4,171 @@ use std::ptr;
 use std::ptr::NonNull;
 use std::{marker::PhantomData, mem::MaybeUninit};
 
+use bitmask_enum::bitmask;
+
 use super::device_context::DeviceContext;
 use rdma_mummy_sys::{
     ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_cq_init_attr_ex, ibv_create_comp_channel, ibv_create_cq, ibv_create_cq_ex,
-    ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq, ibv_poll_cq_attr,
-    ibv_start_poll, ibv_wc, ibv_wc_read_byte_len, ibv_wc_read_completion_ts, ibv_wc_read_opcode,
-    ibv_wc_read_vendor_err,
+    ibv_create_cq_wc_flags, ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq,
+    ibv_poll_cq_attr, ibv_start_poll, ibv_wc, ibv_wc_opcode, ibv_wc_read_byte_len, ibv_wc_read_completion_ts,
+    ibv_wc_read_opcode, ibv_wc_read_vendor_err, ibv_wc_status,
 };
 
+/// Status of a work completion, mirroring the `ibv_wc_status` values exposed by `rdma_mummy_sys`.
+#[repr(u32)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum WorkCompletionStatus {
+    Success = ibv_wc_status::IBV_WC_SUCCESS,
+    LocalLengthError = ibv_wc_status::IBV_WC_LOC_LEN_ERR,
+    LocalQueuePairOperationError = ibv_wc_status::IBV_WC_LOC_QP_OP_ERR,
+    LocalEndToEndContextOperationError = ibv_wc_status::IBV_WC_LOC_EEC_OP_ERR,
+    LocalProtectionError = ibv_wc_status::IBV_WC_LOC_PROT_ERR,
+    WorkRequestFlushedError = ibv_wc_status::IBV_WC_WR_FLUSH_ERR,
+    MemoryWindowBindError = ibv_wc_status::IBV_WC_MW_BIND_ERR,
+    BadResponseError = ibv_wc_status::IBV_WC_BAD_RESP_ERR,
+    LocalAccessError = ibv_wc_status::IBV_WC_LOC_ACCESS_ERR,
+    RemoteInvalidRequestError = ibv_wc_status::IBV_WC_REM_INV_REQ_ERR,
+    RemoteAccessError = ibv_wc_status::IBV_WC_REM_ACCESS_ERR,
+    RemoteOperationError = ibv_wc_status::IBV_WC_REM_OP_ERR,
+    RetryCounterExceededError = ibv_wc_status::IBV_WC_RETRY_EXC_ERR,
+    ResponderNotReadyRetryCounterExceededError = ibv_wc_status::IBV_WC_RNR_RETRY_EXC_ERR,
+    LocalReliableDatagramDomainViolationError = ibv_wc_status::IBV_WC_LOC_RDD_VIOL_ERR,
+    RemoteInvalidReliableDatagramRequest = ibv_wc_status::IBV_WC_REM_INV_RD_REQ_ERR,
+    RemoteAbortedError = ibv_wc_status::IBV_WC_REM_ABORT_ERR,
+    InvalidEndToEndContextNumberError = ibv_wc_status::IBV_WC_INV_EECN_ERR,
+    InvalidEndToEndContextStateError = ibv_wc_status::IBV_WC_INV_EEC_STATE_ERR,
+    FatalError = ibv_wc_status::IBV_WC_FATAL_ERR,
+    ResponseTimeoutError = ibv_wc_status::IBV_WC_RESP_TIMEOUT_ERR,
+    GeneralError = ibv_wc_status::IBV_WC_GENERAL_ERR,
+    TagMatchingError = ibv_wc_status::IBV_WC_TM_ERR,
+    TagMatchingRendezvousIncomplete = ibv_wc_status::IBV_WC_TM_RNDV_INCOMPLETE,
+}
+
+impl From<u32> for WorkCompletionStatus {
+    /// Converts a raw `ibv_wc_status` value into a [`WorkCompletionStatus`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if `status` does not match any known `ibv_wc_status` value.
+    fn from(status: u32) -> Self {
+        match status {
+            ibv_wc_status::IBV_WC_SUCCESS => WorkCompletionStatus::Success,
+            ibv_wc_status::IBV_WC_LOC_LEN_ERR => WorkCompletionStatus::LocalLengthError,
+            ibv_wc_status::IBV_WC_LOC_QP_OP_ERR => WorkCompletionStatus::LocalQueuePairOperationError,
+            ibv_wc_status::IBV_WC_LOC_EEC_OP_ERR => WorkCompletionStatus::LocalEndToEndContextOperationError,
+            ibv_wc_status::IBV_WC_LOC_PROT_ERR => WorkCompletionStatus::LocalProtectionError,
+            ibv_wc_status::IBV_WC_WR_FLUSH_ERR => WorkCompletionStatus::WorkRequestFlushedError,
+            ibv_wc_status::IBV_WC_MW_BIND_ERR => WorkCompletionStatus::MemoryWindowBindError,
+            ibv_wc_status::IBV_WC_BAD_RESP_ERR => WorkCompletionStatus::BadResponseError,
+            ibv_wc_status::IBV_WC_LOC_ACCESS_ERR => WorkCompletionStatus::LocalAccessError,
+            ibv_wc_status::IBV_WC_REM_INV_REQ_ERR => WorkCompletionStatus::RemoteInvalidRequestError,
+            ibv_wc_status::IBV_WC_REM_ACCESS_ERR => WorkCompletionStatus::RemoteAccessError,
+            ibv_wc_status::IBV_WC_REM_OP_ERR => WorkCompletionStatus::RemoteOperationError,
+            ibv_wc_status::IBV_WC_RETRY_EXC_ERR => WorkCompletionStatus::RetryCounterExceededError,
+            ibv_wc_status::IBV_WC_RNR_RETRY_EXC_ERR => WorkCompletionStatus::ResponderNotReadyRetryCounterExceededError,
+            ibv_wc_status::IBV_WC_LOC_RDD_VIOL_ERR => WorkCompletionStatus::LocalReliableDatagramDomainViolationError,
+            ibv_wc_status::IBV_WC_REM_INV_RD_REQ_ERR => WorkCompletionStatus::RemoteInvalidReliableDatagramRequest,
+            ibv_wc_status::IBV_WC_REM_ABORT_ERR => WorkCompletionStatus::RemoteAbortedError,
+            ibv_wc_status::IBV_WC_INV_EECN_ERR => WorkCompletionStatus::InvalidEndToEndContextNumberError,
+            ibv_wc_status::IBV_WC_INV_EEC_STATE_ERR => WorkCompletionStatus::InvalidEndToEndContextStateError,
+            ibv_wc_status::IBV_WC_FATAL_ERR => WorkCompletionStatus::FatalError,
+            ibv_wc_status::IBV_WC_RESP_TIMEOUT_ERR => WorkCompletionStatus::ResponseTimeoutError,
+            ibv_wc_status::IBV_WC_GENERAL_ERR => WorkCompletionStatus::GeneralError,
+            ibv_wc_status::IBV_WC_TM_ERR => WorkCompletionStatus::TagMatchingError,
+            ibv_wc_status::IBV_WC_TM_RNDV_INCOMPLETE => WorkCompletionStatus::TagMatchingRendezvousIncomplete,
+            _ => panic!("Unknown work completion status: {status}"),
+        }
+    }
+}
+
+/// Operation type of a work completion, mirroring the `ibv_wc_opcode` values exposed by `rdma_mummy_sys`.
+#[repr(u32)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum WorkCompletionOperationType {
+    Send = ibv_wc_opcode::IBV_WC_SEND,
+    Write = ibv_wc_opcode::IBV_WC_RDMA_WRITE,
+    Read = ibv_wc_opcode::IBV_WC_RDMA_READ,
+    CompareAndSwap = ibv_wc_opcode::IBV_WC_COMP_SWAP,
+    FetchAndAdd = ibv_wc_opcode::IBV_WC_FETCH_ADD,
+    BindMemoryWindow = ibv_wc_opcode::IBV_WC_BIND_MW,
+    LocalInvalidate = ibv_wc_opcode::IBV_WC_LOCAL_INV,
+    TcpSegmentationOffload = ibv_wc_opcode::IBV_WC_TSO,
+    Flush = ibv_wc_opcode::IBV_WC_FLUSH,
+    AtomicWrite = ibv_wc_opcode::IBV_WC_ATOMIC_WRITE,
+    Receive = ibv_wc_opcode::IBV_WC_RECV,
+    ReceiveWithImmediate = ibv_wc_opcode::IBV_WC_RECV_RDMA_WITH_IMM,
+    TagMatchingAdd = ibv_wc_opcode::IBV_WC_TM_ADD,
+    TagMatchingDelete = ibv_wc_opcode::IBV_WC_TM_DEL,
+    TagMatchingSync = ibv_wc_opcode::IBV_WC_TM_SYNC,
+    TagMatchingReceive = ibv_wc_opcode::IBV_WC_TM_RECV,
+    TagMatchingNoTag = ibv_wc_opcode::IBV_WC_TM_NO_TAG,
+    Driver1 = ibv_wc_opcode::IBV_WC_DRIVER1,
+    Driver2 = ibv_wc_opcode::IBV_WC_DRIVER2,
+    Driver3 = ibv_wc_opcode::IBV_WC_DRIVER3,
+}
+
+impl From<u32> for WorkCompletionOperationType {
+    /// Converts a raw `ibv_wc_opcode` value into a [`WorkCompletionOperationType`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if `opcode` does not match any known `ibv_wc_opcode` value.
+    fn from(opcode: u32) -> Self {
+        match opcode {
+            ibv_wc_opcode::IBV_WC_SEND => WorkCompletionOperationType::Send,
+            ibv_wc_opcode::IBV_WC_RDMA_WRITE => WorkCompletionOperationType::Write,
+            ibv_wc_opcode::IBV_WC_RDMA_READ => WorkCompletionOperationType::Read,
+            ibv_wc_opcode::IBV_WC_COMP_SWAP => WorkCompletionOperationType::CompareAndSwap,
+            ibv_wc_opcode::IBV_WC_FETCH_ADD => WorkCompletionOperationType::FetchAndAdd,
+            ibv_wc_opcode::IBV_WC_BIND_MW => WorkCompletionOperationType::BindMemoryWindow,
+            ibv_wc_opcode::IBV_WC_LOCAL_INV => WorkCompletionOperationType::LocalInvalidate,
+            ibv_wc_opcode::IBV_WC_TSO => WorkCompletionOperationType::TcpSegmentationOffload,
+            ibv_wc_opcode::IBV_WC_FLUSH => WorkCompletionOperationType::Flush,
+            ibv_wc_opcode::IBV_WC_ATOMIC_WRITE => WorkCompletionOperationType::AtomicWrite,
+            ibv_wc_opcode::IBV_WC_RECV => WorkCompletionOperationType::Receive,
+            ibv_wc_opcode::IBV_WC_RECV_RDMA_WITH_IMM => WorkCompletionOperationType::ReceiveWithImmediate,
+            ibv_wc_opcode::IBV_WC_TM_ADD => WorkCompletionOperationType::TagMatchingAdd,
+            ibv_wc_opcode::IBV_WC_TM_DEL => WorkCompletionOperationType::TagMatchingDelete,
+            ibv_wc_opcode::IBV_WC_TM_SYNC => WorkCompletionOperationType::TagMatchingSync,
+            ibv_wc_opcode::IBV_WC_TM_RECV => WorkCompletionOperationType::TagMatchingReceive,
+            ibv_wc_opcode::IBV_WC_TM_NO_TAG => WorkCompletionOperationType::TagMatchingNoTag,
+            ibv_wc_opcode::IBV_WC_DRIVER1 => WorkCompletionOperationType::Driver1,
+            ibv_wc_opcode::IBV_WC_DRIVER2 => WorkCompletionOperationType::Driver2,
+            ibv_wc_opcode::IBV_WC_DRIVER3 => WorkCompletionOperationType::Driver3,
+            _ => panic!("Unknown work completion opcode: {opcode}"),
+        }
+    }
+}
+
+/// Work completion flags (`wc_flags`) that can be passed when creating an extended completion queue.
+///
+/// Each flag mirrors one `ibv_create_cq_wc_flags` value; `StandardFlags` combines `ByteLength` through
+/// `DestinationLocalIdentifierPathBits`, matching `IBV_WC_STANDARD_FLAGS` from `verbs.h`.
+#[bitmask(u64)]
+#[bitmask_config(vec_debug)]
+pub enum CreateCompletionQueueWorkCompletionFlags {
+    ByteLength = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_BYTE_LEN.0 as _,
+    ImmediateData = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_IMM.0 as _,
+    QueuePairNumber = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_QP_NUM.0 as _,
+    SourceQueuePair = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_SRC_QP.0 as _,
+    SourceLocalIdentifier = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_SLID.0 as _,
+    ServiceLevel = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_SL.0 as _,
+    DestinationLocalIdentifierPathBits = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_DLID_PATH_BITS.0 as _,
+    CompletionTimestamp = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_COMPLETION_TIMESTAMP.0 as _,
+    CustomerVlan = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_CVLAN.0 as _,
+    FlowTag = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_FLOW_TAG.0 as _,
+    TagMatchingInformation = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_TM_INFO.0 as _,
+    CompletionTimestampWallclock = ibv_create_cq_wc_flags::IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK.0 as _,
+
+    StandardFlags = CreateCompletionQueueWorkCompletionFlags::ByteLength.bits
+        | CreateCompletionQueueWorkCompletionFlags::ImmediateData.bits
+        | CreateCompletionQueueWorkCompletionFlags::QueuePairNumber.bits
+        | CreateCompletionQueueWorkCompletionFlags::SourceQueuePair.bits
+        | CreateCompletionQueueWorkCompletionFlags::SourceLocalIdentifier.bits
+        | CreateCompletionQueueWorkCompletionFlags::ServiceLevel.bits
+        | CreateCompletionQueueWorkCompletionFlags::DestinationLocalIdentifierPathBits.bits,
+}
+
 #[derive(Debug)]
 pub struct CompletionChannel<'res> {
     pub(crate) channel: NonNull<ibv_comp_channel>,
@@ -169,8 +326,7 @@ impl<'res> CompletionQueueBuilder<'res> {
                 cq_context: ptr::null_mut::<c_void>(),
                 channel: ptr::null_mut::<ibv_comp_channel>(),
                 comp_vector: 0,
-                // TODO(zhp): setup default flags for CQ
-                wc_flags: 0,
+                wc_flags: CreateCompletionQueueWorkCompletionFlags::StandardFlags.bits,
                 comp_mask: 0,
                 flags: 0,
                 parent_domain: ptr::null_mut::<ibv_pd>(),
@@ -197,6 +353,13 @@ impl<'res> CompletionQueueBuilder<'res> {
         self
     }
 
+    /// Overrides the `wc_flags` of the CQ init attributes, which default to
+    /// `CreateCompletionQueueWorkCompletionFlags::StandardFlags`.
+    pub fn setup_wc_flags(&mut self, wc_flags: CreateCompletionQueueWorkCompletionFlags) -> &mut Self {
+        self.init_attr.wc_flags = wc_flags.bits();
+        self
+    }
+
     // TODO(fuji): set various attributes
 
     // build extended cq
@@ -407,3 +570,94 @@ impl<'cq> Iterator for ExtendedPoller<'cq> {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use proptest::prelude::*;
+
+    proptest! {
+        #[test]
+        fn test_work_completion_operation_type_conversion(opcode in 0u32..=256u32) {
+            if let Ok(wc_op_type) = std::panic::catch_unwind(|| WorkCompletionOperationType::from(opcode)) {
+                prop_assert_eq!(opcode, wc_op_type as u32);
+            } else {
+                // If it panics, it should be for an unknown opcode
+                prop_assert!((opcode < ibv_wc_opcode::IBV_WC_RECV && opcode > ibv_wc_opcode::IBV_WC_ATOMIC_WRITE) || (opcode > ibv_wc_opcode::IBV_WC_DRIVER3));
+            }
+        }
+
+        #[test]
+        fn test_work_completion_status_conversion(status in 0u32..=32u32) {
+            if let Ok(wc_status) = std::panic::catch_unwind(|| WorkCompletionStatus::from(status)) {
+                prop_assert_eq!(status, wc_status as u32);
+            } else {
+                // If it panics, it should be for an unknown status
+                prop_assert!(status > ibv_wc_status::IBV_WC_TM_RNDV_INCOMPLETE);
+            }
+        }
+    }
+
+    #[test]
+    fn test_all_enum_variants() {
+        let completion_status_variants = [
+            WorkCompletionStatus::Success,
+            WorkCompletionStatus::LocalLengthError,
+            WorkCompletionStatus::LocalQueuePairOperationError,
+            WorkCompletionStatus::LocalEndToEndContextOperationError,
+            WorkCompletionStatus::LocalProtectionError,
+            WorkCompletionStatus::WorkRequestFlushedError,
+            WorkCompletionStatus::MemoryWindowBindError,
+            WorkCompletionStatus::BadResponseError,
+            WorkCompletionStatus::LocalAccessError,
+            WorkCompletionStatus::RemoteInvalidRequestError,
+            WorkCompletionStatus::RemoteAccessError,
+            WorkCompletionStatus::RemoteOperationError,
+            WorkCompletionStatus::RetryCounterExceededError,
+            WorkCompletionStatus::ResponderNotReadyRetryCounterExceededError,
+            WorkCompletionStatus::LocalReliableDatagramDomainViolationError,
+            WorkCompletionStatus::RemoteInvalidReliableDatagramRequest,
+            WorkCompletionStatus::RemoteAbortedError,
+            WorkCompletionStatus::InvalidEndToEndContextNumberError,
+            WorkCompletionStatus::InvalidEndToEndContextStateError,
+            WorkCompletionStatus::FatalError,
+            WorkCompletionStatus::ResponseTimeoutError,
+            WorkCompletionStatus::GeneralError,
+            WorkCompletionStatus::TagMatchingError,
+            WorkCompletionStatus::TagMatchingRendezvousIncomplete,
+        ];
+
+        for &variant in &completion_status_variants {
+            let status = variant as u32;
+            assert_eq!(WorkCompletionStatus::from(status), variant);
+        }
+
+        let operation_type_variants = [
+            WorkCompletionOperationType::Send,
+            WorkCompletionOperationType::Write,
+            WorkCompletionOperationType::Read,
+            WorkCompletionOperationType::CompareAndSwap,
+            WorkCompletionOperationType::FetchAndAdd,
+            WorkCompletionOperationType::BindMemoryWindow,
+            WorkCompletionOperationType::LocalInvalidate,
+            WorkCompletionOperationType::TcpSegmentationOffload,
+            WorkCompletionOperationType::Flush,
+            WorkCompletionOperationType::AtomicWrite,
+            WorkCompletionOperationType::Receive,
+            WorkCompletionOperationType::ReceiveWithImmediate,
+            WorkCompletionOperationType::TagMatchingAdd,
+            WorkCompletionOperationType::TagMatchingDelete,
+            WorkCompletionOperationType::TagMatchingSync,
+            WorkCompletionOperationType::TagMatchingReceive,
+            WorkCompletionOperationType::TagMatchingNoTag,
+            WorkCompletionOperationType::Driver1,
+            WorkCompletionOperationType::Driver2,
+            WorkCompletionOperationType::Driver3,
+        ];
+
+        for &variant in &operation_type_variants {
+            let opcode = variant as u32;
+            assert_eq!(WorkCompletionOperationType::from(opcode), variant);
+        }
+    }
+}