From 074c0cadf4cb3e3700031feebc1654f24d094567 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 23 Aug 2023 21:09:58 -0700 Subject: [PATCH 1/3] Make racey futures use an internal RNG This commit prevents these functions from needed to make calls into thread-local storage for every poll. Signed-off-by: John Nunley --- src/future.rs | 13 +++++++++++-- src/stream.rs | 13 +++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/future.rs b/src/future.rs index 2c05b63..6ef089b 100644 --- a/src/future.rs +++ b/src/future.rs @@ -32,6 +32,9 @@ use std::{ panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}, }; +#[cfg(feature = "std")] +use fastrand::Rng; + #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::task::{Context, Poll}; @@ -480,7 +483,11 @@ where F1: Future, F2: Future, { - Race { future1, future2 } + Race { + future1, + future2, + rng: Rng::new(), + } } #[cfg(feature = "std")] @@ -493,6 +500,7 @@ pin_project! { future1: F1, #[pin] future2: F2, + rng: Rng, } } @@ -507,7 +515,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - if fastrand::bool() { + if this.rng.bool() { if let Poll::Ready(t) = this.future1.poll(cx) { return Poll::Ready(t); } @@ -644,6 +652,7 @@ pub trait FutureExt: Future { Race { future1: self, future2: other, + rng: Rng::new(), } } diff --git a/src/stream.rs b/src/stream.rs index 50f55dd..1b8667b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -31,6 +31,9 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; +#[cfg(feature = "std")] +use fastrand::Rng; + use pin_project_lite::pin_project; use crate::ready; @@ -1756,6 +1759,7 @@ pub trait StreamExt: Stream { Race { stream1: self, stream2: other, + rng: Rng::new(), } } @@ -2377,7 +2381,11 @@ where S1: Stream, S2: Stream, { - Race { stream1, stream2 } + Race { + stream1, + stream2, + rng: Rng::new(), + } } #[cfg(feature = "std")] @@ -2390,6 +2398,7 @@ pin_project! { stream1: S1, #[pin] stream2: S2, + rng: Rng, } } @@ -2404,7 +2413,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - if fastrand::bool() { + if this.rng.bool() { if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) { return Poll::Ready(Some(t)); } From ee3d5806f0e92ac6f754f3c63608414e15db722a Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 23 Aug 2023 21:20:15 -0700 Subject: [PATCH 2/3] Add racey functions for using your own RNG seed The race functions were not available on no_std targets. This commit adds "with_seed" variants that take a u64 seed that is passed to fastrand. It can be used on no_std targets to provide random racey futures and streams. A "race" feature is added that enables it. This way fastrand doesn't become a dependency for crates that don't need it, like async-io. Signed-off-by: John Nunley --- Cargo.toml | 7 ++++--- src/future.rs | 47 ++++++++++++++++++++++++++++++++++++++++++----- src/stream.rs | 42 +++++++++++++++++++++++++++++++++++++----- 3 files changed, 83 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a099fb6..efdec4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,12 +20,13 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [features] -default = ["std"] -std = ["alloc", "fastrand", "futures-io", "parking", "memchr", "waker-fn"] +default = ["race", "std"] +std = ["alloc", "fastrand/std", "futures-io", "parking", "memchr", "waker-fn"] alloc = [] +race = ["fastrand"] [dependencies] -fastrand = { version = "2.0.0", optional = true } +fastrand = { version = "2.0.0", optional = true, default-features = false } futures-core = { version = "0.3.5", default-features = false } futures-io = { version = "0.3.5", optional = true } memchr = { version = "2.3.3", optional = true } diff --git a/src/future.rs b/src/future.rs index 6ef089b..2948185 100644 --- a/src/future.rs +++ b/src/future.rs @@ -32,7 +32,7 @@ use std::{ panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}, }; -#[cfg(feature = "std")] +#[cfg(feature = "race")] use fastrand::Rng; #[cfg(feature = "alloc")] @@ -477,7 +477,7 @@ where /// let res = future::race(ready(1), ready(2)).await; /// # }) /// ``` -#[cfg(feature = "std")] +#[cfg(all(feature = "race", feature = "std"))] pub fn race(future1: F1, future2: F2) -> Race where F1: Future, @@ -490,7 +490,44 @@ where } } -#[cfg(feature = "std")] +/// Race two futures but with a predefined random seed. +/// +/// This function is identical to [`race`], but instead of using a random seed from a thread-local +/// RNG, it allows the user to provide a seed. It is useful for when you already have a source of +/// randomness available, or if you want to use a fixed seed. +/// +/// See documentation of the [`race`] function for features and caveats. +/// +/// # Examples +/// +/// ``` +/// use futures_lite::future::{self, pending, ready}; +/// +/// // A fixed seed is used, so the result is deterministic. +/// const SEED: u64 = 0x42; +/// +/// # spin_on::spin_on(async { +/// assert_eq!(future::race_with_seed(ready(1), pending(), SEED).await, 1); +/// assert_eq!(future::race_with_seed(pending(), ready(2), SEED).await, 2); +/// +/// // One of the two futures is randomly chosen as the winner. +/// let res = future::race_with_seed(ready(1), ready(2), SEED).await; +/// # }) +/// ``` +#[cfg(feature = "race")] +pub fn race_with_seed(future1: F1, future2: F2, seed: u64) -> Race +where + F1: Future, + F2: Future, +{ + Race { + future1, + future2, + rng: Rng::with_seed(seed), + } +} + +#[cfg(feature = "race")] pin_project! { /// Future for the [`race()`] function and the [`FutureExt::race()`] method. #[derive(Debug)] @@ -504,7 +541,7 @@ pin_project! { } } -#[cfg(feature = "std")] +#[cfg(feature = "race")] impl Future for Race where F1: Future, @@ -643,7 +680,7 @@ pub trait FutureExt: Future { /// let res = ready(1).race(ready(2)).await; /// # }) /// ``` - #[cfg(feature = "std")] + #[cfg(all(feature = "std", feature = "race"))] fn race(self, other: F) -> Race where Self: Sized, diff --git a/src/stream.rs b/src/stream.rs index 1b8667b..414988b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -31,7 +31,7 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -#[cfg(feature = "std")] +#[cfg(feature = "race")] use fastrand::Rng; use pin_project_lite::pin_project; @@ -1750,7 +1750,7 @@ pub trait StreamExt: Stream { /// let res = once(1).race(once(2)).next().await; /// # }) /// ``` - #[cfg(feature = "std")] + #[cfg(all(feature = "std", feature = "race"))] fn race(self, other: S) -> Race where Self: Sized, @@ -2375,7 +2375,8 @@ where /// // One of the two stream is randomly chosen as the winner. /// let res = stream::race(once(1), once(2)).next().await; /// # }) -#[cfg(feature = "std")] +/// ``` +#[cfg(all(feature = "std", feature = "race"))] pub fn race(stream1: S1, stream2: S2) -> Race where S1: Stream, @@ -2388,7 +2389,38 @@ where } } -#[cfg(feature = "std")] +/// Races two streams, but with a user-provided seed for randomness. +/// +/// # Examples +/// +/// ``` +/// use futures_lite::stream::{self, once, pending, StreamExt}; +/// +/// // A fixed seed is used for reproducibility. +/// const SEED: u64 = 123; +/// +/// # spin_on::spin_on(async { +/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1)); +/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2)); +/// +/// // One of the two stream is randomly chosen as the winner. +/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await; +/// # }) +/// ``` +#[cfg(feature = "race")] +pub fn race_with_seed(stream1: S1, stream2: S2, seed: u64) -> Race +where + S1: Stream, + S2: Stream, +{ + Race { + stream1, + stream2, + rng: Rng::with_seed(seed), + } +} + +#[cfg(feature = "race")] pin_project! { /// Stream for the [`race()`] function and the [`StreamExt::race()`] method. #[derive(Clone, Debug)] @@ -2402,7 +2434,7 @@ pin_project! { } } -#[cfg(feature = "std")] +#[cfg(feature = "race")] impl Stream for Race where S1: Stream, From b85ecf540f65a853f74c21a396349ca99fd42344 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 2 Sep 2023 14:15:27 -0700 Subject: [PATCH 3/3] Bump MSRV to 1.61 This prevents memchr compile errors for now. Signed-off-by: John Nunley --- .github/workflows/ci.yml | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ccfc80..0e77772 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,7 +53,7 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.48'] + rust: ['1.61'] steps: - uses: actions/checkout@v3 - name: Install Rust diff --git a/Cargo.toml b/Cargo.toml index efdec4b..0d2b059 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ authors = [ "Contributors to futures-rs", ] edition = "2018" -rust-version = "1.48" +rust-version = "1.61" description = "Futures, streams, and async I/O combinators" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/futures-lite"