Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support blocking and non-blocking operations on the same mutex #56

Merged
merged 5 commits into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exclude = ["/.*"]

[dependencies]
event-listener = { version = "3.0.0", default-features = false }
event-listener-strategy = { version = "0.2.0", default-features = false }
event-listener-strategy = { version = "0.3.0", default-features = false }
pin-project-lite = "0.2.11"

[features]
Expand Down
87 changes: 75 additions & 12 deletions src/barrier.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};

use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::task::Poll;

use crate::futures::Lock;
use crate::Mutex;
Expand Down Expand Up @@ -79,18 +79,67 @@ impl Barrier {
/// }
/// ```
pub fn wait(&self) -> BarrierWait<'_> {
BarrierWait {
BarrierWait::_new(BarrierWaitInner {
barrier: self,
lock: Some(self.state.lock()),
evl: EventListener::new(&self.event),
state: WaitState::Initial,
}
})
}

/// Blocks the current thread until all tasks reach this point.
///
/// Barriers are reusable after all tasks have synchronized, and can be used continuously.
///
/// Returns a [`BarrierWaitResult`] indicating whether this task is the "leader", meaning the
/// last task to call this method.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`wait`] method, this method will
/// block the current thread until the wait is complete.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a barrier can be used in both asynchronous and synchronous contexts.
/// Calling this method in an `async` function or block may result in a deadlock.
notgull marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use async_lock::Barrier;
/// use futures_lite::future;
/// use std::sync::Arc;
/// use std::thread;
///
/// let barrier = Arc::new(Barrier::new(5));
///
/// for _ in 0..5 {
/// let b = barrier.clone();
/// thread::spawn(move || {
/// // The same messages will be printed together.
/// // There will NOT be interleaving of "before" and "after".
/// println!("before wait");
/// b.wait_blocking();
/// println!("after wait");
/// });
/// }
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_blocking(&self) -> BarrierWaitResult {
self.wait().wait()
}
}

easy_wrapper! {
/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a>(BarrierWaitInner<'a> => BarrierWaitResult);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project_lite::pin_project! {
/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a> {
struct BarrierWaitInner<'a> {
// The barrier to wait on.
barrier: &'a Barrier,

Expand Down Expand Up @@ -124,18 +173,27 @@ enum WaitState {
Reacquiring { local_gen: u64 },
}

impl Future for BarrierWait<'_> {
impl EventListenerFuture for BarrierWaitInner<'_> {
type Output = BarrierWaitResult;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'a, S: Strategy<'a>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.state {
WaitState::Initial => {
// See if the lock is ready yet.
let mut state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
this.lock.set(None);
let mut state = ready!(this
.lock
.as_mut()
.as_pin_mut()
.unwrap()
.poll_with_strategy(strategy, cx));
this.lock.as_mut().set(None);

let local_gen = state.generation_id;
state.count += 1;
Expand All @@ -154,18 +212,23 @@ impl Future for BarrierWait<'_> {
}

WaitState::Waiting { local_gen } => {
ready!(this.evl.as_mut().poll(cx));
ready!(strategy.poll(this.evl.as_mut(), cx));

// We are now re-acquiring the mutex.
this.lock.set(Some(this.barrier.state.lock()));
this.lock.as_mut().set(Some(this.barrier.state.lock()));
*this.state = WaitState::Reacquiring {
local_gen: *local_gen,
};
}

WaitState::Reacquiring { local_gen } => {
// Acquire the local state again.
let state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
let state = ready!(this
.lock
.as_mut()
.as_pin_mut()
.unwrap()
.poll_with_strategy(strategy, cx));
this.lock.set(None);

if *local_gen == state.generation_id && state.count < this.barrier.n {
Expand Down
63 changes: 60 additions & 3 deletions src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,34 @@ impl<T: ?Sized> Mutex<T> {
})
}

/// Acquires the mutex using the blocking strategy.
///
/// Returns a guard that releases the mutex when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`lock`] method, this method will
/// block the current thread until the lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a mutex can be used in both asynchronous and synchronous contexts.
/// Calling this method in an `async` function or block may result in a deadlock.
notgull marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
///
/// let mutex = Mutex::new(10);
/// let guard = mutex.lock_blocking();
/// assert_eq!(*guard, 10);
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn lock_blocking(&self) -> MutexGuard<'_, T> {
self.lock().wait()
}

/// Attempts to acquire the mutex.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
Expand Down Expand Up @@ -199,6 +227,35 @@ impl<T: ?Sized> Mutex<T> {
})
}

/// Acquires the mutex and clones a reference to it using the blocking strategy.
///
/// Returns an owned guard that releases the mutex when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`lock_arc`] method, this method will
/// block the current thread until the lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a mutex can be used in both asynchronous and synchronous contexts.
/// Calling this method in an `async` function or block may result in a deadlock.
notgull marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
/// use std::sync::Arc;
///
/// let mutex = Arc::new(Mutex::new(10));
/// let guard = mutex.lock_arc_blocking();
/// assert_eq!(*guard, 10);
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn lock_arc_blocking(self: &Arc<Self>) -> MutexGuardArc<T> {
self.lock_arc().wait()
}

/// Attempts to acquire the mutex and clone a reference to it.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
Expand Down Expand Up @@ -291,7 +348,7 @@ impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> {

#[inline]
fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down Expand Up @@ -350,7 +407,7 @@ impl<T: ?Sized> EventListenerFuture for LockArcInnards<T> {
type Output = MutexGuardArc<T>;

fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&'a mut Self>,
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down Expand Up @@ -459,7 +516,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow

#[cold]
fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>(
mut self: Pin<&'a mut Self>,
mut self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
Expand Down
Loading