diff --git a/compio-driver/Cargo.toml b/compio-driver/Cargo.toml index 4e436d16..6f7f10e5 100644 --- a/compio-driver/Cargo.toml +++ b/compio-driver/Cargo.toml @@ -59,13 +59,11 @@ windows-sys = { workspace = true, features = [ [target.'cfg(target_os = "linux")'.dependencies] io-uring = { version = "0.7.0", optional = true } polling = { version = "3.3.0", optional = true } -os_pipe = { workspace = true, optional = true } paste = { workspace = true } # Other platform dependencies [target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] polling = "3.3.0" -os_pipe = { workspace = true } [target.'cfg(unix)'.dependencies] crossbeam-channel = { workspace = true } @@ -77,7 +75,7 @@ compio-buf = { workspace = true, features = ["arrayvec"] } [features] default = ["io-uring"] -polling = ["dep:polling", "dep:os_pipe"] +polling = ["dep:polling"] io-uring-sqe128 = [] io-uring-cqe32 = [] diff --git a/compio-driver/src/poll/mod.rs b/compio-driver/src/poll/mod.rs index da16010a..c8674537 100644 --- a/compio-driver/src/poll/mod.rs +++ b/compio-driver/src/poll/mod.rs @@ -3,7 +3,7 @@ pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; use std::{ collections::{HashMap, HashSet, VecDeque}, - io::{self, Read, Write}, + io, num::NonZeroUsize, os::fd::BorrowedFd, pin::Pin, @@ -15,7 +15,7 @@ use std::{ use compio_log::{instrument, trace}; use crossbeam_queue::SegQueue; pub(crate) use libc::{sockaddr_storage, socklen_t}; -use polling::{Event, Events, PollMode, Poller}; +use polling::{Event, Events, Poller}; use crate::{AsyncifyPool, Entry, Key, OutEntries, ProactorBuilder, op::Interest, syscall}; @@ -133,7 +133,6 @@ pub(crate) struct Driver { poll: Arc, registry: HashMap, cancelled: HashSet, - notifier: Notifier, pool: AsyncifyPool, pool_completed: Arc>, } @@ -149,21 +148,13 @@ impl Driver { Events::with_capacity(NonZeroUsize::new(entries).unwrap()) }; - let notifier = Notifier::new()?; - let fd = notifier.reader_fd(); - let poll = Arc::new(Poller::new()?); - // Attach the reader to poll. - unsafe { - poll.add_with_mode(fd, Event::new(fd as _, true, false), PollMode::Level)?; - } Ok(Self { events, poll, registry: HashMap::new(), cancelled: HashSet::new(), - notifier, pool: builder.create_or_get_thread_pool(), pool_completed: Arc::new(SegQueue::new()), }) @@ -255,10 +246,6 @@ impl Driver { } for event in self.events.iter() { let fd = event.key as RawFd; - if fd == self.notifier.reader_fd() { - self.notifier.clear()?; - continue; - } let queue = self .registry .get_mut(&fd) @@ -296,7 +283,7 @@ impl Driver { } pub fn handle(&self) -> io::Result { - self.notifier.handle() + Ok(NotifyHandle::new(self.poll.clone())) } } @@ -324,59 +311,18 @@ fn entry_cancelled(user_data: usize) -> Entry { ) } -struct Notifier { - notify_reader: os_pipe::PipeReader, - notify_writer: Arc, -} - -impl Notifier { - pub fn new() -> io::Result { - let (notify_reader, notify_writer) = os_pipe::pipe()?; - - // Set the reader as nonblocking. - let fd = notify_reader.as_raw_fd(); - let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?; - let flags = current_flags | libc::O_NONBLOCK; - if flags != current_flags { - syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?; - } - - Ok(Self { - notify_reader, - notify_writer: Arc::new(notify_writer), - }) - } - - pub fn handle(&self) -> io::Result { - Ok(NotifyHandle::new(self.notify_writer.clone())) - } - - pub fn clear(&self) -> io::Result<()> { - let mut buffer = [0u8]; - match (&self.notify_reader).read_exact(&mut buffer) { - Ok(()) => Ok(()), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()), - Err(e) => Err(e), - } - } - - pub fn reader_fd(&self) -> RawFd { - self.notify_reader.as_raw_fd() - } -} - /// A notify handle to the inner driver. pub struct NotifyHandle { - sender: Arc, + poll: Arc, } impl NotifyHandle { - fn new(sender: Arc) -> Self { - Self { sender } + fn new(poll: Arc) -> Self { + Self { poll } } /// Notify the inner driver. pub fn notify(&self) -> io::Result<()> { - (&*self.sender).write_all(&[1u8]) + self.poll.notify() } } diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index 20968634..3f90c435 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -50,7 +50,6 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] } # Unix specific dependencies [target.'cfg(unix)'.dependencies] -os_pipe = { workspace = true } libc = { workspace = true } [target.'cfg(windows)'.dev-dependencies]