Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify attach logic #200

Merged
merged 29 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2bf8eb7
refactor(driver,iocp): make IOCP global to all threads
Berrysoft Jan 28, 2024
6485870
refactor(runtime): simplify attacher
Berrysoft Jan 29, 2024
5ed954d
refactor(fs,net): impl TryClone
Berrysoft Jan 29, 2024
d72cdbf
fix(driver): adapt to new inner API
Berrysoft Jan 29, 2024
f3dacfc
fix(fs,windows): attach named pipe server
Berrysoft Jan 29, 2024
769853a
test(fs): create server in runtime
Berrysoft Jan 29, 2024
05e20f2
test(lib): use one stream in 2 threads
Berrysoft Jan 31, 2024
6794124
feat(driver,iocp): allow only one thread wait IOCP
Berrysoft Jan 31, 2024
883bdb5
fix(driver,iocp): use Condvar
Berrysoft Jan 31, 2024
1eec07f
fix(driver,iocp): reduce clone
Berrysoft Jan 31, 2024
acc88f8
fix(driver,iocp): use channel instead of handwriting queue
Berrysoft Jan 31, 2024
be7cedc
feat(driver,iocp): add iocp-global feature
Berrysoft Feb 3, 2024
1423187
feat(lib): add iocp-global to monocrate
Berrysoft Feb 3, 2024
be6b299
ci: add iocp-global to windows target
Berrysoft Feb 3, 2024
ec8b7a6
fix(driver): repost entry from other IOCP
Berrysoft Feb 3, 2024
89bdb86
feat(driver): expose PortId instead of usize
Berrysoft Feb 3, 2024
0038769
test(net): try to find out why it blocks
Berrysoft Feb 3, 2024
312348d
feat(driver,iocp): deregister from global port on drop
Berrysoft Feb 3, 2024
e9b7e6f
feat(driver,iocp): use IOCP instead of crossbeam channel
Berrysoft Feb 4, 2024
6ee42aa
feat(driver): use RawFd instead of HANDLE
Berrysoft Feb 4, 2024
6b32de3
doc(driver): add docs for mod cp
Berrysoft Feb 4, 2024
f14d3ef
fix(runtime): docs of Attacher and remove Attachable
Berrysoft Feb 4, 2024
1fbac8f
fix(Cargo.toml): remove useless skiplist dep
Berrysoft Feb 4, 2024
c90f23c
test(lib): fix for read_exact
Berrysoft Feb 4, 2024
a70d5d7
fix(driver): log post error
Berrysoft Feb 4, 2024
fd5510e
fix(driver,poll): mark submit unsafe
Berrysoft Feb 4, 2024
b140828
fix(driver): use InternalHigh as op result
Berrysoft Feb 16, 2024
b039ac5
fix: resolve conflict on unix
Berrysoft Mar 4, 2024
a9af894
fix(driver,iocp): readd AsRawFd impl
Berrysoft Mar 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ jobs:
target: "x86_64-pc-windows-gnu"
- os: "windows-latest"
target: "i686-pc-windows-msvc"
- os: "windows-latest"
target: "x86_64-pc-windows-msvc"
features: "iocp-global"
- os: "macos-12"
- os: "macos-13"
- os: "macos-14"
Expand Down
16 changes: 6 additions & 10 deletions compio-dispatcher/tests/listener.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{num::NonZeroUsize, panic::resume_unwind};

use compio_buf::{arrayvec::ArrayVec, IntoInner};
use compio_buf::arrayvec::ArrayVec;
use compio_dispatcher::Dispatcher;
use compio_io::{AsyncReadExt, AsyncWriteExt};
use compio_net::{TcpListener, TcpStream};
use compio_runtime::{spawn, Unattached};
use compio_runtime::spawn;
use futures_util::{stream::FuturesUnordered, StreamExt};

