Skip to content

Commit

Permalink
feat(cm): an usable RDMA CM interface with cmtime as example
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Yue <[email protected]>
  • Loading branch information
dragonJACson committed Nov 11, 2024
1 parent b74cd5b commit 5bb92e3
Show file tree
Hide file tree
Showing 4 changed files with 633 additions and 28 deletions.
1 change: 1 addition & 0 deletions .cirrus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ task:
- export LD_LIBRARY_PATH=./rdma-core/build/lib
- just test-basic-with-cov
- just test-rc-pingpong-with-cov
- just test-cmtime-with-cov
- just generate-cov
- sed -i 's#/tmp/cirrus-ci-build/##g' lcov.info
- ./codecov --verbose upload-process --disable-search --fail-on-error -t $CODECOV_TOKEN --git-service github -f ./lcov.info
5 changes: 5 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ test-rc-pingpong-with-cov:
sleep 2
cargo llvm-cov --no-report run --features="debug" --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 127.0.0.1

test-cmtime-with-cov:
cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} -q &
sleep 2
cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} -s {{ip}} -q

generate-cov:
cargo llvm-cov report --lcov --output-path lcov.info
163 changes: 163 additions & 0 deletions examples/cmtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use sideway::cm::communication_manager::{ConnectionParameter, EventChannel, PortSpace};
use sideway::verbs::queue_pair::{QueuePair, QueuePairAttribute, QueuePairState};
use sideway::verbs::AccessFlags;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc;

use clap::Parser;

#[derive(Debug, Parser)]
#[clap(name = "cmtime", version = "0.1.0")]
pub struct Args {
/// Listen on / connect to port
#[clap(long, short = 'p', default_value_t = 18515)]
port: u16,
/// Bind address
#[clap(long, short = 'b')]
bind_address: Option<String>,
/// If no value provided, start a server and wait for connection, otherwise, connect to server at [host]
#[clap(long, short = 's')]
server_address: Option<String>,
// Use self-created, self-modified QP
#[arg(long, short = 'q', default_value_t = false)]
self_modify: bool,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let mut channel = EventChannel::new()?;

let id = channel.create_id(PortSpace::Tcp)?;

if args.server_address.is_some() {
let ip = IpAddr::from_str(&args.server_address.unwrap()).expect("Invalid IP address");
let server_addr = SocketAddr::from((ip, args.port));

let ip = IpAddr::from_str(&args.bind_address.unwrap()).expect("Invalid IP address");
let client_addr = SocketAddr::from((ip, 0));

id.resolve_addr(Some(client_addr), Some(server_addr), 2000)?;
assert_eq!(Arc::strong_count(&id), 1);

let event = channel.get_cm_event()?;
println!("Event is {:?}", event.event_type());
event.ack()?;

assert_eq!(Arc::strong_count(&id), 1);

id.resolve_route(2000)?;

let event = channel.get_cm_event()?;
println!("Event is {:?}", event.event_type());
event.ack()?;

assert_eq!(Arc::strong_count(&id), 1);

let context = id.get_device_context()?;

let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD"));

let cq = context.create_cq_builder().setup_cqe(1).build().unwrap();

let mut builder = pd.create_qp_builder();

let mut qp = builder
.setup_max_inline_data(0)
.setup_send_cq(&cq)
.setup_recv_cq(&cq)
.setup_max_send_wr(1)
.setup_max_recv_wr(1)
.build()
.unwrap_or_else(|_| panic!("Couldn't create QP"));

let mut attr = QueuePairAttribute::new();
attr.setup_state(QueuePairState::Init)
.setup_pkey_index(0)
.setup_port(id.port())
.setup_access_flags(AccessFlags::LocalWrite | AccessFlags::RemoteWrite);
qp.modify(&attr).unwrap();

if !args.self_modify {
id.bind_qp(&qp)?;
}

let mut conn_param = ConnectionParameter::default();
conn_param.setup_qp_number(qp.qp_number());

id.connect(&mut conn_param).unwrap();

let event = channel.get_cm_event()?;

println!("Event is {:?}, status {}", event.event_type(), event.status());
event.ack()?;

if args.self_modify {
let attr = id.get_qp_attr(QueuePairState::ReadyToReceive)?;
qp.modify(&attr).unwrap();

let attr = id.get_qp_attr(QueuePairState::ReadyToSend)?;
qp.modify(&attr).unwrap();

id.establish()?;
}
} else {
id.bind_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), args.port))?;

id.listen(1024)?;

let event = channel.get_cm_event()?;

println!("Event is {:?}", event.event_type());

let new_id = event.cm_id().unwrap();

{
let listener_id = event.listener_id().unwrap();
assert!(Arc::ptr_eq(&id, &listener_id));
}

let context = new_id.get_device_context()?;

let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD"));

let cq = context.create_cq_builder().setup_cqe(1).build().unwrap();

let mut builder = pd.create_qp_builder();

let mut qp = builder
.setup_max_inline_data(0)
.setup_send_cq(&cq)
.setup_recv_cq(&cq)
.setup_max_send_wr(1)
.setup_max_recv_wr(1)
.build()
.unwrap_or_else(|_| panic!("Couldn't create QP"));

if !args.self_modify {
new_id.bind_qp(&qp)?;
} else {
let attr = new_id.get_qp_attr(QueuePairState::Init)?;
qp.modify(&attr).unwrap();

let attr = new_id.get_qp_attr(QueuePairState::ReadyToReceive)?;
qp.modify(&attr).unwrap();

let attr = new_id.get_qp_attr(QueuePairState::ReadyToSend)?;
qp.modify(&attr).unwrap();
}

let mut conn_param = ConnectionParameter::default();
conn_param.setup_qp_number(qp.qp_number());

new_id.accept(&mut conn_param).unwrap();

let event = channel.get_cm_event()?;

println!("Event is {:?}, status {}", event.event_type(), event.status());
event.ack()?;
}

Ok(())
}
Loading

0 comments on commit 5bb92e3

Please sign in to comment.