Skip to content

Commit

Permalink
feat(cm): an usable RDMA CM interface with cmtime as example
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Yue <[email protected]>
  • Loading branch information
dragonJACson committed Nov 14, 2024
1 parent fd7802c commit 2c510a7
Show file tree
Hide file tree
Showing 5 changed files with 874 additions and 29 deletions.
1 change: 1 addition & 0 deletions .cirrus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ task:
- export LD_LIBRARY_PATH=./rdma-core/build/lib
- just test-basic-with-cov
- just test-rc-pingpong-with-cov
- just test-cmtime-with-cov
- just generate-cov
- sed -i 's#/tmp/cirrus-ci-build/##g' lcov.info
- ./codecov --verbose upload-process --disable-search --fail-on-error -t $CODECOV_TOKEN --git-service github -f ./lcov.info
5 changes: 5 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ test-rc-pingpong-with-cov:
sleep 2
cargo llvm-cov --no-report run --features="debug" --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 127.0.0.1

test-cmtime-with-cov:
cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} &
sleep 2
cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} -s {{ip}}

generate-cov:
cargo llvm-cov report --lcov --output-path lcov.info
342 changes: 342 additions & 0 deletions examples/cmtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,342 @@
use sideway::cm::communication_manager::{CommunicationManager, ConnectionParameter, Event, EventChannel, PortSpace};
use sideway::verbs::queue_pair::{QueuePair, QueuePairState};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

use clap::Parser;
use lazy_static::lazy_static;
use quanta::Instant;

#[derive(Debug, Parser)]
#[clap(name = "cmtime", version = "0.1.0")]
pub struct Args {
/// Listen on / connect to port
#[clap(long, short = 'p', default_value_t = 18515)]
port: u16,
/// Bind address
#[clap(long, short = 'b')]
bind_address: Option<String>,
/// If no value provided, start a server and wait for connection, otherwise, connect to server at [host]
#[clap(long, short = 's')]
server_address: Option<String>,
// Use self-created, self-modified QP
#[arg(long, short = 'q', default_value_t = false)]
self_modify: bool,
// Number of connections
#[arg(long, short = 'c', default_value_t = 100)]
connections: u32,
}

#[repr(usize)]
#[derive(Debug)]
pub enum Step {
CreateId,
Bind,
ResolveAddr,
ResolveRoute,
CreateQueuePair,
Connect,
Disconnect,
Destroy,
Count,
}

struct Node {
id: Arc<CommunicationManager>,
times: [(Instant, Instant); Step::Count as usize],
}

lazy_static! {
static ref STARTED: Mutex<[u32; Step::Count as usize]> = Mutex::new([0; Step::Count as usize]);
static ref COMPLETED: Mutex<[u32; Step::Count as usize]> = Mutex::new([0; Step::Count as usize]);
static ref TIMES: Mutex<[(Instant, Instant); Step::Count as usize]> =
Mutex::new([(Instant::recent(), Instant::recent()); Step::Count as usize]);
static ref CHANNEL: Mutex<EventChannel> =
Mutex::new(EventChannel::new().expect("Failed to create rdma cm event channel"));
}

