From b4099333ec68e44bf8564d294f28278ea2185b91 Mon Sep 17 00:00:00 2001 From: Jules Bertholet Date: Sat, 25 Nov 2023 20:43:50 -0500 Subject: [PATCH 1/3] Add missing `Arc` blocking methods Adds: - `Rwlock::read_arc_blocking` - `RwLock::upgradable_read_arc_blocking` - `RwLock::write_arc_blocking` - `RwLockUpgradableReadGuardArc::upgrade_blocking` - `Semaphore::acquire_arc_blocking` --- src/rwlock.rs | 132 +++++++++++++++++++++++++++++++++++++++++++++++ src/semaphore.rs | 55 ++++++++++++++++---- 2 files changed, 178 insertions(+), 9 deletions(-) diff --git a/src/rwlock.rs b/src/rwlock.rs index d8ace90..b8907d2 100644 --- a/src/rwlock.rs +++ b/src/rwlock.rs @@ -147,6 +147,41 @@ impl RwLock { pub fn read_arc<'a>(self: &'a Arc) -> ReadArc<'a, T> { ReadArc::new(self.raw.read(), self) } + + /// Acquires an owned, reference-counted read lock. + /// + /// Returns a guard that releases the lock when dropped. + /// + /// Note that attempts to acquire a read lock will block if there are also concurrent attempts + /// to acquire a write lock. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`read_arc`][`RwLock::read_arc`] method, + /// this method will block the current thread until the read lock is acquired. + /// + /// This method should not be used in an asynchronous context. It is intended to be + /// used in a way that a lock can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in a deadlock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use async_lock::RwLock; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let reader = lock.read_arc_blocking(); + /// assert_eq!(*reader, 1); + /// + /// assert!(lock.try_read().is_some()); + /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] + #[inline] + pub fn read_arc_blocking(self: &Arc) -> RwLockReadGuardArc { + self.read_arc().wait() + } } impl RwLock { @@ -347,6 +382,47 @@ impl RwLock { self.upgradable_read().wait() } + /// Attempts to acquire an owned, reference-counted read lock + /// with the possiblity to upgrade to a write lock. + /// + /// Returns a guard that releases the lock when dropped. + /// + /// Upgradable read lock reserves the right to be upgraded to a write lock, which means there + /// can be at most one upgradable read lock at a time. + /// + /// Note that attempts to acquire an upgradable read lock will block if there are concurrent + /// attempts to acquire another upgradable read lock or a write lock. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`upgradable_read_arc`][`RwLock::upgradable_read_arc`] + /// method, this method will block the current thread until the read lock is acquired. + /// + /// This method should not be used in an asynchronous context. It is intended to be + /// used in a way that a lock can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in a deadlock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use async_lock::{RwLock, RwLockUpgradableReadGuardArc}; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let reader = lock.upgradable_read_arc_blocking(); + /// assert_eq!(*reader, 1); + /// assert_eq!(*lock.try_read().unwrap(), 1); + /// + /// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader); + /// *writer = 2; + /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] + #[inline] + pub fn upgradable_read_arc_blocking(self: &Arc) -> RwLockUpgradableReadGuardArc { + self.upgradable_read_arc().wait() + } + /// Attempts to acquire an owned, reference-counted read lock with the possiblity to /// upgrade to a write lock. /// @@ -545,6 +621,36 @@ impl RwLock { WriteArc::new(self.raw.write(), self) } + /// Acquires an owned, reference-counted write lock. + /// + /// Returns a guard that releases the lock when dropped. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`write_arc`][RwLock::write_arc] method, this method will + /// block the current thread until the write lock is acquired. + /// + /// This method should not be used in an asynchronous context. It is intended to be + /// used in a way that a lock can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in a deadlock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use async_lock::RwLock; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let writer = lock.write_arc_blocking(); + /// assert!(lock.try_read().is_none()); + /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] + #[inline] + pub fn write_arc_blocking(self: &Arc) -> RwLockWriteGuardArc { + self.write_arc().wait() + } + /// Returns a mutable reference to the inner value. /// /// Since this call borrows the lock mutably, no actual locking takes place. The mutable borrow @@ -1067,6 +1173,32 @@ impl RwLockUpgradableReadGuardArc { ) } } + + /// Upgrades into a write lock. + /// + /// # Blocking + /// + /// This function will block the current thread until it is able to acquire the write lock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use async_lock::{RwLock, RwLockUpgradableReadGuardArc}; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let reader = lock.upgradable_read_arc_blocking(); + /// assert_eq!(*reader, 1); + /// + /// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader); + /// *writer = 2; + /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] + #[inline] + pub fn upgrade_blocking(guard: Self) -> RwLockWriteGuardArc { + RwLockUpgradableReadGuardArc::upgrade(guard).wait() + } } /// A guard that releases the write lock when dropped. diff --git a/src/semaphore.rs b/src/semaphore.rs index 9015d05..7e6c69a 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -1,8 +1,7 @@ use core::fmt; -use core::future::Future; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; -use core::task::{Context, Poll}; +use core::task::Poll; use alloc::sync::Arc; @@ -174,10 +173,38 @@ impl Semaphore { /// # }); /// ``` pub fn acquire_arc(self: &Arc) -> AcquireArc { - AcquireArc { + AcquireArc::_new(AcquireArcInner { semaphore: self.clone(), listener: EventListener::new(), - } + }) + } + + /// Waits for an owned permit for a concurrent operation. + /// + /// Returns a guard that releases the permit when dropped. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`acquire_arc`][Semaphore::acquire_arc] method, + /// this method will block the current thread until the permit is acquired. + /// + /// This method should not be used in an asynchronous context. It is intended to be + /// used in a way that a semaphore can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in a deadlock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use async_lock::Semaphore; + /// + /// let s = Arc::new(Semaphore::new(2)); + /// let guard = s.acquire_arc_blocking(); + /// ``` + #[cfg(all(feature = "std", not(target_family = "wasm")))] + #[inline] + pub fn acquire_arc_blocking(self: &Arc) -> SemaphoreGuardArc { + self.acquire_arc().wait() } /// Adds `n` additional permits to the semaphore. @@ -255,9 +282,15 @@ impl<'a> EventListenerFuture for AcquireInner<'a> { } } -pin_project_lite::pin_project! { +easy_wrapper! { /// The future returned by [`Semaphore::acquire_arc`]. - pub struct AcquireArc { + pub struct AcquireArc(AcquireArcInner => SemaphoreGuardArc); + #[cfg(all(feature = "std", not(target_family = "wasm")))] + pub(crate) wait(); +} + +pin_project_lite::pin_project! { + struct AcquireArcInner { // The semaphore being acquired. semaphore: Arc, @@ -273,10 +306,14 @@ impl fmt::Debug for AcquireArc { } } -impl Future for AcquireArc { +impl EventListenerFuture for AcquireArcInner { type Output = SemaphoreGuardArc; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll_with_strategy<'x, S: Strategy<'x>>( + self: Pin<&mut Self>, + strategy: &mut S, + cx: &mut S::Context, + ) -> Poll { let mut this = self.project(); loop { @@ -287,7 +324,7 @@ impl Future for AcquireArc { if !this.listener.is_listening() { this.listener.as_mut().listen(&this.semaphore.event); } else { - ready!(this.listener.as_mut().poll(cx)); + ready!(strategy.poll(this.listener.as_mut(), cx)); } } } From 318a71368da15781c6f738896799341aba8610cd Mon Sep 17 00:00:00 2001 From: Jules Bertholet Date: Sat, 25 Nov 2023 20:44:06 -0500 Subject: [PATCH 2/3] Restore missing future `fmt::Debug` impls --- src/mutex.rs | 2 +- src/semaphore.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mutex.rs b/src/mutex.rs index 25890e5..5bad082 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -337,7 +337,7 @@ pin_project_lite::pin_project! { unsafe impl Send for Lock<'_, T> {} unsafe impl Sync for Lock<'_, T> {} -impl fmt::Debug for LockInner<'_, T> { +impl fmt::Debug for Lock<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Lock { .. }") } diff --git a/src/semaphore.rs b/src/semaphore.rs index 7e6c69a..b2c7b1d 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -250,7 +250,7 @@ pin_project_lite::pin_project! { } } -impl fmt::Debug for AcquireInner<'_> { +impl fmt::Debug for Acquire<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Acquire { .. }") } From 8cbc3b1ba7ff45c32188dd853691e587f63d02a4 Mon Sep 17 00:00:00 2001 From: Jules Bertholet Date: Sat, 25 Nov 2023 22:12:25 -0500 Subject: [PATCH 3/3] Add tests --- tests/mutex.rs | 8 ++++++++ tests/rwlock.rs | 18 ++++++++++++++++++ tests/semaphore.rs | 11 +++++++++++ 3 files changed, 37 insertions(+) diff --git a/tests/mutex.rs b/tests/mutex.rs index dce687a..01d4924 100644 --- a/tests/mutex.rs +++ b/tests/mutex.rs @@ -32,6 +32,14 @@ fn smoke_blocking() { drop(m.lock_blocking()); } +#[cfg(all(feature = "std", not(target_family = "wasm")))] +#[test] +fn smoke_arc_blocking() { + let m = Arc::new(Mutex::new(())); + drop(m.lock_arc_blocking()); + drop(m.lock_arc_blocking()); +} + #[test] fn try_lock() { let m = Mutex::new(()); diff --git a/tests/rwlock.rs b/tests/rwlock.rs index ab6fc37..fd38ea0 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -54,9 +54,27 @@ fn smoke_blocking() { drop(lock.read_blocking()); drop(lock.write_blocking()); drop((lock.read_blocking(), lock.read_blocking())); + let read = lock.read_blocking(); + let upgradabe = lock.upgradable_read_blocking(); + drop(read); + drop(RwLockUpgradableReadGuard::upgrade_blocking(upgradabe)); drop(lock.write_blocking()); } +#[cfg(all(feature = "std", not(target_family = "wasm")))] +#[test] +fn smoke_arc_blocking() { + let lock = Arc::new(RwLock::new(())); + drop(lock.read_arc_blocking()); + drop(lock.write_arc_blocking()); + drop((lock.read_arc_blocking(), lock.read_arc_blocking())); + let read = lock.read_arc_blocking(); + let upgradabe = lock.upgradable_read_arc_blocking(); + drop(read); + drop(RwLockUpgradableReadGuardArc::upgrade_blocking(upgradabe)); + drop(lock.write_arc_blocking()); +} + #[test] fn try_write() { future::block_on(async { diff --git a/tests/semaphore.rs b/tests/semaphore.rs index 0aab0b5..52042bd 100644 --- a/tests/semaphore.rs +++ b/tests/semaphore.rs @@ -125,6 +125,17 @@ fn smoke_blocking() { assert!(s.try_acquire().is_some()); } +#[cfg(all(feature = "std", not(target_family = "wasm")))] +#[test] +fn smoke_arc_blocking() { + let s = Arc::new(Semaphore::new(2)); + let g1 = s.acquire_arc_blocking(); + let _g2 = s.acquire_arc_blocking(); + assert!(s.try_acquire().is_none()); + drop(g1); + assert!(s.try_acquire().is_some()); +} + #[test] fn add_permits() { static COUNTER: AtomicUsize = AtomicUsize::new(0);