Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bound channel elements processed every time process_events is called #200

Merged
merged 3 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 103 additions & 11 deletions src/sources/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
//! A synchronous version of the channel is provided by [`sync_channel`], in which
//! the [`SyncSender`] will block when the channel is full.

use std::cmp;
use std::fmt;
use std::sync::mpsc;

use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};

use super::ping::{make_ping, Ping, PingError, PingSource};

const MAX_EVENTS_CHECK: usize = 1024;

/// The events generated by the channel event source
#[derive(Debug)]
pub enum Event<T> {
Expand Down Expand Up @@ -123,6 +126,8 @@ impl<T> SyncSender<T> {
pub struct Channel<T> {
receiver: mpsc::Receiver<T>,
source: PingSource,
ping: Ping,
capacity: usize,
}

// This impl is safe because the Channel is only able to move around threads
Expand Down Expand Up @@ -156,14 +161,36 @@ impl<T> Channel<T> {
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
let (sender, receiver) = mpsc::channel();
let (ping, source) = make_ping().expect("Failed to create a Ping.");
(Sender { sender, ping }, Channel { receiver, source })
(
Sender {
sender,
ping: ping.clone(),
},
Channel {
receiver,
ping,
source,
capacity: usize::MAX,
},
)
}

/// Create a new synchronous, bounded channel
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
let (sender, receiver) = mpsc::sync_channel(bound);
let (ping, source) = make_ping().expect("Failed to create a Ping.");
(SyncSender { sender, ping }, Channel { receiver, source })
(
SyncSender {
sender,
ping: ping.clone(),
},
Channel {
receiver,
source,
ping,
capacity: bound,
},
)
}

impl<T> EventSource for Channel<T> {
Expand All @@ -182,18 +209,38 @@ impl<T> EventSource for Channel<T> {
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let receiver = &self.receiver;
self.source
.process_events(readiness, token, |(), &mut ()| loop {
match receiver.try_recv() {
Ok(val) => callback(Event::Msg(val), &mut ()),
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
callback(Event::Closed, &mut ());
break;
let capacity = self.capacity;
let mut clear_readiness = false;

let action = self
.source
.process_events(readiness, token, |(), &mut ()| {
// Limit the number of elements we process at a time to the channel's capacity, or 1024.
let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK);
for _ in 0..max {
match receiver.try_recv() {
Ok(val) => callback(Event::Msg(val), &mut ()),
Err(mpsc::TryRecvError::Empty) => {
clear_readiness = true;
break;
}
Err(mpsc::TryRecvError::Disconnected) => {
callback(Event::Closed, &mut ());
clear_readiness = true;
break;
}
}
}
})
.map_err(ChannelError)
.map_err(ChannelError)?;

if clear_readiness {
Ok(action)
} else {
// Re-notify the ping source so we can try again.
self.ping.ping();
Ok(PostAction::Continue)
}
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
Expand Down Expand Up @@ -338,4 +385,49 @@ mod tests {
assert_eq!(received.0, 3);
assert!(received.1);
}

#[test]
fn test_more_than_1024() {
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();

let (tx, rx) = channel::<()>();
let mut received = (0u32, false);

handle
.insert_source(
rx,
move |evt, &mut (), received: &mut (u32, bool)| match evt {
Event::Msg(()) => received.0 += 1,
Event::Closed => received.1 = true,
},
)
.unwrap();

event_loop
.dispatch(Some(std::time::Duration::ZERO), &mut received)
.unwrap();

assert_eq!(received.0, 0);
assert!(!received.1);

// Send 1025 elements into the channel.
for _ in 0..MAX_EVENTS_CHECK + 1 {
tx.send(()).unwrap();
}

event_loop
.dispatch(Some(std::time::Duration::ZERO), &mut received)
.unwrap();

assert_eq!(received.0, MAX_EVENTS_CHECK as u32);
assert!(!received.1);

event_loop
.dispatch(Some(std::time::Duration::ZERO), &mut received)
.unwrap();

assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32);
assert!(!received.1);
}
}
134 changes: 88 additions & 46 deletions src/sources/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
state: Rc<State<T>>,

/// Notifies us when the executor is woken up.
ping: PingSource,
source: PingSource,

/// Used for when we need to wake ourselves up.
ping: Ping,
}

/// A scheduler to send futures to an executor
Expand Down Expand Up @@ -274,15 +277,16 @@
active_tasks: RefCell::new(Some(Slab::new())),
sender: Arc::new(Sender {
sender: Mutex::new(sender),
wake_up,
wake_up: wake_up.clone(),
notified: AtomicBool::new(false),
}),
});

