diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ccfc80..0e77772 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,7 +53,7 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.48'] + rust: ['1.61'] steps: - uses: actions/checkout@v3 - name: Install Rust diff --git a/Cargo.toml b/Cargo.toml index a099fb6..0d2b059 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ authors = [ "Contributors to futures-rs", ] edition = "2018" -rust-version = "1.48" +rust-version = "1.61" description = "Futures, streams, and async I/O combinators" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/futures-lite" @@ -20,12 +20,13 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [features] -default = ["std"] -std = ["alloc", "fastrand", "futures-io", "parking", "memchr", "waker-fn"] +default = ["race", "std"] +std = ["alloc", "fastrand/std", "futures-io", "parking", "memchr", "waker-fn"] alloc = [] +race = ["fastrand"] [dependencies] -fastrand = { version = "2.0.0", optional = true } +fastrand = { version = "2.0.0", optional = true, default-features = false } futures-core = { version = "0.3.5", default-features = false } futures-io = { version = "0.3.5", optional = true } memchr = { version = "2.3.3", optional = true } diff --git a/src/future.rs b/src/future.rs index 2c05b63..2948185 100644 --- a/src/future.rs +++ b/src/future.rs @@ -32,6 +32,9 @@ use std::{ panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}, }; +#[cfg(feature = "race")] +use fastrand::Rng; + #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::task::{Context, Poll}; @@ -474,16 +477,57 @@ where /// let res = future::race(ready(1), ready(2)).await; /// # }) /// ``` -#[cfg(feature = "std")] +#[cfg(all(feature = "race", feature = "std"))] pub fn race(future1: F1, future2: F2) -> Race where F1: Future, F2: Future, { - Race { future1, future2 } + Race { + future1, + future2, + rng: Rng::new(), + } } -#[cfg(feature = "std")] +/// Race two futures but with a predefined random seed. +/// +/// This function is identical to [`race`], but instead of using a random seed from a thread-local +/// RNG, it allows the user to provide a seed. It is useful for when you already have a source of +/// randomness available, or if you want to use a fixed seed. +/// +/// See documentation of the [`race`] function for features and caveats. +/// +/// # Examples +/// +/// ``` +/// use futures_lite::future::{self, pending, ready}; +/// +/// // A fixed seed is used, so the result is deterministic. +/// const SEED: u64 = 0x42; +/// +/// # spin_on::spin_on(async { +/// assert_eq!(future::race_with_seed(ready(1), pending(), SEED).await, 1); +/// assert_eq!(future::race_with_seed(pending(), ready(2), SEED).await, 2); +/// +/// // One of the two futures is randomly chosen as the winner. +/// let res = future::race_with_seed(ready(1), ready(2), SEED).await; +/// # }) +/// ``` +#[cfg(feature = "race")] +pub fn race_with_seed(future1: F1, future2: F2, seed: u64) -> Race +where + F1: Future, + F2: Future, +{ + Race { + future1, + future2, + rng: Rng::with_seed(seed), + } +} + +#[cfg(feature = "race")] pin_project! { /// Future for the [`race()`] function and the [`FutureExt::race()`] method. #[derive(Debug)] @@ -493,10 +537,11 @@ pin_project! { future1: F1, #[pin] future2: F2, + rng: Rng, } } -#[cfg(feature = "std")] +#[cfg(feature = "race")] impl Future for Race where F1: Future, @@ -507,7 +552,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - if fastrand::bool() { + if this.rng.bool() { if let Poll::Ready(t) = this.future1.poll(cx) { return Poll::Ready(t); } @@ -635,7 +680,7 @@ pub trait FutureExt: Future { /// let res = ready(1).race(ready(2)).await; /// # }) /// ``` - #[cfg(feature = "std")] + #[cfg(all(feature = "std", feature = "race"))] fn race(self, other: F) -> Race where Self: Sized, @@ -644,6 +689,7 @@ pub trait FutureExt: Future { Race { future1: self, future2: other, + rng: Rng::new(), } } diff --git a/src/stream.rs b/src/stream.rs index 50f55dd..414988b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -31,6 +31,9 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; +#[cfg(feature = "race")] +use fastrand::Rng; + use pin_project_lite::pin_project; use crate::ready; @@ -1747,7 +1750,7 @@ pub trait StreamExt: Stream { /// let res = once(1).race(once(2)).next().await; /// # }) /// ``` - #[cfg(feature = "std")] + #[cfg(all(feature = "std", feature = "race"))] fn race(self, other: S) -> Race where Self: Sized, @@ -1756,6 +1759,7 @@ pub trait StreamExt: Stream { Race { stream1: self, stream2: other, + rng: Rng::new(), } } @@ -2371,16 +2375,52 @@ where /// // One of the two stream is randomly chosen as the winner. /// let res = stream::race(once(1), once(2)).next().await; /// # }) -#[cfg(feature = "std")] +/// ``` +#[cfg(all(feature = "std", feature = "race"))] pub fn race(stream1: S1, stream2: S2) -> Race where S1: Stream, S2: Stream, { - Race { stream1, stream2 } + Race { + stream1, + stream2, + rng: Rng::new(), + } } -#[cfg(feature = "std")] +/// Races two streams, but with a user-provided seed for randomness. +/// +/// # Examples +/// +/// ``` +/// use futures_lite::stream::{self, once, pending, StreamExt}; +/// +/// // A fixed seed is used for reproducibility. +/// const SEED: u64 = 123; +/// +/// # spin_on::spin_on(async { +/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1)); +/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2)); +/// +/// // One of the two stream is randomly chosen as the winner. +/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await; +/// # }) +/// ``` +#[cfg(feature = "race")] +pub fn race_with_seed(stream1: S1, stream2: S2, seed: u64) -> Race +where + S1: Stream, + S2: Stream, +{ + Race { + stream1, + stream2, + rng: Rng::with_seed(seed), + } +} + +#[cfg(feature = "race")] pin_project! { /// Stream for the [`race()`] function and the [`StreamExt::race()`] method. #[derive(Clone, Debug)] @@ -2390,10 +2430,11 @@ pin_project! { stream1: S1, #[pin] stream2: S2, + rng: Rng, } } -#[cfg(feature = "std")] +#[cfg(feature = "race")] impl Stream for Race where S1: Stream, @@ -2404,7 +2445,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - if fastrand::bool() { + if this.rng.bool() { if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) { return Poll::Ready(Some(t)); }