fn cma_handler(id: Arc<CommunicationManager>, event: Event, sender: Option<Sender<Arc<CommunicationManager>>>) {
use sideway::cm::communication_manager::EventType::*;
let node: Option<Arc<Mutex<Node>>> = id.get_context();

match event.event_type() {
AddressResolved => {
node.unwrap().lock().unwrap().times[Step::ResolveAddr as usize].1 = Instant::now();
COMPLETED.lock().unwrap()[Step::ResolveAddr as usize] += 1;
},
RouteResolved => {
node.unwrap().lock().unwrap().times[Step::ResolveRoute as usize].1 = Instant::now();
COMPLETED.lock().unwrap()[Step::ResolveRoute as usize] += 1;
},
ConnectRequest => {
let cm_id = event.cm_id().clone().unwrap();
sender.unwrap().send(cm_id).unwrap();
},
Established => {
if node.is_some() {
// node.unwrap().lock().unwrap().times[Step::Connect as usize].1 = Instant::now();
COMPLETED.lock().unwrap()[Step::Connect as usize] += 1;
}
},
_ => {
println!("Other events: {:?}", event.event_type());
},
}
let _ = event.ack();
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

let id = CHANNEL.lock()?.create_id(PortSpace::Tcp)?;

if args.server_address.is_some() {
TIMES.lock().unwrap()[Step::CreateId as usize].0 = Instant::now();
let mut nodes: Vec<Arc<Mutex<Node>>> = vec![];
for _i in 0..args.connections {
let id = CHANNEL.lock()?.create_id(PortSpace::Tcp)?;
let node = Mutex::new(Node {
id: id.clone(),
times: [(Instant::recent(), Instant::recent()); Step::Count as usize],
});
id.setup_context(node);
let node: Arc<Mutex<Node>> = id.get_context().unwrap();
nodes.push(node);
}
TIMES.lock().unwrap()[Step::CreateId as usize].1 = Instant::now();
println!(
"{:?} time cost: {:?}us",
Step::CreateId,
TIMES
.lock()
.map(|time| time[Step::CreateId as usize]
.1
.duration_since(time[Step::CreateId as usize].0)
.as_micros())
.unwrap()
);

let _dispatcher = thread::spawn(move || loop {
match CHANNEL.lock().unwrap().get_cm_event() {
Ok(event) => cma_handler(event.cm_id().unwrap(), event, None),
Err(err) => {
eprintln!("{err}");
break;
},
}
});

let ip = IpAddr::from_str(&args.server_address.unwrap()).expect("Invalid IP address");
let server_addr = SocketAddr::from((ip, args.port));

let ip = IpAddr::from_str(&args.bind_address.unwrap()).expect("Invalid IP address");
let client_addr = SocketAddr::from((ip, 0));

TIMES.lock().unwrap()[Step::ResolveAddr as usize].0 = Instant::now();
for node in &nodes {
node.lock()
.unwrap()
.id
.resolve_addr(Some(client_addr), Some(server_addr), 2000)?;
STARTED.lock().unwrap()[Step::ResolveAddr as usize] += 1;
}

while STARTED.lock().unwrap()[Step::ResolveAddr as usize]
!= COMPLETED.lock().unwrap()[Step::ResolveAddr as usize]
{
thread::yield_now();
}

TIMES.lock().unwrap()[Step::ResolveAddr as usize].1 = Instant::now();
println!(
"{:?} time cost: {:?}us",
Step::ResolveAddr,
TIMES
.lock()
.map(|time| time[Step::ResolveAddr as usize]
.1
.duration_since(time[Step::ResolveAddr as usize].0)
.as_micros())
.unwrap()
);

TIMES.lock().unwrap()[Step::ResolveRoute as usize].0 = Instant::now();
for node in &nodes {
node.lock().unwrap().id.resolve_route(2000)?;
STARTED.lock().unwrap()[Step::ResolveRoute as usize] += 1;
}

while STARTED.lock().unwrap()[Step::ResolveRoute as usize]
!= COMPLETED.lock().unwrap()[Step::ResolveRoute as usize]
{
thread::yield_now();
}

TIMES.lock().unwrap()[Step::ResolveRoute as usize].1 = Instant::now();
println!(
"{:?} time cost: {:?}us",
Step::ResolveRoute,
TIMES
.lock()
.map(|time| time[Step::ResolveRoute as usize]
.1
.duration_since(time[Step::ResolveRoute as usize].0)
.as_micros())
.unwrap()
);

TIMES.lock().unwrap()[Step::CreateQueuePair as usize].0 = Instant::now();
for node in &nodes {
let id = &node.lock().unwrap().id;

id.create_qp().unwrap();
// let context = id.get_device_context()?;

// let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD"));

// let cq = context.create_cq_builder().setup_cqe(1).build().unwrap();

// let mut builder = pd.create_qp_builder();

// let mut qp = builder
// .setup_max_inline_data(0)
// .setup_send_cq(&cq)
// .setup_recv_cq(&cq)
// .setup_max_send_wr(1)
// .setup_max_recv_wr(1)
// .build()
// .unwrap_or_else(|_| panic!("Couldn't create QP"));

// let mut attr = QueuePairAttribute::new();
// attr.setup_state(QueuePairState::Init)
// .setup_pkey_index(0)
// .setup_port(id.port())
// .setup_access_flags(AccessFlags::LocalWrite | AccessFlags::RemoteWrite);
// qp.modify(&attr).unwrap();

// if !args.self_modify {
// id.bind_qp(&qp)?;
// }
}

TIMES.lock().unwrap()[Step::CreateQueuePair as usize].1 = Instant::now();
println!(
"{:?} time cost: {:?}us",
Step::CreateQueuePair,
TIMES
.lock()
.map(|time| time[Step::CreateQueuePair as usize]
.1
.duration_since(time[Step::CreateQueuePair as usize].0)
.as_micros())
.unwrap()
);

TIMES.lock().unwrap()[Step::Connect as usize].0 = Instant::now();
for node in &nodes {
let id = &node.lock().unwrap().id;
let mut qp = id.qp().unwrap();

let mut conn_param = ConnectionParameter::default();
conn_param.setup_qp_number(qp.qp_number());

id.connect(&mut conn_param).unwrap();

if args.self_modify {
let attr = id.get_qp_attr(QueuePairState::ReadyToReceive)?;
qp.modify(&attr).unwrap();

let attr = id.get_qp_attr(QueuePairState::ReadyToSend)?;
qp.modify(&attr).unwrap();

id.establish()?;
}

STARTED.lock().unwrap()[Step::Connect as usize] += 1;
}

while STARTED.lock().unwrap()[Step::Connect as usize] != COMPLETED.lock().unwrap()[Step::Connect as usize] {
thread::yield_now();
}

TIMES.lock().unwrap()[Step::Connect as usize].1 = Instant::now();
println!(
"{:?} time cost: {:?}us",
Step::Connect,
TIMES
.lock()
.map(|time| time[Step::Connect as usize]
.1
.duration_since(time[Step::Connect as usize].0)
.as_micros())
.unwrap()
);
} else {
id.bind_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), args.port))?;

