Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument with loom #86

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ jobs:
run: rustup update stable
- run: cargo clippy --all-features --all-targets

loom:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- name: Loom tests
run: cargo test --release --test loom --features loom
env:
RUSTFLAGS: "--cfg=loom"
LOOM_MAX_PREEMPTIONS: 4

fmt:
runs-on: ubuntu-latest
steps:
Expand Down
15 changes: 15 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ exclude = ["/.*"]
event-listener = { version = "5.0.0", default-features = false }
event-listener-strategy = { version = "0.5.0", default-features = false }
pin-project-lite = "0.2.11"
portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] }

[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"
default-features = false
optional = true

[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", optional = true }

[features]
default = ["std"]
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw the changes to lib.rs, but this doesn't really seem to be in use?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, I was going to add a portable-atomic impl but forgot to. Will fix this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#87

std = ["event-listener/std", "event-listener-strategy/std"]
loom = ["event-listener/loom", "dep:loom"]

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }

[dev-dependencies]
async-channel = "2.2.0"
Expand Down
47 changes: 25 additions & 22 deletions src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,31 @@ struct State {
}

impl Barrier {
/// Creates a barrier that can block the given number of tasks.
///
/// A barrier will block `n`-1 tasks which call [`wait()`] and then wake up all tasks
/// at once when the `n`th task calls [`wait()`].
///
/// [`wait()`]: `Barrier::wait()`
///
/// # Examples
///
/// ```
/// use async_lock::Barrier;
///
/// let barrier = Barrier::new(5);
/// ```
pub const fn new(n: usize) -> Barrier {
Barrier {
n,
state: Mutex::new(State {
count: 0,
generation_id: 0,
}),
event: Event::new(),
const_fn! {
const_if: #[cfg(not(loom))];
/// Creates a barrier that can block the given number of tasks.
///
/// A barrier will block `n`-1 tasks which call [`wait()`] and then wake up all tasks
/// at once when the `n`th task calls [`wait()`].
///
/// [`wait()`]: `Barrier::wait()`
///
/// # Examples
///
/// ```
/// use async_lock::Barrier;
///
/// let barrier = Barrier::new(5);
/// ```
pub const fn new(n: usize) -> Barrier {
Barrier {
n,
state: Mutex::new(State {
count: 0,
generation_id: 0,
}),
event: Event::new(),
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ macro_rules! pin {
}
}

/// Make the given function const if the given condition is true.
macro_rules! const_fn {
(
const_if: #[cfg($($cfg:tt)+)];
$(#[$($attr:tt)*])*
$vis:vis const fn $($rest:tt)*
) => {
#[cfg($($cfg)+)]
$(#[$($attr)*])*
$vis const fn $($rest)*
#[cfg(not($($cfg)+))]
$(#[$($attr)*])*
$vis fn $($rest)*
};
}

mod barrier;
mod mutex;
mod once_cell;
Expand All @@ -97,6 +113,38 @@ pub mod futures {
pub use crate::semaphore::{Acquire, AcquireArc};
}

#[cfg(not(loom))]
/// Synchronization primitive implementation.
mod sync {
pub(super) use core::sync::atomic;

pub(super) trait WithMut {
type Output;

fn with_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Output) -> R;
}

impl WithMut for atomic::AtomicUsize {
type Output = usize;

#[inline]
fn with_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self::Output) -> R,
{
f(self.get_mut())
}
}
}

#[cfg(loom)]
/// Synchronization primitive implementation.
mod sync {
pub(super) use loom::sync::atomic;
}

#[cold]
fn abort() -> ! {
// For no_std targets, panicking while panicking is defined as an abort
Expand Down
46 changes: 21 additions & 25 deletions src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use core::fmt;
use core::marker::{PhantomData, PhantomPinned};
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::Poll;
use core::usize;

use alloc::sync::Arc;

// We don't use loom::UnsafeCell as that doesn't work with the Mutex API.
use crate::sync::atomic::{AtomicUsize, Ordering};

#[cfg(all(feature = "std", not(target_family = "wasm")))]
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -56,20 +58,23 @@ unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}

impl<T> Mutex<T> {
/// Creates a new async mutex.
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
///
/// let mutex = Mutex::new(0);
/// ```
pub const fn new(data: T) -> Mutex<T> {
Mutex {
state: AtomicUsize::new(0),
lock_ops: Event::new(),
data: UnsafeCell::new(data),
const_fn! {
const_if: #[cfg(not(loom))];
/// Creates a new async mutex.
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
///
/// let mutex = Mutex::new(0);
/// ```
pub const fn new(data: T) -> Mutex<T> {
Mutex {
state: AtomicUsize::new(0),
lock_ops: Event::new(),
data: UnsafeCell::new(data),
}
}
}

Expand Down Expand Up @@ -186,7 +191,7 @@ impl<T: ?Sized> Mutex<T> {
/// # })
/// ```
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.data.get() }
self.data.get_mut()
}

/// Unlocks the mutex directly.
Expand Down Expand Up @@ -756,12 +761,3 @@ impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
unsafe { &mut *self.0.data.get() }
}
}

/// Calls a function when dropped.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}
89 changes: 51 additions & 38 deletions src/once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use core::fmt;
use core::future::Future;
use core::mem::{forget, MaybeUninit};
use core::ptr;
use core::sync::atomic::{AtomicUsize, Ordering};

use crate::sync::atomic::{AtomicUsize, Ordering};

#[cfg(not(loom))]
use crate::sync::WithMut;

#[cfg(all(feature = "std", not(target_family = "wasm")))]
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
Expand Down Expand Up @@ -107,22 +111,25 @@ unsafe impl<T: Send> Send for OnceCell<T> {}
unsafe impl<T: Send + Sync> Sync for OnceCell<T> {}

impl<T> OnceCell<T> {
/// Create a new, uninitialized `OnceCell`.
///
/// # Example
///
/// ```rust
/// use async_lock::OnceCell;
///
/// let cell = OnceCell::new();
/// # cell.set_blocking(1);
/// ```
pub const fn new() -> Self {
Self {
active_initializers: Event::new(),
passive_waiters: Event::new(),
state: AtomicUsize::new(State::Uninitialized as _),
value: UnsafeCell::new(MaybeUninit::uninit()),
const_fn! {
const_if: #[cfg(not(loom))];
/// Create a new, uninitialized `OnceCell`.
///
/// # Example
///
/// ```rust
/// use async_lock::OnceCell;
///
/// let cell = OnceCell::new();
/// # cell.set_blocking(1);
/// ```
pub const fn new() -> Self {
Self {
active_initializers: Event::new(),
passive_waiters: Event::new(),
state: AtomicUsize::new(State::Uninitialized as _),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}

Expand Down Expand Up @@ -194,13 +201,15 @@ impl<T> OnceCell<T> {
/// # });
/// ```
pub fn get_mut(&mut self) -> Option<&mut T> {
if State::from(*self.state.get_mut()) == State::Initialized {
// SAFETY: We know that the value is initialized, so it is safe to
// read it.
Some(unsafe { &mut *self.value.get().cast() })
} else {
None
}
self.state.with_mut(|state| {
if State::from(*state) == State::Initialized {
// SAFETY: We know that the value is initialized, so it is safe to
// read it.
Some(unsafe { &mut *self.value.get().cast() })
} else {
None
}
})
}

/// Take the value out of this `OnceCell`, moving it back to the uninitialized
Expand All @@ -219,15 +228,17 @@ impl<T> OnceCell<T> {
/// # });
/// ```
pub fn take(&mut self) -> Option<T> {
if State::from(*self.state.get_mut()) == State::Initialized {
// SAFETY: We know that the value is initialized, so it is safe to
// read it.
let value = unsafe { ptr::read(self.value.get().cast()) };
*self.state.get_mut() = State::Uninitialized.into();
Some(value)
} else {
None
}
self.state.with_mut(|state| {
if State::from(*state) == State::Initialized {
// SAFETY: We know that the value is initialized, so it is safe to
// read it.
let value = unsafe { ptr::read(self.value.get().cast()) };
*state = State::Uninitialized.into();
Some(value)
} else {
None
}
})
}

/// Convert this `OnceCell` into the inner value, if it is initialized.
Expand Down Expand Up @@ -754,11 +765,13 @@ impl<T: fmt::Debug> fmt::Debug for OnceCell<T> {

impl<T> Drop for OnceCell<T> {
fn drop(&mut self) {
if State::from(*self.state.get_mut()) == State::Initialized {
// SAFETY: We know that the value is initialized, so it is safe to
// drop it.
unsafe { self.value.get().cast::<T>().drop_in_place() }
}
self.state.with_mut(|state| {
if State::from(*state) == State::Initialized {
// SAFETY: We know that the value is initialized, so it is safe to
// drop it.
unsafe { self.value.get().cast::<T>().drop_in_place() }
}
});
}
}

Expand Down
Loading
Loading