diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index caf8a7e6..eec2e761 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -206,7 +206,15 @@ impl Driver { Err(_) => { drop(squeue); self.poll_entries(entries); - self.submit_auto(Some(Duration::ZERO))?; + match self.submit_auto(Some(Duration::ZERO)) { + Ok(()) => {} + Err(e) + if matches!( + e.kind(), + io::ErrorKind::TimedOut | io::ErrorKind::Interrupted + ) => {} + Err(e) => return Err(e), + } } } } diff --git a/compio-driver/src/poll/mod.rs b/compio-driver/src/poll/mod.rs index 8fd04d31..e04f0f71 100644 --- a/compio-driver/src/poll/mod.rs +++ b/compio-driver/src/poll/mod.rs @@ -192,7 +192,7 @@ impl Driver { pub fn push( &mut self, op: &mut Key, - _entries: &mut OutEntries, + entries: &mut OutEntries, ) -> Poll> { let user_data = op.user_data(); let op_pin = op.as_op_pin(); @@ -205,13 +205,13 @@ impl Driver { Poll::Pending } Ok(Decision::Completed(res)) => Poll::Ready(Ok(res)), - Ok(Decision::Blocking(event)) => { + Ok(Decision::Blocking(event)) => loop { if self.push_blocking(user_data, event) { - Poll::Pending + break Poll::Pending; } else { - Poll::Ready(Err(io::Error::from_raw_os_error(libc::EBUSY))) + self.poll_blocking(entries); } - } + }, Err(err) => Poll::Ready(Err(err)), } } @@ -233,6 +233,12 @@ impl Driver { .is_ok() } + fn poll_blocking(&mut self, entries: &mut OutEntries) { + while let Some(entry) = self.pool_completed.pop() { + entries.notify(entry); + } + } + pub unsafe fn poll( &mut self, timeout: Option, @@ -242,9 +248,7 @@ impl Driver { if self.events.is_empty() && self.pool_completed.is_empty() && timeout.is_some() { return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); } - while let Some(entry) = self.pool_completed.pop() { - entries.notify(entry); - } + self.poll_blocking(entries); for event in self.events.iter() { let fd = event.key as RawFd; let queue = self diff --git a/compio/tests/runtime.rs b/compio/tests/runtime.rs index ac8db749..c5b0edfd 100644 --- a/compio/tests/runtime.rs +++ b/compio/tests/runtime.rs @@ -8,6 +8,7 @@ use compio::{ io::{AsyncReadAt, AsyncReadExt, AsyncWriteAt, AsyncWriteExt}, net::{TcpListener, TcpStream}, }; +use compio_driver::ProactorBuilder; use tempfile::NamedTempFile; #[compio_macros::test] @@ -126,21 +127,24 @@ async fn drop_on_complete() { drop(file); } -#[compio_macros::test] -#[cfg_attr( - not(any(windows, all(target_os = "linux", feature = "io-uring"))), - ignore -)] -async fn too_many_submissions() { - let tempfile = tempfile(); - - let mut file = File::create(tempfile.path()).await.unwrap(); - for _ in 0..600 { - poll_once(async { - file.write_at("hello world", 0).await.0.unwrap(); +#[test] +fn too_many_submissions() { + let mut proactor_builder = ProactorBuilder::new(); + proactor_builder.capacity(1).thread_pool_limit(1); + compio_runtime::Runtime::builder() + .with_proactor(proactor_builder) + .build() + .unwrap() + .block_on(async move { + let tempfile = tempfile(); + let mut file = File::create(tempfile.path()).await.unwrap(); + for _ in 0..600 { + poll_once(async { + file.write_at("hello world", 0).await.0.unwrap(); + }) + .await; + } }) - .await; - } } #[cfg(feature = "allocator_api")]