Skip to content

Commit

Permalink
fix(poll): cancel in the fd queue (#346)
Browse files Browse the repository at this point in the history
* fix(poll): cancel in the fd queue

* fix(poll): use retain instead of position - remove

* fix(poll): make renew safe
  • Loading branch information
Berrysoft authored Nov 24, 2024
1 parent c248bf6 commit 5132430
Showing 1 changed file with 70 additions and 34 deletions.
104 changes: 70 additions & 34 deletions compio-driver/src/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
#[cfg(aio)]
use std::ptr::NonNull;
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, VecDeque},
io,
num::NonZeroUsize,
os::fd::BorrowedFd,
Expand Down Expand Up @@ -123,6 +123,11 @@ impl FdQueue {
}
}

pub fn remove(&mut self, user_data: usize) {
self.read_queue.retain(|&k| k != user_data);
self.write_queue.retain(|&k| k != user_data);
}

pub fn event(&self) -> Event {
let mut event = Event::none(0);
if let Some(&key) = self.read_queue.front() {
Expand Down Expand Up @@ -167,7 +172,6 @@ pub(crate) struct Driver {
events: Events,
poll: Arc<Poller>,
registry: HashMap<RawFd, FdQueue>,
cancelled: HashSet<usize>,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
}
Expand All @@ -189,7 +193,6 @@ impl Driver {
events,
poll,
registry: HashMap::new(),
cancelled: HashSet::new(),
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
})
Expand All @@ -215,16 +218,49 @@ impl Driver {
Ok(())
}

fn renew(
poll: &Poller,
registry: &mut HashMap<RawFd, FdQueue>,
fd: BorrowedFd,
renew_event: Event,
) -> io::Result<()> {
if !renew_event.readable && !renew_event.writable {
poll.delete(fd)?;
registry.remove(&fd.as_raw_fd());
} else {
poll.modify(fd, renew_event)?;
}
Ok(())
}

pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
Ok(())
}

pub fn cancel(&mut self, op: &mut Key<dyn crate::sys::OpCode>) {
self.cancelled.insert(op.user_data());
#[cfg(aio)]
{
let op = op.as_op_pin();
if let Some(OpType::Aio(aiocbp)) = op.op_type() {
let op_pin = op.as_op_pin();
match op_pin.op_type() {
None => {}
Some(OpType::Fd(fd)) => {
let queue = self
.registry
.get_mut(&fd)
.expect("the fd should be attached");
queue.remove(op.user_data());
let renew_event = queue.event();
if Self::renew(
&self.poll,
&mut self.registry,
unsafe { BorrowedFd::borrow_raw(fd) },
renew_event,
)
.is_ok()
{
self.pool_completed.push(entry_cancelled(op.user_data()));
}
}
#[cfg(aio)]
Some(OpType::Aio(aiocbp)) => {
let aiocb = unsafe { aiocbp.as_ref() };
let fd = aiocb.aio_fildes;
syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
Expand Down Expand Up @@ -314,21 +350,27 @@ impl Driver {
}
}

fn poll_blocking(&mut self) {
fn poll_blocking(&mut self) -> bool {
if self.pool_completed.is_empty() {
return false;
}
while let Some(entry) = self.pool_completed.pop() {
unsafe {
entry.notify();
}
}
true
}

pub unsafe fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
if self.poll_blocking() {
return Ok(());
}
self.poll.wait(&mut self.events, timeout)?;
if self.events.is_empty() && self.pool_completed.is_empty() && timeout.is_some() {
if self.events.is_empty() && timeout.is_some() {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
}
self.poll_blocking();
for event in self.events.iter() {
let user_data = event.key;
trace!("receive {} for {:?}", user_data, event);
Expand All @@ -348,32 +390,27 @@ impl Driver {
.get_mut(&fd)
.expect("the fd should be attached");
if let Some((user_data, interest)) = queue.pop_interest(&event) {
if self.cancelled.remove(&user_data) {
entry_cancelled(user_data).notify();
} else {
let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
let op = op.as_op_pin();
let res = match op.operate() {
Poll::Pending => {
// The operation should go back to the front.
queue.push_front_interest(user_data, interest);
None
}
Poll::Ready(res) => Some(res),
};
if let Some(res) = res {
Entry::new(user_data, res).notify();
let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
let op = op.as_op_pin();
let res = match op.operate() {
Poll::Pending => {
// The operation should go back to the front.
queue.push_front_interest(user_data, interest);
None
}
Poll::Ready(res) => Some(res),
};
if let Some(res) = res {
Entry::new(user_data, res).notify();
}
}
let renew_event = queue.event();
let borrowed_fd = BorrowedFd::borrow_raw(fd);
if !renew_event.readable && !renew_event.writable {
self.poll.delete(borrowed_fd)?;
self.registry.remove(&fd);
} else {
self.poll.modify(borrowed_fd, renew_event)?;
}
Self::renew(
&self.poll,
&mut self.registry,
BorrowedFd::borrow_raw(fd),
renew_event,
)?;
}
#[cfg(aio)]
Some(OpType::Aio(aiocbp)) => {
Expand All @@ -388,7 +425,6 @@ impl Driver {
continue;
}
libc::ECANCELED => {
self.cancelled.remove(&user_data);
// Remove the aiocb from kqueue.
libc::aio_return(aiocbp.as_ptr());
Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
Expand Down

0 comments on commit 5132430

Please sign in to comment.