id.listen(1024)?;

let (req_tx, req_rx) = channel();

let dispatcher = thread::spawn(move || loop {
match CHANNEL.lock().unwrap().get_cm_event() {
Ok(event) => cma_handler(event.cm_id().unwrap(), event, Some(req_tx.clone())),
Err(err) => {
eprintln!("{err}");
break;
},
}
});

let req_handler = thread::spawn(move || loop {
let cm_id: Arc<CommunicationManager> = req_rx.recv().expect("Failed to receive cm_id").clone();
cm_id.create_qp().unwrap();

// let qp = cm_id.qp().unwrap();
// let context = cm_id.get_device_context().expect("Failed to create device context");

// let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD"));

// let cq = context.create_cq_builder().setup_cqe(1).build().expect("Failed to build CQ");

// let mut builder = pd.create_qp_builder();

// let mut qp = builder
// .setup_max_inline_data(0)
// .setup_send_cq(&cq)
// .setup_recv_cq(&cq)
// .setup_max_send_wr(1)
// .setup_max_recv_wr(1)
// .build()
// .unwrap_or_else(|_| panic!("Couldn't create QP"));

// if !args.self_modify {
// cm_id.bind_qp(&qp).expect("Failed to bind QP");
// } else {
// let attr = cm_id.get_qp_attr(QueuePairState::Init).unwrap();
// qp.modify(&attr).unwrap();

// let attr = cm_id.get_qp_attr(QueuePairState::ReadyToReceive).unwrap();
// qp.modify(&attr).unwrap();

// let attr = cm_id.get_qp_attr(QueuePairState::ReadyToSend).unwrap();
// qp.modify(&attr).unwrap();
// }
// let mut conn_param = ConnectionParameter::default();
// conn_param.setup_qp_number(qp.qp_number());

cm_id.accept(None).unwrap();
unsafe {
Arc::increment_strong_count(Arc::into_raw(cm_id));
}
});

let _ = req_handler.join();
let _ = dispatcher.join();
}

Ok(())
}
Loading

0 comments on commit 2c510a7

Please sign in to comment.