diff --git a/compio-driver/src/poll/mod.rs b/compio-driver/src/poll/mod.rs index 9a598175..b90d51f9 100644 --- a/compio-driver/src/poll/mod.rs +++ b/compio-driver/src/poll/mod.rs @@ -4,7 +4,7 @@ pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; #[cfg(aio)] use std::ptr::NonNull; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, VecDeque}, io, num::NonZeroUsize, os::fd::BorrowedFd, @@ -123,6 +123,11 @@ impl FdQueue { } } + pub fn remove(&mut self, user_data: usize) { + self.read_queue.retain(|&k| k != user_data); + self.write_queue.retain(|&k| k != user_data); + } + pub fn event(&self) -> Event { let mut event = Event::none(0); if let Some(&key) = self.read_queue.front() { @@ -167,7 +172,6 @@ pub(crate) struct Driver { events: Events, poll: Arc, registry: HashMap, - cancelled: HashSet, pool: AsyncifyPool, pool_completed: Arc>, } @@ -189,7 +193,6 @@ impl Driver { events, poll, registry: HashMap::new(), - cancelled: HashSet::new(), pool: builder.create_or_get_thread_pool(), pool_completed: Arc::new(SegQueue::new()), }) @@ -215,16 +218,49 @@ impl Driver { Ok(()) } + fn renew( + poll: &Poller, + registry: &mut HashMap, + fd: BorrowedFd, + renew_event: Event, + ) -> io::Result<()> { + if !renew_event.readable && !renew_event.writable { + poll.delete(fd)?; + registry.remove(&fd.as_raw_fd()); + } else { + poll.modify(fd, renew_event)?; + } + Ok(()) + } + pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> { Ok(()) } pub fn cancel(&mut self, op: &mut Key) { - self.cancelled.insert(op.user_data()); - #[cfg(aio)] - { - let op = op.as_op_pin(); - if let Some(OpType::Aio(aiocbp)) = op.op_type() { + let op_pin = op.as_op_pin(); + match op_pin.op_type() { + None => {} + Some(OpType::Fd(fd)) => { + let queue = self + .registry + .get_mut(&fd) + .expect("the fd should be attached"); + queue.remove(op.user_data()); + let renew_event = queue.event(); + if Self::renew( + &self.poll, + &mut self.registry, + unsafe { BorrowedFd::borrow_raw(fd) }, + renew_event, + ) + .is_ok() + { + self.pool_completed.push(entry_cancelled(op.user_data())); + } + } + #[cfg(aio)] + Some(OpType::Aio(aiocbp)) => { let aiocb = unsafe { aiocbp.as_ref() }; let fd = aiocb.aio_fildes; syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok(); @@ -314,21 +350,27 @@ impl Driver { } } - fn poll_blocking(&mut self) { + fn poll_blocking(&mut self) -> bool { + if self.pool_completed.is_empty() { + return false; + } while let Some(entry) = self.pool_completed.pop() { unsafe { entry.notify(); } } + true } pub unsafe fn poll(&mut self, timeout: Option) -> io::Result<()> { instrument!(compio_log::Level::TRACE, "poll", ?timeout); + if self.poll_blocking() { + return Ok(()); + } self.poll.wait(&mut self.events, timeout)?; - if self.events.is_empty() && self.pool_completed.is_empty() && timeout.is_some() { + if self.events.is_empty() && timeout.is_some() { return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); } - self.poll_blocking(); for event in self.events.iter() { let user_data = event.key; trace!("receive {} for {:?}", user_data, event); @@ -348,32 +390,27 @@ impl Driver { .get_mut(&fd) .expect("the fd should be attached"); if let Some((user_data, interest)) = queue.pop_interest(&event) { - if self.cancelled.remove(&user_data) { - entry_cancelled(user_data).notify(); - } else { - let mut op = Key::::new_unchecked(user_data); - let op = op.as_op_pin(); - let res = match op.operate() { - Poll::Pending => { - // The operation should go back to the front. - queue.push_front_interest(user_data, interest); - None - } - Poll::Ready(res) => Some(res), - }; - if let Some(res) = res { - Entry::new(user_data, res).notify(); + let mut op = Key::::new_unchecked(user_data); + let op = op.as_op_pin(); + let res = match op.operate() { + Poll::Pending => { + // The operation should go back to the front. + queue.push_front_interest(user_data, interest); + None } + Poll::Ready(res) => Some(res), + }; + if let Some(res) = res { + Entry::new(user_data, res).notify(); } } let renew_event = queue.event(); - let borrowed_fd = BorrowedFd::borrow_raw(fd); - if !renew_event.readable && !renew_event.writable { - self.poll.delete(borrowed_fd)?; - self.registry.remove(&fd); - } else { - self.poll.modify(borrowed_fd, renew_event)?; - } + Self::renew( + &self.poll, + &mut self.registry, + BorrowedFd::borrow_raw(fd), + renew_event, + )?; } #[cfg(aio)] Some(OpType::Aio(aiocbp)) => { @@ -388,7 +425,6 @@ impl Driver { continue; } libc::ECANCELED => { - self.cancelled.remove(&user_data); // Remove the aiocb from kqueue. libc::aio_return(aiocbp.as_ptr()); Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))