Skip to content

Commit

Permalink
fix(driver,poll): remove pipe usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Oct 22, 2024
1 parent 464552d commit d982118
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 65 deletions.
4 changes: 1 addition & 3 deletions compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ windows-sys = { workspace = true, features = [
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = { version = "0.7.0", optional = true }
polling = { version = "3.3.0", optional = true }
os_pipe = { workspace = true, optional = true }
paste = { workspace = true }

# Other platform dependencies
[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies]
polling = "3.3.0"
os_pipe = { workspace = true }

[target.'cfg(unix)'.dependencies]
crossbeam-channel = { workspace = true }
Expand All @@ -77,7 +75,7 @@ compio-buf = { workspace = true, features = ["arrayvec"] }

[features]
default = ["io-uring"]
polling = ["dep:polling", "dep:os_pipe"]
polling = ["dep:polling"]

io-uring-sqe128 = []
io-uring-cqe32 = []
Expand Down
68 changes: 7 additions & 61 deletions compio-driver/src/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::{
collections::{HashMap, HashSet, VecDeque},
io::{self, Read, Write},
io,
num::NonZeroUsize,
os::fd::BorrowedFd,
pin::Pin,
Expand All @@ -15,7 +15,7 @@ use std::{
use compio_log::{instrument, trace};
use crossbeam_queue::SegQueue;
pub(crate) use libc::{sockaddr_storage, socklen_t};
use polling::{Event, Events, PollMode, Poller};
use polling::{Event, Events, Poller};

use crate::{AsyncifyPool, Entry, Key, OutEntries, ProactorBuilder, op::Interest, syscall};

Expand Down Expand Up @@ -133,7 +133,6 @@ pub(crate) struct Driver {
poll: Arc<Poller>,
registry: HashMap<RawFd, FdQueue>,
cancelled: HashSet<usize>,
notifier: Notifier,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
}
Expand All @@ -149,21 +148,13 @@ impl Driver {
Events::with_capacity(NonZeroUsize::new(entries).unwrap())
};

let notifier = Notifier::new()?;
let fd = notifier.reader_fd();

let poll = Arc::new(Poller::new()?);
// Attach the reader to poll.
unsafe {
poll.add_with_mode(fd, Event::new(fd as _, true, false), PollMode::Level)?;
}

Ok(Self {
events,
poll,
registry: HashMap::new(),
cancelled: HashSet::new(),
notifier,
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
})
Expand Down Expand Up @@ -255,10 +246,6 @@ impl Driver {
}
for event in self.events.iter() {
let fd = event.key as RawFd;
if fd == self.notifier.reader_fd() {
self.notifier.clear()?;
continue;
}
let queue = self
.registry
.get_mut(&fd)
Expand Down Expand Up @@ -296,7 +283,7 @@ impl Driver {
}

pub fn handle(&self) -> io::Result<NotifyHandle> {
self.notifier.handle()
Ok(NotifyHandle::new(self.poll.clone()))
}
}

Expand Down Expand Up @@ -324,59 +311,18 @@ fn entry_cancelled(user_data: usize) -> Entry {
)
}

struct Notifier {
notify_reader: os_pipe::PipeReader,
notify_writer: Arc<os_pipe::PipeWriter>,
}

impl Notifier {
pub fn new() -> io::Result<Self> {
let (notify_reader, notify_writer) = os_pipe::pipe()?;

// Set the reader as nonblocking.
let fd = notify_reader.as_raw_fd();
let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
let flags = current_flags | libc::O_NONBLOCK;
if flags != current_flags {
syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
}

Ok(Self {
notify_reader,
notify_writer: Arc::new(notify_writer),
})
}

pub fn handle(&self) -> io::Result<NotifyHandle> {
Ok(NotifyHandle::new(self.notify_writer.clone()))
}

pub fn clear(&self) -> io::Result<()> {
let mut buffer = [0u8];
match (&self.notify_reader).read_exact(&mut buffer) {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
Err(e) => Err(e),
}
}

pub fn reader_fd(&self) -> RawFd {
self.notify_reader.as_raw_fd()
}
}

/// A notify handle to the inner driver.
pub struct NotifyHandle {
sender: Arc<os_pipe::PipeWriter>,
poll: Arc<Poller>,
}

impl NotifyHandle {
fn new(sender: Arc<os_pipe::PipeWriter>) -> Self {
Self { sender }
fn new(poll: Arc<Poller>) -> Self {
Self { poll }
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
(&*self.sender).write_all(&[1u8])
self.poll.notify()
}
}
1 change: 0 additions & 1 deletion compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] }

# Unix specific dependencies
[target.'cfg(unix)'.dependencies]
os_pipe = { workspace = true }
libc = { workspace = true }

[target.'cfg(windows)'.dev-dependencies]
Expand Down

0 comments on commit d982118

Please sign in to comment.