diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be05945..db2bd1a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,6 +45,7 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test --all-features - run: cargo check --all --all-features --target wasm32-unknown-unknown - run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps @@ -82,6 +83,10 @@ jobs: env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + - run: cargo miri test --all-features + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout security_audit: permissions: diff --git a/Cargo.toml b/Cargo.toml index 15d0cf5..16d33bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-executor" version = "1.11.0" authors = ["Stjepan Glavina ", "John Nunley "] edition = "2021" -rust-version = "1.60" +rust-version = "1.63" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" @@ -14,9 +14,13 @@ keywords = ["asynchronous", "executor", "single", "multi", "spawn"] categories = ["asynchronous", "concurrency"] exclude = ["/.*"] +[features] +# Adds support for executors optimized for use in static variables. +static = [] + [dependencies] async-task = "4.4.0" -concurrent-queue = "2.0.0" +concurrent-queue = "2.5.0" fastrand = "2.0.0" futures-lite = { version = "2.0.0", default-features = false } slab = "0.4.4" @@ -37,3 +41,4 @@ once_cell = "1.16.0" [[bench]] name = "executor" harness = false +required-features = ["static"] diff --git a/benches/executor.rs b/benches/executor.rs index f624513..74b2955 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -1,7 +1,7 @@ use std::mem; use std::thread::available_parallelism; -use async_executor::Executor; +use async_executor::{Executor, StaticExecutor}; use criterion::{criterion_group, criterion_main, Criterion}; use futures_lite::{future, prelude::*}; @@ -10,6 +10,7 @@ const STEPS: usize = 300; const LIGHT_TASKS: usize = 25_000; static EX: Executor<'_> = Executor::new(); +static STATIC_EX: StaticExecutor = StaticExecutor::new(); fn run(f: impl FnOnce(), multithread: bool) { let limit = if multithread { @@ -27,6 +28,22 @@ fn run(f: impl FnOnce(), multithread: bool) { }); } +fn run_static(f: impl FnOnce(), multithread: bool) { + let limit = if multithread { + available_parallelism().unwrap().get() + } else { + 1 + }; + + let (s, r) = async_channel::bounded::<()>(1); + easy_parallel::Parallel::new() + .each(0..limit, |_| future::block_on(STATIC_EX.run(r.recv()))) + .finish(move || { + let _s = s; + f() + }); +} + fn create(c: &mut Criterion) { c.bench_function("executor::create", |b| { b.iter(|| { @@ -38,224 +55,442 @@ fn create(c: &mut Criterion) { } fn running_benches(c: &mut Criterion) { - for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { - let mut group = c.benchmark_group(group_name.to_string()); - - group.bench_function("executor::spawn_one", |b| { - run( - || { - b.iter(|| { - future::block_on(async { EX.spawn(async {}).await }); - }); - }, - *multithread, - ); - }); - - group.bench_function("executor::spawn_batch", |b| { - run( - || { - let mut handles = vec![]; + for (prefix, with_static) in [("executor", false), ("static_executor", true)] { + for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { + let mut group = c.benchmark_group(group_name.to_string()); + + group.bench_function(format!("{}::spawn_one", prefix), |b| { + if with_static { + run_static( + || { + b.iter(|| { + future::block_on(async { STATIC_EX.spawn(async {}).await }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(|| { + future::block_on(async { EX.spawn(async {}).await }); + }); + }, + *multithread, + ); + } + }); - b.iter(|| { - EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); - }); + if !with_static { + group.bench_function("executor::spawn_batch", |b| { + run( + || { + let mut handles = vec![]; - handles.clear(); - }, - *multithread, - ) - }); + b.iter(|| { + EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); + }); - group.bench_function("executor::spawn_many_local", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..LIGHT_TASKS { - tasks.push(EX.spawn(async {})); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + handles.clear(); + }, + *multithread, + ) + }); + } - group.bench_function("executor::spawn_recursively", |b| { - #[allow(clippy::manual_async_fn)] - fn go(i: usize) -> impl Future + Send + 'static { - async move { - if i != 0 { - EX.spawn(async move { - let fut = go(i - 1).boxed(); - fut.await; - }) - .await; + group.bench_function(format!("{}::spawn_many_local", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(STATIC_EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::spawn_recursively", prefix), |b| { + #[allow(clippy::manual_async_fn)] + fn go(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + EX.spawn(async move { + let fut = go(i - 1).boxed(); + fut.await; + }) + .await; + } } } - } - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(go(STEPS))); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + #[allow(clippy::manual_async_fn)] + fn go_static(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + STATIC_EX + .spawn(async move { + let fut = go_static(i - 1).boxed(); + fut.await; + }) + .await; + } + } + } - group.bench_function("executor::yield_now", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(async move { - for _ in 0..STEPS { - future::yield_now().await; + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(STATIC_EX.spawn(go_static(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(go(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::yield_now", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(STATIC_EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::channels", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = + mem::replace(&mut current_recv, next_recv); + + tasks.push(STATIC_EX.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); } - })); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); - group.bench_function("executor::channels", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - // Create channels. - let mut tasks = Vec::new(); - let (first_send, first_recv) = async_channel::bounded(1); - let mut current_recv = first_recv; - - for _ in 0..TASKS { - let (next_send, next_recv) = async_channel::bounded(1); - let current_recv = mem::replace(&mut current_recv, next_recv); - - tasks.push(EX.spawn(async move { - // Send a notification on to the next task. for _ in 0..STEPS { + first_send.send(()).await.unwrap(); current_recv.recv().await.unwrap(); - next_send.send(()).await.unwrap(); } - })); - } - - for _ in 0..STEPS { - first_send.send(()).await.unwrap(); - current_recv.recv().await.unwrap(); - } - - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ) - }); - group.bench_function("executor::web_server", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let (db_send, db_recv) = - async_channel::bounded::>(TASKS / 5); - let mut db_rng = fastrand::Rng::with_seed(0x12345678); - let mut web_rng = db_rng.fork(); - - // This task simulates a database. - let db_task = EX.spawn(async move { - loop { - // Wait for a new task. - let incoming = match db_recv.recv().await { - Ok(incoming) => incoming, - Err(_) => break, - }; - - // Process the task. Maybe it takes a while. - for _ in 0..db_rng.usize(..10) { - future::yield_now().await; + for task in tasks { + task.await; } - - // Send the data back. - incoming.send(db_rng.usize(..)).await.ok(); - } + }); }); + }, + *multithread, + ) + } else { + run( + || { + b.iter(move || { + future::block_on(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = + mem::replace(&mut current_recv, next_recv); + + tasks.push(EX.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } - // This task simulates a web server waiting for new tasks. - let server_task = EX.spawn(async move { - for i in 0..TASKS { - // Get a new connection. - if web_rng.usize(..=16) == 16 { - future::yield_now().await; + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); } - let mut web_rng = web_rng.fork(); - let db_send = db_send.clone(); - let task = EX.spawn(async move { - // Check if the data is cached... - if web_rng.bool() { - // ...it's in cache! - future::yield_now().await; - return; + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ) + } + }); + + group.bench_function(format!("{}::web_server", prefix), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let (db_send, db_recv) = + async_channel::bounded::>( + TASKS / 5, + ); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = STATIC_EX.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); } + }); - // Otherwise we have to make a DB call or two. - for _ in 0..web_rng.usize(STEPS / 2..STEPS) { - let (resp_send, resp_recv) = async_channel::bounded(1); - db_send.send(resp_send).await.unwrap(); - criterion::black_box(resp_recv.recv().await.unwrap()); + // This task simulates a web server waiting for new tasks. + let server_task = STATIC_EX.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = STATIC_EX.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = + async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + criterion::black_box( + resp_recv.recv().await.unwrap(), + ); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } } + }); - // Send the data back... - for _ in 0..web_rng.usize(3..16) { - future::yield_now().await; + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + }); + }) + }, + *multithread, + ) + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let (db_send, db_recv) = + async_channel::bounded::>( + TASKS / 5, + ); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = EX.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); } }); - task.detach(); - - if i & 16 == 0 { - future::yield_now().await; - } - } - }); + // This task simulates a web server waiting for new tasks. + let server_task = EX.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = EX.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = + async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + criterion::black_box( + resp_recv.recv().await.unwrap(), + ); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); - // Spawn and wait for it to stop. - server_task.await; - db_task.await; - }); - }) - }, - *multithread, - ) - }); + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + }); + }) + }, + *multithread, + ) + } + }); + } } } diff --git a/src/lib.rs b/src/lib.rs index a663be8..7c5d49d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] use std::fmt; use std::marker::PhantomData; @@ -51,8 +52,13 @@ use concurrent_queue::ConcurrentQueue; use futures_lite::{future, prelude::*}; use slab::Slab; +#[cfg(feature = "static")] +mod static_executors; + #[doc(no_inline)] pub use async_task::{FallibleTask, Task}; +#[cfg(feature = "static")] +pub use static_executors::*; /// An async executor. /// @@ -292,18 +298,7 @@ impl<'a> Executor<'a> { /// assert!(ex.try_tick()); // a task was found /// ``` pub fn try_tick(&self) -> bool { - match self.state().queue.pop() { - Err(_) => false, - Ok(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.state().notify(); - - // Run the task. - runnable.run(); - true - } - } + self.state().try_tick() } /// Runs a single task. @@ -326,9 +321,7 @@ impl<'a> Executor<'a> { /// future::block_on(ex.tick()); // runs the task /// ``` pub async fn tick(&self) { - let state = self.state(); - let runnable = Ticker::new(state).runnable().await; - runnable.run(); + self.state().tick().await; } /// Runs the executor until the given future completes. @@ -347,22 +340,7 @@ impl<'a> Executor<'a> { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let mut runner = Runner::new(self.state()); - let mut rng = fastrand::Rng::new(); - - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable(&mut rng).await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await + self.state().run(future).await } /// Returns a function that schedules a runnable task when it gets woken up. @@ -701,7 +679,7 @@ struct State { impl State { /// Creates state for a new executor. - fn new() -> State { + const fn new() -> State { State { queue: ConcurrentQueue::unbounded(), local_queues: RwLock::new(Vec::new()), @@ -729,6 +707,45 @@ impl State { } } } + + pub(crate) fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + pub(crate) async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + pub async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + let mut rng = fastrand::Rng::new(); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } } /// A list of sleeping tickers. @@ -1068,6 +1085,11 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_ // in state_ptr. let state = unsafe { &*ptr }; + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { /// Debug wrapper for the number of active tasks. struct ActiveTasks<'a>(&'a Mutex>); diff --git a/src/static_executors.rs b/src/static_executors.rs new file mode 100644 index 0000000..c1724e9 --- /dev/null +++ b/src/static_executors.rs @@ -0,0 +1,479 @@ +use crate::{debug_state, Executor, LocalExecutor, State}; +use async_task::{Builder, Runnable, Task}; +use slab::Slab; +use std::{ + cell::UnsafeCell, + fmt, + future::Future, + marker::PhantomData, + panic::{RefUnwindSafe, UnwindSafe}, +}; + +impl Executor<'static> { + /// Consumes the [`Executor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// + /// # Example + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> &'static StaticExecutor { + let ptr = self.state_ptr(); + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + let state: &'static State = unsafe { &*ptr }; + + std::mem::forget(self); + + let mut active = state.active.lock().unwrap(); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) }; + static_executor + } +} + +impl LocalExecutor<'static> { + /// Consumes the [`LocalExecutor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// + /// # Example + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> &'static StaticLocalExecutor { + let ptr = self.inner.state_ptr(); + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + let state: &'static State = unsafe { &*ptr }; + + std::mem::forget(self); + + let mut active = state.active.lock().unwrap(); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) }; + static_executor + } +} + +/// A static-lifetimed async [`Executor`]. +/// +/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static +/// contexts via [`Executor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html +#[repr(transparent)] +pub struct StaticExecutor { + state: State, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +unsafe impl Send for StaticExecutor {} +// SAFETY: Executor internally synchronizes all of it's operations internally. +unsafe impl Sync for StaticExecutor {} + +impl UnwindSafe for StaticExecutor {} +impl RefUnwindSafe for StaticExecutor {} + +impl fmt::Debug for StaticExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&self.state, "StaticExecutor", f) + } +} + +impl StaticExecutor { + /// Creates a new StaticExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + } + } + + /// Spawns a task onto the executor. + /// + /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn( + &'static self, + future: impl Future + Send + 'static, + ) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'a>( + &'static self, + future: impl Future + Send + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is `Send` + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// assert!(!EXECUTOR.try_tick()); // no tasks to run + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// assert!(EXECUTOR.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// use futures_lite::future; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(EXECUTOR.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// use futures_lite::future; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { 1 + 2 }); + /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} + +impl Default for StaticExecutor { + fn default() -> Self { + Self::new() + } +} + +/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`]. +/// +/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static +/// contexts via [`LocalExecutor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html +#[repr(transparent)] +pub struct StaticLocalExecutor { + state: State, + marker_: PhantomData>, +} + +impl UnwindSafe for StaticLocalExecutor {} +impl RefUnwindSafe for StaticLocalExecutor {} + +impl fmt::Debug for StaticLocalExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&self.state, "StaticLocalExecutor", f) + } +} + +impl StaticLocalExecutor { + /// Creates a new StaticLocalExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticLocalExecutor; + /// + /// thread_local! { + /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new(); + /// } + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + marker_: PhantomData, + } + } + + /// Spawns a task onto the executor. + /// + /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn(&'static self, future: impl Future + 'static) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn_local(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: 'a>( + &'static self, + future: impl Future + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor + // share the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new().leak(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} + +impl Default for StaticLocalExecutor { + fn default() -> Self { + Self::new() + } +}