Ok((
Executor {
state: state.clone(),
ping,
source: ping,
ping: wake_up,
},
Scheduler { state },
))
Expand All @@ -305,62 +309,66 @@
{
let state = &self.state;

let clear_readiness = {
// Set to the unnotified state.
state.sender.notified.store(false, Ordering::SeqCst);

let (clear_readiness, action) = {
let mut clear_readiness = false;

// Process runnables, but not too many at a time; better to move onto the next event quickly!
for _ in 0..1024 {
let runnable = match state.incoming.try_recv() {
Ok(runnable) => runnable,
Err(_) => {
// Make sure to clear the readiness if there are no more runnables.
clear_readiness = true;
break;
}
};
let action = self
.source
.process_events(readiness, token, |(), &mut ()| {
// Process runnables, but not too many at a time; better to move onto the next event quickly!
for _ in 0..1024 {
let runnable = match state.incoming.try_recv() {
Ok(runnable) => runnable,
Err(_) => {
// Make sure to clear the readiness if there are no more runnables.
clear_readiness = true;
break;
}
};

// Run the runnable.
let index = *runnable.metadata();
runnable.run();
// Run the runnable.
let index = *runnable.metadata();
runnable.run();

// If the runnable finished with a result, call the callback.
let mut active_guard = state.active_tasks.borrow_mut();
let active_tasks = active_guard.as_mut().unwrap();
// If the runnable finished with a result, call the callback.
let mut active_guard = state.active_tasks.borrow_mut();
let active_tasks = active_guard.as_mut().unwrap();

if let Some(state) = active_tasks.get(index) {
if state.is_finished() {
// Take out the state and provide it to the caller.
let result = match active_tasks.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),
};
if let Some(state) = active_tasks.get(index) {
if state.is_finished() {
// Take out the state and provide it to the caller.
let result = match active_tasks.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),

Check warning on line 345 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L345

Added line #L345 was not covered by tests
};

// Drop the guard since the callback may register another future to the scheduler.
drop(active_guard);
// Drop the guard since the callback may register another future to the scheduler.
drop(active_guard);

callback(result, &mut ());
callback(result, &mut ());
}
}

Check warning on line 353 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L353

Added line #L353 was not covered by tests
}
}
}
})
.map_err(ExecutorError::WakeError)?;

clear_readiness
(clear_readiness, action)
};

// Clear the readiness of the ping source if there are no more runnables.
if clear_readiness {
self.ping
.process_events(readiness, token, |(), &mut ()| {})
.map_err(ExecutorError::WakeError)?;
// Re-ready the ping source if we need to re-run this handler.
if !clear_readiness {
self.ping.ping();
Ok(PostAction::Continue)
} else {
Ok(action)
}

// Set to the unnotified state.
state.sender.notified.store(false, Ordering::SeqCst);

Ok(PostAction::Continue)
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
self.ping.register(poll, token_factory)?;
self.source.register(poll, token_factory)?;
Ok(())
}

Expand All @@ -369,12 +377,12 @@
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.ping.reregister(poll, token_factory)?;
self.source.reregister(poll, token_factory)?;

Check warning on line 380 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L380

Added line #L380 was not covered by tests
Ok(())
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.ping.unregister(poll)?;
self.source.unregister(poll)?;

Check warning on line 385 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L385

Added line #L385 was not covered by tests
Ok(())
}
}
Expand Down Expand Up @@ -405,6 +413,9 @@
mod tests {
use super::*;

use std::cell::RefCell;
use std::rc::Rc;

#[test]
fn ready() {
let mut event_loop = crate::EventLoop::<u32>::try_new().unwrap();
Expand Down Expand Up @@ -439,4 +450,35 @@
// the future has run
assert_eq!(got, 42);
}

#[test]
fn more_than_1024() {
let mut event_loop = crate::EventLoop::<()>::try_new().unwrap();
let handle = event_loop.handle();

let (exec, sched) = executor::<()>().unwrap();
handle.insert_source(exec, move |_, _, _| ()).unwrap();

let counter = Rc::new(RefCell::new(0));
for _ in 0..1025 {
let counter = counter.clone();
sched
.schedule(async move {
*counter.borrow_mut() += 1;
})
.unwrap();
}

event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut ())
.unwrap();

assert_eq!(*counter.borrow(), 1024);

event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut ())
.unwrap();

assert_eq!(*counter.borrow(), 1025);
}
}
Loading