Skip to content

Commit

Permalink
Introduce Submit trait and add Link struct for linked operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ileixe committed Feb 4, 2024
1 parent 350f637 commit f8d3dcf
Show file tree
Hide file tree
Showing 19 changed files with 175 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ io-uring = "0.6.0"
socket2 = { version = "0.4.4", features = ["all"] }
bytes = { version = "1.0", optional = true }
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
pin-project-lite = "0.2.13"

[dev-dependencies]
tempfile = "3.2.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
{env, io},
};

use tokio_uring::fs::File;
use tokio_uring::{fs::File, Submit};

fn main() {
// The file to `cat` is passed as a CLI argument
Expand Down
2 changes: 1 addition & 1 deletion examples/mix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::env;

use tokio_uring::{fs::File, net::TcpListener};
use tokio_uring::{fs::File, net::TcpListener, Submit};

fn main() {
// The file to serve over TCP is passed as a CLI argument
Expand Down
2 changes: 1 addition & 1 deletion examples/tcp_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, net::SocketAddr};

use tokio_uring::net::TcpStream;
use tokio_uring::{net::TcpStream, Submit};

fn main() {
let args: Vec<_> = env::args().collect();
Expand Down
2 changes: 1 addition & 1 deletion examples/unix_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::env;

use tokio_uring::net::UnixListener;
use tokio_uring::{net::UnixListener, Submit};

fn main() {
let args: Vec<_> = env::args().collect();
Expand Down
2 changes: 1 addition & 1 deletion examples/unix_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::env;

use tokio_uring::net::UnixStream;
use tokio_uring::{net::UnixStream, Submit};

fn main() {
let args: Vec<_> = env::args().collect();
Expand Down
1 change: 1 addition & 0 deletions examples/wrk-bench.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io;
use std::rc::Rc;
use tokio::task::JoinHandle;
use tokio_uring::Submit;

pub const RESPONSE: &'static [u8] =
b"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: 12\n\nHello world!";
Expand Down
7 changes: 6 additions & 1 deletion src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice};
use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::runtime::driver::op::{Op, Submit};
use crate::{UnsubmittedOneshot, UnsubmittedRead, UnsubmittedWrite};
use std::fmt;
use std::io;
Expand Down Expand Up @@ -32,6 +32,7 @@ use std::path::Path;
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -158,6 +159,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -518,6 +520,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -767,6 +770,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -804,6 +808,7 @@ impl File {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down
2 changes: 1 addition & 1 deletion src/io/socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::io::write::UnsubmittedWrite;
use crate::runtime::driver::op::Op;
use crate::runtime::driver::op::{Op, Submit};
use crate::{
buf::fixed::FixedBuf,
buf::{BoundedBuf, BoundedBufMut, IoBuf, Slice},
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//!
//! ```no_run
//! use tokio_uring::fs::File;
//! use tokio_uring::Submit;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! tokio_uring::start(async {
Expand Down Expand Up @@ -80,7 +81,9 @@ pub mod net;

pub use io::read::*;
pub use io::write::*;
pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot};
pub use runtime::driver::op::{
InFlightOneshot, OneshotOutputTransform, Submit, UnsubmittedOneshot,
};
pub use runtime::spawn;
pub use runtime::Runtime;

Expand All @@ -106,6 +109,7 @@ use std::future::Future;
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down Expand Up @@ -245,6 +249,7 @@ impl Builder {
///
/// ```no_run
/// use tokio_uring::fs::File;
/// use tokio_uring::Submit;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// tokio_uring::start(async {
Expand Down
1 change: 1 addition & 0 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
/// ```
/// use tokio_uring::net::TcpListener;
/// use tokio_uring::net::TcpStream;
/// use tokio_uring::Submit;
///
/// let listener = TcpListener::bind("127.0.0.1:2345".parse().unwrap()).unwrap();
///
Expand Down
1 change: 1 addition & 0 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
///
/// ```no_run
/// use tokio_uring::net::TcpStream;
/// use tokio_uring::Submit;
/// use std::net::ToSocketAddrs;
///
/// fn main() -> std::io::Result<()> {
Expand Down
1 change: 1 addition & 0 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
///
/// ```
/// use tokio_uring::net::UdpSocket;
/// use tokio_uring::Submit;
/// use std::net::SocketAddr;
/// fn main() -> std::io::Result<()> {
/// tokio_uring::start(async {
Expand Down
1 change: 1 addition & 0 deletions src/net/unix/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{io, path::Path};
/// ```
/// use tokio_uring::net::UnixListener;
/// use tokio_uring::net::UnixStream;
/// use tokio_uring::Submit;
///
/// let sock_file = "/tmp/tokio-uring-unix-test.sock";
/// let listener = UnixListener::bind(&sock_file).unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/net/unix/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{
///
/// ```no_run
/// use tokio_uring::net::UnixStream;
/// use tokio_uring::Submit;
/// use std::net::ToSocketAddrs;
///
/// fn main() -> std::io::Result<()> {
Expand Down
87 changes: 87 additions & 0 deletions src/runtime/driver/op/link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use io_uring::squeue::Flags;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use crate::{OneshotOutputTransform, Submit, UnsubmittedOneshot};

pub struct Link<D, N> {
data: D,
next: N,
}

impl<D, N> Link<D, N> {
pub fn new(data: D, next: N) -> Self {
Self { data, next }
}
}

impl<D, N> Submit for Link<D, N>
where
D: Submit,
N: Submit,
{
type Output = LinkedInflightOneshot<D::Output, N::Output>;

fn submit(self) -> Self::Output {
LinkedInflightOneshot {
data: self.data.submit(),
next: Some(self.next.submit()),
}
}
}

impl<D1, T1: OneshotOutputTransform<StoredData = D1>, N> Link<UnsubmittedOneshot<D1, T1>, N> {
pub fn link<D2, T2: OneshotOutputTransform<StoredData = D2>>(
self,
other: UnsubmittedOneshot<D2, T2>,
) -> Link<UnsubmittedOneshot<D1, T1>, Link<N, UnsubmittedOneshot<D2, T2>>> {
Link {
data: self.data.set_flags(Flags::IO_LINK),
next: Link {
data: self.next,
next: other,
},
}
}

pub fn hard_link<D2, T2: OneshotOutputTransform<StoredData = D2>>(
self,
other: UnsubmittedOneshot<D2, T2>,
) -> Link<UnsubmittedOneshot<D1, T1>, Link<N, UnsubmittedOneshot<D2, T2>>> {
Link {
data: self.data.set_flags(Flags::IO_HARDLINK),
next: Link {
data: self.next,
next: other,
},
}
}
}

pin_project! {
pub struct LinkedInflightOneshot<D, N> {
#[pin]
data: D,
next: Option<N>,
}
}

impl<D, N> Future for LinkedInflightOneshot<D, N>
where
D: Future,
{
type Output = (D::Output, N);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

let output = ready!(this.data.poll(cx));
let next = this.next.take().unwrap();

Poll::Ready((output, next))
}
}
45 changes: 35 additions & 10 deletions src/runtime/driver/op/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::task::{Context, Poll, Waker};
use io_uring::squeue::Flags;
use io_uring::{cqueue, squeue};

mod link;
mod slab_list;

use link::Link;
use slab::Slab;
use slab_list::{SlabListEntry, SlabListIndices};

Expand Down Expand Up @@ -38,29 +40,43 @@ impl<D, T: OneshotOutputTransform<StoredData = D>> UnsubmittedOneshot<D, T> {
}
}

/// Link two UnsubmittedOneshots.
pub fn link<D2, T2: OneshotOutputTransform<StoredData = D2>>(
self,
other: UnsubmittedOneshot<D2, T2>,
) -> Link<UnsubmittedOneshot<D, T>, UnsubmittedOneshot<D2, T2>> {
Link::new(self.set_flags(Flags::IO_LINK), other)
}

/// Hard-link two UnsubmittedOneshots.
pub fn hard_link<D2, T2: OneshotOutputTransform<StoredData = D2>>(
self,
other: UnsubmittedOneshot<D2, T2>,
) -> Link<UnsubmittedOneshot<D, T>, UnsubmittedOneshot<D2, T2>> {
Link::new(self.set_flags(Flags::IO_HARDLINK), other)
}

/// Set the SQE's flags.
pub fn set_flags(mut self, flags: Flags) -> Self {
pub(crate) fn set_flags(mut self, flags: Flags) -> Self {
self.sqe = self.sqe.flags(flags);
self
}
}

impl<D, T: OneshotOutputTransform<StoredData = D>> Submit for UnsubmittedOneshot<D, T> {
type Output = InFlightOneshot<D, T>;

/// Submit an operation to the driver for batched entry to the kernel.
pub fn submit(self) -> InFlightOneshot<D, T> {
fn submit(self) -> Self::Output {
let handle = CONTEXT
.with(|x| x.handle())
.expect("Could not submit op; not in runtime context");

self.submit_with_driver(&handle)
}

fn submit_with_driver(self, driver: &driver::Handle) -> InFlightOneshot<D, T> {
let index = driver.submit_op_2(self.sqe);

let driver = driver.into();
let index = handle.submit_op_2(self.sqe);

let inner = InFlightOneshotInner {
index,
driver,
driver: (&handle).into(),
stable_data: self.stable_data,
post_op: self.post_op,
};
Expand Down Expand Up @@ -121,6 +137,15 @@ impl<D: 'static, T: OneshotOutputTransform<StoredData = D>> Drop for InFlightOne
}
}

/// Submit an operation or operations to the driver.
pub trait Submit {
/// The output of the submission with an in-flight operation or linked in-flight operations.
type Output;

/// Submit an operation or linked operations.
fn submit(self) -> Self::Output;
}

/// Transforms the output of a oneshot operation into a more user-friendly format.
pub trait OneshotOutputTransform {
/// The final output after the transformation.
Expand Down
2 changes: 1 addition & 1 deletion tests/driver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use tempfile::NamedTempFile;

use tokio_uring::{buf::IoBuf, fs::File};
use tokio_uring::{buf::IoBuf, fs::File, Submit};

#[path = "../src/future.rs"]
#[allow(warnings)]
Expand Down
Loading

0 comments on commit f8d3dcf

Please sign in to comment.