#[compio_macros::test]
Expand All @@ -27,15 +27,11 @@ async fn listener_dispatch() {
});
let mut handles = FuturesUnordered::new();
for _i in 0..CLIENT_NUM {
let (srv, _) = listener.accept().await.unwrap();
let srv = Unattached::new(srv).unwrap();
let (mut srv, _) = listener.accept().await.unwrap();
let handle = dispatcher
.dispatch(move || {
let mut srv = srv.into_inner();
async move {
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
assert_eq!(buf.as_slice(), b"Hello world!");
}
.dispatch(move || async move {
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
assert_eq!(buf.as_slice(), b"Hello world!");
})
.unwrap();
handles.push(handle.join());
Expand Down
3 changes: 3 additions & 0 deletions compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ polling = "3.3.0"
os_pipe = { workspace = true }

[target.'cfg(unix)'.dependencies]
crossbeam-channel = { workspace = true }
crossbeam-queue = { workspace = true }
libc = { workspace = true }

Expand All @@ -83,6 +84,8 @@ polling = ["dep:polling", "dep:os_pipe"]
io-uring-sqe128 = []
io-uring-cqe32 = []

iocp-global = []

# Nightly features
once_cell_try = []
nightly = ["once_cell_try"]
7 changes: 7 additions & 0 deletions compio-driver/src/fusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ impl Driver {
}
}

pub fn create_op<T: OpCode + 'static>(&self, user_data: usize, op: T) -> RawOp {
match &self.fuse {
FuseDriver::Poll(driver) => driver.create_op(user_data, op),
FuseDriver::IoUring(driver) => driver.create_op(user_data, op),
}
}

pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.attach(fd),
Expand Down
149 changes: 149 additions & 0 deletions compio-driver/src/iocp/cp/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#[cfg(feature = "once_cell_try")]
use std::sync::OnceLock;
use std::{
io,
os::windows::io::{AsRawHandle, RawHandle},
time::Duration,
};

use compio_log::*;
#[cfg(not(feature = "once_cell_try"))]
use once_cell::sync::OnceCell as OnceLock;
use windows_sys::Win32::System::IO::PostQueuedCompletionStatus;

use super::CompletionPort;
use crate::{syscall, Entry, Overlapped, RawFd};

struct GlobalPort {
port: CompletionPort,
}

impl GlobalPort {
pub fn new() -> io::Result<Self> {
Ok(Self {
port: CompletionPort::new()?,
})
}

pub fn attach(&self, fd: RawFd) -> io::Result<()> {
self.port.attach(fd)
}

pub fn post<T: ?Sized>(
&self,
res: io::Result<usize>,
optr: *mut Overlapped<T>,
) -> io::Result<()> {
self.port.post(res, optr)
}

pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
self.port.post_raw(optr)
}
}

impl AsRawHandle for GlobalPort {
fn as_raw_handle(&self) -> RawHandle {
self.port.as_raw_handle()
}
}

static IOCP_PORT: OnceLock<GlobalPort> = OnceLock::new();

#[inline]
fn iocp_port() -> io::Result<&'static GlobalPort> {
IOCP_PORT.get_or_try_init(GlobalPort::new)
}

fn iocp_start() -> io::Result<()> {
let port = iocp_port()?;
std::thread::spawn(move || {
instrument!(compio_log::Level::TRACE, "iocp_start");
loop {
for entry in port.port.poll_raw(None)? {
// Any thin pointer is OK because we don't use the type of opcode.
let overlapped_ptr: *mut Overlapped<()> = entry.lpOverlapped.cast();
let overlapped = unsafe { &*overlapped_ptr };
if let Err(_e) = syscall!(
BOOL,
PostQueuedCompletionStatus(
overlapped.driver as _,
entry.dwNumberOfBytesTransferred,
entry.lpCompletionKey,
entry.lpOverlapped,
)
) {
error!(
"fail to dispatch entry ({}, {}, {:p}) to driver {:p}: {:?}",
entry.dwNumberOfBytesTransferred,
entry.lpCompletionKey,
entry.lpOverlapped,
overlapped.driver,
_e
);
}
}
}
#[allow(unreachable_code)]
io::Result::Ok(())
});
Ok(())
}

static IOCP_INIT_ONCE: OnceLock<()> = OnceLock::new();

pub struct Port {
port: CompletionPort,
global_port: &'static GlobalPort,
}

impl Port {
pub fn new() -> io::Result<Self> {
IOCP_INIT_ONCE.get_or_try_init(iocp_start)?;

Ok(Self {
port: CompletionPort::new()?,
global_port: iocp_port()?,
})
}

pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
self.global_port.attach(fd)
}

pub fn handle(&self) -> PortHandle {
PortHandle::new(self.global_port)
}

pub fn poll(&self, timeout: Option<Duration>) -> io::Result<impl Iterator<Item = Entry> + '_> {
self.port.poll(timeout, None)
}
}

impl AsRawHandle for Port {
fn as_raw_handle(&self) -> RawHandle {
self.port.as_raw_handle()
}
}

pub struct PortHandle {
port: &'static GlobalPort,
}

impl PortHandle {
fn new(port: &'static GlobalPort) -> Self {
Self { port }
}

pub fn post<T: ?Sized>(
&self,
res: io::Result<usize>,
optr: *mut Overlapped<T>,
) -> io::Result<()> {
self.port.post(res, optr)
}

pub fn post_raw<T: ?Sized>(&self, optr: *const Overlapped<T>) -> io::Result<()> {
self.port.post_raw(optr)
}
}
Loading