Skip to content

Commit

Permalink
Replace async-channel with flume
Browse files Browse the repository at this point in the history
This commit replaces async-channel in some tests with flume. It appears
that async-channel doesn't work under MIRI but flume does, so we can
work around this for now by replacing it with flume.

cc smol-rs/async-channel#85

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull authored Apr 8, 2024
1 parent df82e5b commit 5428311
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ default = ["std"]
std = ["event-listener/std", "event-listener-strategy/std"]

[dev-dependencies]
async-channel = "2.2.0"
fastrand = "2.0.0"
flume = "0.11.0"
futures-lite = "2.0.0"
waker-fn = "1.1.0"

Expand Down
2 changes: 2 additions & 0 deletions src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ impl Barrier {
/// println!("after wait");
/// });
/// }
/// # // Wait for threads to stop.
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_blocking(&self) -> BarrierWaitResult {
Expand Down
13 changes: 7 additions & 6 deletions tests/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use async_lock::Barrier;
use futures_lite::future;

#[test]
#[cfg_attr(miri, ignore)]
fn smoke() {
future::block_on(async move {
const N: usize = 10;

let barrier = Arc::new(Barrier::new(N));

for _ in 0..10 {
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();

for _ in 0..N - 1 {
let c = barrier.clone();
Expand All @@ -21,7 +22,7 @@ fn smoke() {
thread::spawn(move || {
future::block_on(async move {
let res = c.wait().await;
tx.send(res.is_leader()).await.unwrap();
tx.send_async(res.is_leader()).await.unwrap();
})
});
}
Expand All @@ -35,7 +36,7 @@ fn smoke() {

// Now, the barrier is cleared and we should get data.
for _ in 0..N - 1 {
if rx.recv().await.unwrap() {
if rx.recv_async().await.unwrap() {
assert!(!leader_found);
leader_found = true;
}
Expand All @@ -55,15 +56,15 @@ fn smoke_blocking() {
let barrier = Arc::new(Barrier::new(N));

for _ in 0..10 {
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();

for _ in 0..N - 1 {
let c = barrier.clone();
let tx = tx.clone();

thread::spawn(move || {
let res = c.wait_blocking();
tx.send_blocking(res.is_leader()).unwrap();
tx.send(res.is_leader()).unwrap();
});
}

Expand All @@ -76,7 +77,7 @@ fn smoke_blocking() {

// Now, the barrier is cleared and we should get data.
for _ in 0..N - 1 {
if rx.recv().await.unwrap() {
if rx.recv_async().await.unwrap() {
assert!(!leader_found);
leader_found = true;
}
Expand Down
6 changes: 3 additions & 3 deletions tests/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn get_mut() {
#[test]
fn contention() {
future::block_on(async {
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();

let tx = Arc::new(tx);
let mutex = Arc::new(Mutex::new(0i32));
Expand All @@ -77,14 +77,14 @@ fn contention() {
future::block_on(async move {
let mut lock = mutex.lock().await;
*lock += 1;
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
drop(lock);
})
});
}

for _ in 0..num_tasks {
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
}

let lock = mutex.lock().await;
Expand Down
37 changes: 19 additions & 18 deletions tests/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg(not(target_family = "wasm"))]
fn spawn<T: Send + 'static>(f: impl Future<Output = T> + Send + 'static) -> future::Boxed<T> {
let (s, r) = async_channel::bounded(1);
let (s, r) = flume::bounded(1);
thread::spawn(move || {
future::block_on(async {
let _ = s.send(f.await).await;
let _ = s.send_async(f.await).await;
})
});
async move { r.recv().await.unwrap() }.boxed()
async move { r.recv_async().await.unwrap() }.boxed()
}

#[test]
Expand Down Expand Up @@ -119,13 +119,14 @@ fn get_mut() {
}

// Miri bug; this works when async is replaced with blocking
#[cfg(not(any(target_family = "wasm", miri)))]
#[cfg(not(target_family = "wasm"))]
#[test]
#[cfg_attr(miri, ignore)]
fn contention() {
const N: u32 = 10;
const M: usize = if cfg!(miri) { 100 } else { 1000 };

let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
let tx = Arc::new(tx);
let rw = Arc::new(RwLock::new(()));

Expand All @@ -142,25 +143,25 @@ fn contention() {
drop(rw.read().await);
}
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
});
}

future::block_on(async move {
for _ in 0..N {
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
}
});
}

// Miri bug; this works when async is replaced with blocking
#[cfg(not(any(target_family = "wasm", miri)))]
#[cfg(not(target_family = "wasm"))]
#[test]
#[cfg_attr(miri, ignore)]
fn contention_arc() {
const N: u32 = 10;
const M: usize = if cfg!(miri) { 100 } else { 1000 };

let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
let tx = Arc::new(tx);
let rw = Arc::new(RwLock::new(()));

Expand All @@ -177,13 +178,13 @@ fn contention_arc() {
drop(rw.read_arc().await);
}
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
});
}

future::block_on(async move {
for _ in 0..N {
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
}
});
}
Expand All @@ -192,7 +193,7 @@ fn contention_arc() {
#[test]
fn writer_and_readers() {
let lock = Arc::new(RwLock::new(0i32));
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();

// Spawn a writer task.
let _spawned = spawn({
Expand All @@ -205,7 +206,7 @@ fn writer_and_readers() {
future::yield_now().await;
*lock = tmp + 1;
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
}
});

Expand All @@ -228,7 +229,7 @@ fn writer_and_readers() {
}

// Wait for writer to finish.
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
let lock = lock.read().await;
assert_eq!(*lock, 1000);
});
Expand All @@ -238,7 +239,7 @@ fn writer_and_readers() {
#[test]
fn writer_and_readers_arc() {
let lock = Arc::new(RwLock::new(0i32));
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();

// Spawn a writer task.
let _spawned = spawn({
Expand All @@ -251,7 +252,7 @@ fn writer_and_readers_arc() {
future::yield_now().await;
*lock = tmp + 1;
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
}
});

Expand All @@ -274,7 +275,7 @@ fn writer_and_readers_arc() {
}

// Wait for writer to finish.
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
let lock = lock.read_arc().await;
assert_eq!(*lock, 1000);
});
Expand Down

0 comments on commit 5428311

Please sign in to comment.