Skip to content

Commit

Permalink
Add an epoll::wait_uninit.
Browse files Browse the repository at this point in the history
This is like `epoll::wait`, except that it can read into an
uninitialized buffer, and it returns a pair of slices, similar to
`read_uninit`. This also means it doesn't require "alloc".
  • Loading branch information
sunfishcode committed Feb 4, 2025
1 parent c050a40 commit 0fc2024
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
use core::mem::MaybeUninit;
use core::slice;

/// Split an uninitialized byte slice into initialized and uninitialized parts.
/// Split an uninitialized slice into initialized and uninitialized parts.
///
/// # Safety
///
/// `init_len` must not be greater than `buf.len()`, and at least `init_len`
/// bytes must be initialized.
/// elements must be initialized.
#[inline]
pub(super) unsafe fn split_init(
buf: &mut [MaybeUninit<u8>],
pub(super) unsafe fn split_init<T>(
buf: &mut [MaybeUninit<T>],
init_len: usize,
) -> (&mut [u8], &mut [MaybeUninit<u8>]) {
) -> (&mut [T], &mut [MaybeUninit<T>]) {
debug_assert!(init_len <= buf.len());
let buf_ptr = buf.as_mut_ptr();
let uninit_len = buf.len() - init_len;
let init = slice::from_raw_parts_mut(buf_ptr.cast::<u8>(), init_len);
let init = slice::from_raw_parts_mut(buf_ptr.cast::<T>(), init_len);
let uninit = slice::from_raw_parts_mut(buf_ptr.add(init_len), uninit_len);
(init, uninit)
}
Expand Down
26 changes: 26 additions & 0 deletions src/event/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@
use super::epoll;
pub use crate::backend::event::epoll::*;
use crate::backend::event::syscalls;
use crate::buffer::split_init;
use crate::fd::{AsFd, OwnedFd};
use crate::io;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::ffi::c_void;
use core::hash::{Hash, Hasher};
use core::mem::MaybeUninit;

/// `epoll_create1(flags)`—Creates a new epoll object.
///
Expand Down Expand Up @@ -190,6 +192,9 @@ pub fn delete<EpollFd: AsFd, SourceFd: AsFd>(epoll: EpollFd, source: SourceFd) -
/// For each event of interest, an element is written to `events`. On
/// success, this returns the number of written elements.
///
/// This takes a `&mut [Event]` which Rust requires to contain initialized
/// memory. To use an uninitialized buffer, use [`wait_uninit`].
///
/// # References
/// - [Linux]
/// - [illumos]
Expand All @@ -215,6 +220,27 @@ pub fn wait<EpollFd: AsFd>(
Ok(())
}

/// `epoll_wait(self, events, timeout)`—Waits for registered events of
/// interest.
///
/// This is identical to [`wait`], except that it can write the events into
/// uninitialized memory. It returns the slice that was initialized by this
/// function and the slice that remains uninitialized.
#[doc(alias = "epoll_wait")]
#[inline]
pub fn wait_uninit<EpollFd: AsFd>(
epoll: EpollFd,
event_list: &mut [MaybeUninit<Event>],
timeout: crate::ffi::c_int,
) -> io::Result<(&mut [Event], &mut [MaybeUninit<Event>])> {
// SAFETY: We're calling `epoll_wait` via FFI and we know how it
// behaves.
unsafe {
let nfds = syscalls::epoll_wait(epoll.as_fd(), event_list, timeout)?;
Ok(split_init(event_list, nfds))
}
}

/// A record of an event that occurred.
#[repr(C)]
#[cfg_attr(all(not(libc), target_arch = "x86_64"), repr(packed))]
Expand Down
77 changes: 77 additions & 0 deletions tests/event/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rustix::net::{
};
use std::collections::HashMap;
use std::ffi::c_void;
use std::mem::MaybeUninit;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

Expand Down Expand Up @@ -67,6 +68,62 @@ fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
}
}

fn server_uninit(ready: Arc<(Mutex<u16>, Condvar)>) {
let listen_sock = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
bind_v4(&listen_sock, &SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap();
listen(&listen_sock, 1).unwrap();

let who = match getsockname(&listen_sock).unwrap() {
SocketAddrAny::V4(addr) => addr,
_ => panic!(),
};

{
let (lock, cvar) = &*ready;
let mut port = lock.lock().unwrap();
*port = who.port();
cvar.notify_all();
}

let epoll = epoll::create(epoll::CreateFlags::CLOEXEC).unwrap();

epoll::add(
&epoll,
&listen_sock,
epoll::EventData::new_u64(1),
epoll::EventFlags::IN,
)
.unwrap();

let mut next_data = epoll::EventData::new_u64(2);
let mut targets = HashMap::new();

let mut event_list = [MaybeUninit::uninit(); 4];
loop {
let (init, _) = epoll::wait_uninit(&epoll, &mut event_list, -1).unwrap();
for event in init {
let target = event.data;
if target.u64() == 1 {
let conn_sock = accept(&listen_sock).unwrap();
ioctl_fionbio(&conn_sock, true).unwrap();
epoll::add(
&epoll,
&conn_sock,
next_data,
epoll::EventFlags::OUT | epoll::EventFlags::ET,
)
.unwrap();
targets.insert(next_data, conn_sock);
next_data = epoll::EventData::new_u64(next_data.u64() + 1);
} else {
let target = targets.remove(&target).unwrap();
write(&target, b"hello\n").unwrap();
epoll::delete(&epoll, &target).unwrap();
}
}
}
}

fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
let port = {
let (lock, cvar) = &*ready;
Expand Down Expand Up @@ -109,6 +166,26 @@ fn test_epoll() {
client.join().unwrap();
}

#[test]
fn test_epoll_uninit() {
let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
let ready_clone = Arc::clone(&ready);

let _server = thread::Builder::new()
.name("server".to_string())
.spawn(move || {
server_uninit(ready);
})
.unwrap();
let client = thread::Builder::new()
.name("client".to_string())
.spawn(move || {
client(ready_clone);
})
.unwrap();
client.join().unwrap();
}

#[test]
fn test_epoll_event_data() {
let d = epoll::EventData::new_u64(0);
Expand Down

0 comments on commit 0fc2024

Please sign in to comment.