From 8bda189eacb22714eb9184a302dfcd929d4d906a Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Sun, 22 Sep 2024 21:21:03 +1000 Subject: [PATCH] Initial `no_std` version of `bevy_tasks` --- crates/bevy_tasks/Cargo.toml | 28 +- crates/bevy_tasks/src/executor.rs | 837 ++++++++++++++++++ crates/bevy_tasks/src/global_task_pool.rs | 249 ++++++ crates/bevy_tasks/src/iter/mod.rs | 22 +- crates/bevy_tasks/src/lib.rs | 75 +- .../src/single_threaded_task_pool.rs | 13 +- crates/bevy_tasks/src/slice.rs | 10 +- crates/bevy_tasks/src/task.rs | 12 +- crates/bevy_tasks/src/task_pool.rs | 38 +- crates/bevy_tasks/src/thread_executor.rs | 4 +- 10 files changed, 1225 insertions(+), 63 deletions(-) create mode 100644 crates/bevy_tasks/src/executor.rs create mode 100644 crates/bevy_tasks/src/global_task_pool.rs diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 0e8831e59f9c95..b664066b54c1bd 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,14 +9,34 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi_threaded = ["dep:async-channel", "dep:concurrent-queue"] +default = ["std"] +multi_threaded = ["std", "dep:async-channel"] +async-io = ["std", "dep:async-io"] +std = [ + "futures-lite/std", + "async-task/std", + "slab/std", + "fastrand/std", + "spin/std", +] [dependencies] -futures-lite = "2.0.1" -async-executor = "1.11" +futures-lite = { version = "2.0.1", default-features = false, features = [ + "race", +] } +async-task = { version = "4.4.0", default-features = false } +slab = { version = "0.4.4", default-features = false } +fastrand = { version = "2.0.0", default-features = false } +spin = { version = "0.9.8", default-features = false, features = [ + "spin_mutex", + "rwlock", +] } +concurrent-queue = { version = "2.0.0" } + +bevy_utils = { path = "../bevy_utils", version = "0.15.0-dev", default-features = false } + async-channel = { version = "2.2.0", optional = true } async-io = { version = "2.0.0", optional = true } -concurrent-queue = { version = "2.0.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs new file mode 100644 index 00000000000000..2d6987966be12f --- /dev/null +++ b/crates/bevy_tasks/src/executor.rs @@ -0,0 +1,837 @@ +#![expect(unsafe_code, reason = "TBA")] + +use alloc::{sync::Arc, vec::Vec}; + +use core::{ + fmt, + marker::PhantomData, + panic::{RefUnwindSafe, UnwindSafe}, + sync::atomic::{AtomicBool, AtomicPtr, Ordering}, + task::{Poll, Waker}, +}; + +use async_task::{Builder, Runnable, Task}; +use concurrent_queue::ConcurrentQueue; +use futures_lite::{future, prelude::*}; +use slab::Slab; +use spin::{Mutex, RwLock}; + +use bevy_utils::OnDrop; + +/// An async executor. +/// Based on the reference executor provided by [async_executor]. +/// +/// [async_executor]: https://crates.io/crates/async-executor +pub struct Executor<'a> { + /// The executor state. + state: AtomicPtr, + + /// Makes the `'a` lifetime invariant. + _marker: PhantomData>, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +unsafe impl Send for Executor<'_> {} +// SAFETY: Executor internally synchronizes all of it's operations internally. 
+unsafe impl Sync for Executor<'_> {} + +impl UnwindSafe for Executor<'_> {} +impl RefUnwindSafe for Executor<'_> {} + +impl fmt::Debug for Executor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "Executor", f) + } +} + +impl<'a> Executor<'a> { + /// Creates a new executor. + pub const fn new() -> Executor<'a> { + Executor { + state: AtomicPtr::new(core::ptr::null_mut()), + _marker: PhantomData, + } + } + + /// Returns `true` if there are no unfinished tasks. + pub fn is_empty(&self) -> bool { + self.state().active.lock().is_empty() + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { + let mut active = self.state().active.lock(); + + // SAFETY: `T` and the future are `Send`. + unsafe { self.spawn_inner(future, &mut active) } + } + + /// Spawn a future while holding the inner lock. + /// + /// # Safety + /// + /// If this is an `Executor`, `F` and `T` must be `Send`. + unsafe fn spawn_inner( + &self, + future: impl Future + 'a, + active: &mut Slab, + ) -> Task { + // Remove the task from the set of active tasks when the future finishes. + let entry = active.vacant_entry(); + let index = entry.key(); + let state = self.state_as_arc(); + let future = async move { + let _guard = OnDrop::new(move || drop(state.active.lock().try_remove(index))); + future.await + }; + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + let builder = Builder::new(); + + #[cfg(feature = "std")] + let builder = builder.propagate_panic(true); + + builder.spawn_unchecked(|()| future, self.schedule()) + }; + entry.insert(runnable.waker()); + + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + pub fn try_tick(&self) -> bool { + self.state().try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + pub async fn tick(&self) { + self.state().tick().await; + } + + /// Runs the executor until the given future completes. + pub async fn run(&self, future: impl Future) -> T { + self.state().run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. 
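// A minimal usage sketch of the executor above, not part of the patch
// itself. It assumes the `std` feature so `futures_lite::future::block_on`
// is available; on `no_std` targets the executor would instead be driven by
// looping on `try_tick`. The function name is hypothetical.

fn executor_usage_sketch() {
    use futures_lite::future;

    let ex = Executor::new();
    // `spawn` hands back a handle that resolves to the future's output.
    let task = ex.spawn(async { 1 + 2 });
    // `run` drives the executor until the given future (here the task
    // handle itself) completes.
    let sum = future::block_on(ex.run(task));
    assert_eq!(sum, 3);
}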
+ fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state_as_arc(); + + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } + + /// Returns a pointer to the inner state. + #[inline] + fn state_ptr(&self) -> *const State { + let mut ptr = self.state.load(Ordering::Acquire); + if ptr.is_null() { + ptr = State::alloc_atomic(&self.state); + } + ptr + } + + /// Returns a reference to the inner state. + #[inline] + fn state(&self) -> &State { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. + unsafe { &*self.state_ptr() } + } + + // Clones the inner state Arc + #[inline] + fn state_as_arc(&self) -> Arc { + // SAFETY: So long as an Executor lives, it's state pointer will always be a valid + // Arc when accessed through state_ptr. + let arc = unsafe { Arc::from_raw(self.state_ptr()) }; + let clone = arc.clone(); + core::mem::forget(arc); + clone + } +} + +impl Drop for Executor<'_> { + fn drop(&mut self) { + let ptr = *self.state.get_mut(); + if ptr.is_null() { + return; + } + + // SAFETY: As ptr is not null, it was allocated via Arc::new and converted + // via Arc::into_raw in state_ptr. + let state = unsafe { Arc::from_raw(ptr) }; + + let mut active = state.active.lock(); + for w in active.drain() { + w.wake(); + } + drop(active); + + while state.queue.pop().is_ok() {} + } +} + +impl<'a> Default for Executor<'a> { + fn default() -> Executor<'a> { + Executor::new() + } +} + +/// A thread-local executor. +/// +/// The executor can only be run on the thread that created it. +pub struct LocalExecutor<'a> { + /// The inner executor. + inner: Executor<'a>, + + /// Makes the type `!Send` and `!Sync`. + _marker: PhantomData>, +} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl fmt::Debug for LocalExecutor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(&self.inner, "LocalExecutor", f) + } +} + +impl<'a> LocalExecutor<'a> { + /// Creates a single-threaded executor. + pub const fn new() -> LocalExecutor<'a> { + LocalExecutor { + inner: Executor::new(), + _marker: PhantomData, + } + } + + /// Returns `true` if there are no unfinished tasks. + pub fn is_empty(&self) -> bool { + self.inner().is_empty() + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + 'a) -> Task { + let mut active = self.inner().state().active.lock(); + + // SAFETY: This executor is not thread safe, so the future and its result + // cannot be sent to another thread. + unsafe { self.inner().spawn_inner(future, &mut active) } + } + + /// Spawns many tasks onto the executor. + /// + /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and + /// spawns all of the tasks in one go. With large amounts of tasks this can improve + /// contention. + /// + /// It is assumed that the iterator provided does not block; blocking iterators can lock up + /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the + /// mutex is not released, as there are no other threads that can poll this executor. 
+ /// + /// [`spawn`]: LocalExecutor::spawn + /// [`Executor::spawn_many`]: Executor::spawn_many + pub fn spawn_many + 'a>( + &self, + futures: impl IntoIterator, + handles: &mut impl Extend>, + ) { + let mut active = self.inner().state().active.lock(); + + // Convert all of the futures to tasks. + let tasks = futures.into_iter().map(|future| { + // SAFETY: This executor is not thread safe, so the future and its result + // cannot be sent to another thread. + unsafe { self.inner().spawn_inner(future, &mut active) } + + // As only one thread can spawn or poll tasks at a time, there is no need + // to release lock contention here. + }); + + // Push them to the user's collection. + handles.extend(tasks); + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + pub fn try_tick(&self) -> bool { + self.inner().try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + pub async fn tick(&self) { + self.inner().tick().await; + } + + /// Runs the executor until the given future completes. + pub async fn run(&self, future: impl Future) -> T { + self.inner().run(future).await + } + + /// Returns a reference to the inner executor. + fn inner(&self) -> &Executor<'a> { + &self.inner + } +} + +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { + LocalExecutor::new() + } +} + +/// The state of a executor. +struct State { + /// The global queue. + queue: ConcurrentQueue, + + /// Local queues created by runners. + local_queues: RwLock>>>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: Mutex, + + /// Currently active tasks. + active: Mutex>, +} + +impl State { + /// Creates state for a new executor. + const fn new() -> State { + State { + queue: ConcurrentQueue::unbounded(), + local_queues: RwLock::new(Vec::new()), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + active: Mutex::new(Slab::new()), + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let waker = self.sleepers.lock().notify(); + if let Some(w) = waker { + w.wake(); + } + } + } + + fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + let mut rng = fastrand::Rng::with_seed(0x4d595df4d0f33173); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. 
+ future.or(run_forever).await + } + + #[cold] + fn alloc_atomic(atomic_ptr: &AtomicPtr) -> *mut Self { + let state = Arc::new(Self::new()); + let ptr = Arc::into_raw(state).cast_mut(); + if let Err(actual) = atomic_ptr.compare_exchange( + core::ptr::null_mut(), + ptr, + Ordering::AcqRel, + Ordering::Acquire, + ) { + // SAFETY: This was just created from Arc::into_raw. + drop(unsafe { Arc::from_raw(ptr) }); + actual + } else { + ptr + } + } +} + +/// A list of sleeping tickers. +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec<(usize, Waker)>, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + let id = match self.free_ids.pop() { + Some(id) => id, + None => self.count + 1, + }; + self.count += 1; + self.wakers.push((id, waker.clone())); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.0 == id { + item.1.clone_from(waker); + return false; + } + } + + self.wakers.push((id, waker.clone())); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].0 == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.1) + } else { + None + } + } +} + +/// Runs task one by one. +struct Ticker<'a> { + /// The executor state. + state: &'a State, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, +} + +impl Ticker<'_> { + /// Creates a ticker. + fn new(state: &State) -> Ticker<'_> { + Ticker { state, sleeping: 0 } + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self.state.sleepers.lock(); + + match self.sleeping { + // Move to sleeping state. + 0 => { + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + if !sleepers.update(id, waker) { + return false; + } + } + } + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock(); + sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + } + self.sleeping = 0; + } + + /// Waits for the next runnable task to run. 
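// A generic sketch of the lazy-initialization pattern used by `alloc_atomic`
// above, assuming this file's imports (`Arc`, `AtomicPtr`, `Ordering`); the
// `Config` type and `get_or_init` name are hypothetical. Whichever thread
// loses the compare-exchange race drops its own allocation and adopts the
// winner's pointer, so exactly one `Arc` ever backs the slot.

struct Config;

fn get_or_init(slot: &AtomicPtr<Config>) -> *const Config {
    let ptr = slot.load(Ordering::Acquire);
    if !ptr.is_null() {
        return ptr;
    }

    let fresh = Arc::into_raw(Arc::new(Config)).cast_mut();
    match slot.compare_exchange(
        core::ptr::null_mut(),
        fresh,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        Ok(_) => fresh,
        Err(winner) => {
            // SAFETY: `fresh` came from `Arc::into_raw` just above and was
            // never published, so reclaiming it here is sound.
            drop(unsafe { Arc::from_raw(fresh) });
            winner
        }
    }
}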
+ async fn runnable(&mut self) -> Runnable { + self.runnable_with(|| self.state.queue.pop().ok()).await + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { + future::poll_fn(|cx| { + loop { + match search() { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Some(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + return Poll::Ready(r); + } + } + } + }) + .await + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock(); + let notified = sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + // If this ticker was notified, then notify another ticker. + if notified { + drop(sleepers); + self.state.notify(); + } + } + } +} + +/// A worker in a work-stealing executor. +/// +/// This is just a ticker that also has an associated local queue for improved cache locality. +struct Runner<'a> { + /// The executor state. + state: &'a State, + + /// Inner ticker. + ticker: Ticker<'a>, + + /// The local queue. + local: Arc>, + + /// Bumped every time a runnable task is found. + ticks: usize, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(state: &State) -> Runner<'_> { + let runner = Runner { + state, + ticker: Ticker::new(state), + local: Arc::new(ConcurrentQueue::bounded(512)), + ticks: 0, + }; + state.local_queues.write().push(runner.local.clone()); + runner + } + + /// Waits for the next runnable task to run. + async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable { + let runnable = self + .ticker + .runnable_with(|| { + // Try the local queue. + if let Ok(r) = self.local.pop() { + return Some(r); + } + + // Try stealing from the global queue. + if let Ok(r) = self.state.queue.pop() { + steal(&self.state.queue, &self.local); + return Some(r); + } + + // Try stealing from other runners. + let local_queues = self.state.local_queues.read(); + + // Pick a random starting point in the iterator list and rotate the list. + let n = local_queues.len(); + let start = rng.usize(..n); + let iter = local_queues + .iter() + .chain(local_queues.iter()) + .skip(start) + .take(n); + + // Remove this runner's local queue. + let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); + + // Try stealing from each local queue in the list. + for local in iter { + steal(local, &self.local); + if let Ok(r) = self.local.pop() { + return Some(r); + } + } + + None + }) + .await; + + // Bump the tick counter. + self.ticks = self.ticks.wrapping_add(1); + + if self.ticks % 64 == 0 { + // Steal tasks from the global queue to ensure fair task scheduling. + steal(&self.state.queue, &self.local); + } + + runnable + } +} + +impl Drop for Runner<'_> { + fn drop(&mut self) { + // Remove the local queue. + self.state + .local_queues + .write() + .retain(|local| !Arc::ptr_eq(local, &self.local)); + + // Re-schedule remaining tasks in the local queue. + while let Ok(r) = self.local.pop() { + r.schedule(); + } + } +} + +/// Steals some items from one queue into another. 
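// A small hypothetical check of the `steal` helper defined just below, not
// part of the patch. It relies only on documented `concurrent-queue`
// behaviour: half of the source queue, rounded up, moves into the
// destination, capped by the destination's spare capacity.

#[cfg(test)]
#[test]
fn steal_moves_half_rounded_up() {
    let src: ConcurrentQueue<u32> = ConcurrentQueue::unbounded();
    let dest: ConcurrentQueue<u32> = ConcurrentQueue::bounded(512);

    for i in 0..5 {
        src.push(i).unwrap();
    }

    steal(&src, &dest);

    // (5 + 1) / 2 == 3 items stolen, 2 left behind.
    assert_eq!(dest.len(), 3);
    assert_eq!(src.len(), 2);
}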
+fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { + // Half of `src`'s length rounded up. + let mut count = (src.len() + 1) / 2; + + if count > 0 { + // Don't steal more than fits into the queue. + if let Some(cap) = dest.capacity() { + count = count.min(cap - dest.len()); + } + + // Steal tasks. + for _ in 0..count { + if let Ok(t) = src.pop() { + assert!(dest.push(t).is_ok()); + } else { + break; + } + } + } +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Get a reference to the state. + let ptr = executor.state.load(Ordering::Acquire); + if ptr.is_null() { + // The executor has not been initialized. + struct Uninitialized; + + impl fmt::Debug for Uninitialized { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + return f.debug_tuple(name).field(&Uninitialized).finish(); + } + + // SAFETY: If the state pointer is not null, it must have been + // allocated properly by Arc::new and converted via Arc::into_raw + // in state_ptr. + let state = unsafe { &*ptr }; + + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a Mutex>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Some(lock) => fmt::Debug::fmt(&lock.len(), f), + None => f.write_str(""), + } + } + } + + /// Debug wrapper for the local runners. + struct LocalRunners<'a>(&'a RwLock>>>); + + impl fmt::Debug for LocalRunners<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_read() { + Some(lock) => f + .debug_list() + .entries(lock.iter().map(|queue| queue.len())) + .finish(), + None => f.write_str(""), + } + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a Mutex); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Some(lock) => fmt::Debug::fmt(&lock.count, f), + None => f.write_str(""), + } + } + } + + f.debug_struct(name) + .field("active", &ActiveTasks(&state.active)) + .field("global_tasks", &state.queue.len()) + .field("local_runners", &LocalRunners(&state.local_queues)) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ensure_send_and_sync() { + use futures_lite::future::pending; + + fn is_send(_: T) {} + fn is_sync(_: T) {} + fn is_static(_: T) {} + + is_send::>(Executor::new()); + is_sync::>(Executor::new()); + + let ex = Executor::new(); + is_send(ex.run(pending::<()>())); + is_sync(ex.run(pending::<()>())); + is_send(ex.tick()); + is_sync(ex.tick()); + is_send(ex.schedule()); + is_sync(ex.schedule()); + is_static(ex.schedule()); + } +} diff --git a/crates/bevy_tasks/src/global_task_pool.rs b/crates/bevy_tasks/src/global_task_pool.rs new file mode 100644 index 00000000000000..6b6da1236961c5 --- /dev/null +++ b/crates/bevy_tasks/src/global_task_pool.rs @@ -0,0 +1,249 @@ +use alloc::sync::Arc; +use core::{cell::RefCell, future::Future, marker::PhantomData, mem}; + +use spin::RwLock; + +use crate::{executor::Executor, Task}; + +static GLOBAL_EXECUTOR: Executor<'static> = Executor::new(); + +/// Used to create a [`TaskPool`]. 
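// A usage sketch for the builder introduced just below, not part of the
// patch; the function name is hypothetical. On this global, executor-backed
// pool the configuration methods are accepted but ignored, so existing
// std-flavoured setup code keeps compiling unchanged on `no_std` targets.

fn builder_sketch() -> TaskPool {
    TaskPoolBuilder::new()
        .num_threads(4) // no-op on this pool
        .stack_size(1024) // no-op on this pool
        .build()
}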
+#[derive(Debug, Default, Clone)] +pub struct TaskPoolBuilder {} + +/// This is a dummy struct for wasm support to provide the same api as with the multithreaded +/// task pool. In the case of the multithreaded task pool this struct is used to spawn +/// tasks on a specific thread. But the wasm task pool just calls +/// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread +/// and so the [`ThreadExecutor`] does nothing. +#[derive(Default)] +pub struct ThreadExecutor<'a>(PhantomData<&'a ()>); +impl<'a> ThreadExecutor<'a> { + /// Creates a new `ThreadExecutor` + pub fn new() -> Self { + Self::default() + } +} + +impl TaskPoolBuilder { + /// Creates a new `TaskPoolBuilder` instance + pub fn new() -> Self { + Self::default() + } + + /// No op on the single threaded task pool + pub fn num_threads(self, _num_threads: usize) -> Self { + self + } + + /// No op on the single threaded task pool + pub fn stack_size(self, _stack_size: usize) -> Self { + self + } + + /// No op on the single threaded task pool + pub fn thread_name(self, _thread_name: String) -> Self { + self + } + + /// Creates a new [`TaskPool`] + pub fn build(self) -> TaskPool { + TaskPool::new_internal() + } +} + +/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by +/// the pool on threads owned by the pool. In this case - main thread only. +#[derive(Debug, Default, Clone)] +pub struct TaskPool {} + +impl TaskPool { + /// Just create a new `ThreadExecutor` for wasm + pub fn get_thread_executor() -> Arc> { + Arc::new(ThreadExecutor::new()) + } + + /// Create a `TaskPool` with the default configuration. + pub fn new() -> Self { + TaskPoolBuilder::new().build() + } + + fn new_internal() -> Self { + Self {} + } + + /// Return the number of threads owned by the task pool + pub fn thread_num(&self) -> usize { + 1 + } + + /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + pub fn scope<'env, F, T>(&self, f: F) -> Vec + where + F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), + T: Send + 'static, + { + self.scope_with_executor(false, None, f) + } + + /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + #[expect(unsafe_code, reason = "Required to transmute lifetimes.")] + pub fn scope_with_executor<'env, F, T>( + &self, + _tick_task_pool_executor: bool, + _thread_executor: Option<&ThreadExecutor>, + f: F, + ) -> Vec + where + F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), + T: Send + 'static, + { + // SAFETY: This safety comment applies to all references transmuted to 'env. + // Any futures spawned with these references need to return before this function completes. + // This is guaranteed because we drive all the futures spawned onto the Scope + // to completion in this function. However, rust has no way of knowing this so we + // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. 
+ // Any usages of the references passed into `Scope` must be accessed through + // the transmuted reference for the rest of this function. + + let executor = &Executor::new(); + // SAFETY: As above, all futures must complete in this function so we can change the lifetime + let executor: &'env Executor<'env> = unsafe { mem::transmute(executor) }; + + let results: RefCell>>>> = RefCell::new(Vec::new()); + // SAFETY: As above, all futures must complete in this function so we can change the lifetime + let results: &'env RefCell>>>> = + unsafe { mem::transmute(&results) }; + + let mut scope = Scope { + executor, + results, + scope: PhantomData, + env: PhantomData, + }; + + // SAFETY: As above, all futures must complete in this function so we can change the lifetime + let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) }; + + f(scope_ref); + + // Loop until all tasks are done + while executor.try_tick() {} + + let results = scope.results.borrow(); + results + .iter() + .map(|result| result.write().take().unwrap()) + .collect() + } + + /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled + /// to retrieve the output of the original future. Dropping the task will attempt to cancel it. + /// It can also be "detached", allowing it to continue running without having to be polled by the + /// end-user. + /// + /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. + pub fn spawn(&self, future: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + #[cfg(target_arch = "wasm32")] + return Task::wrap_future(future); + + #[cfg(not(target_arch = "wasm32"))] + { + let task = GLOBAL_EXECUTOR.spawn(future); + // Loop until all tasks are done + while GLOBAL_EXECUTOR.try_tick() {} + + Task::new(task) + } + } + + /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. + pub fn spawn_local(&self, future: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + self.spawn(future) + } + + /// Runs a function with the local executor. Typically used to tick + /// the local executor on the main thread as it needs to share time with + /// other things. + /// + /// ``` + /// use bevy_tasks::TaskPool; + /// + /// TaskPool::new().with_local_executor(|local_executor| { + /// local_executor.try_tick(); + /// }); + /// ``` + pub fn with_local_executor(&self, f: F) -> R + where + F: FnOnce(&Executor) -> R, + { + (f)(&GLOBAL_EXECUTOR) + } +} + +/// A `TaskPool` scope for running one or more non-`'static` futures. +/// +/// For more information, see [`TaskPool::scope`]. +#[derive(Debug)] +pub struct Scope<'scope, 'env: 'scope, T> { + executor: &'scope Executor<'scope>, + // Vector to gather results of all futures spawned during scope run + results: &'env RefCell>>>>, + + // make `Scope` invariant over 'scope and 'env + scope: PhantomData<&'scope mut &'scope ()>, + env: PhantomData<&'env mut &'env ()>, +} + +impl<'scope, 'env, T: Send + Sync + 'env> Scope<'scope, 'env, T> { + /// Spawns a scoped future onto the executor. The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. + /// + /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn + Send + 'scope>(&self, f: Fut) { + self.spawn_on_scope(f); + } + + /// Spawns a scoped future onto the executor. 
The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. + /// + /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_on_external + Send + 'scope>(&self, f: Fut) { + self.spawn_on_scope(f); + } + + /// Spawns a scoped future that runs on the thread the scope called from. The + /// scope *must* outlive the provided future. The results of the future will be + /// returned as a part of [`TaskPool::scope`]'s return value. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_on_scope + Send + 'scope>(&self, f: Fut) { + let result = Arc::new(RwLock::new(None)); + self.results.borrow_mut().push(result.clone()); + let f = async move { + let temp_result = f.await; + result.write().replace(temp_result); + }; + self.executor.spawn(f).detach(); + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 3910166904856a..39ab1b72a2677d 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,3 +1,5 @@ +use alloc::vec::Vec; + use crate::TaskPool; mod adapters; @@ -193,7 +195,7 @@ where fn collect(mut self, pool: &TaskPool) -> C where C: FromIterator, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -213,7 +215,7 @@ where where C: Default + Extend + Send, F: FnMut(&BatchIter::Item) -> bool + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::TaskBounds, { let (mut a, mut b) = <(C, C)>::default(); pool.scope(|s| { @@ -330,7 +332,7 @@ where /// See [`Iterator::max()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max) fn max(mut self, pool: &TaskPool) -> Option where - BatchIter::Item: Ord + Send + 'static, + BatchIter::Item: Ord + crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -347,7 +349,7 @@ where /// See [`Iterator::min()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min) fn min(mut self, pool: &TaskPool) -> Option where - BatchIter::Item: Ord + Send + 'static, + BatchIter::Item: Ord + crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -366,7 +368,7 @@ where where R: Ord, F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -386,7 +388,7 @@ where fn max_by(mut self, pool: &TaskPool, f: F) -> Option where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -406,7 +408,7 @@ where where R: Ord, F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -426,7 +428,7 @@ where fn min_by(mut self, pool: &TaskPool, f: F) -> Option where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::TaskBounds, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -479,7 +481,7 @@ where /// See 
[`Iterator::sum()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.sum) fn sum(mut self, pool: &TaskPool) -> R where - S: core::iter::Sum + Send + 'static, + S: core::iter::Sum + crate::TaskBounds, R: core::iter::Sum, { pool.scope(|s| { @@ -496,7 +498,7 @@ where /// See [`Iterator::product()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.product) fn product(mut self, pool: &TaskPool) -> R where - S: core::iter::Product + Send + 'static, + S: core::iter::Product + crate::TaskBounds, R: core::iter::Product, { pool.scope(|s| { diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 1d6d35664ed0e9..1895b399a58d71 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -7,7 +7,16 @@ extern crate alloc; +mod executor; + +#[cfg(not(feature = "std"))] +mod global_task_pool; + +#[cfg(not(feature = "std"))] +pub use global_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; + mod slice; + pub use slice::{ParallelSlice, ParallelSliceMut}; #[cfg_attr(target_arch = "wasm32", path = "wasm_task.rs")] @@ -15,35 +24,63 @@ mod task; pub use task::Task; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] mod task_pool; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; -#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] +#[cfg(all( + feature = "std", + any(target_arch = "wasm32", not(feature = "multi_threaded")) +))] mod single_threaded_task_pool; -#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] +#[cfg(all( + feature = "std", + any(target_arch = "wasm32", not(feature = "multi_threaded")) +))] pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; mod usages; + #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; + pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] mod thread_executor; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] + +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; #[cfg(feature = "async-io")] pub use async_io::block_on; -#[cfg(not(feature = "async-io"))] + +#[cfg(all(feature = "std", not(feature = "async-io")))] pub use futures_lite::future::block_on; + pub use futures_lite::future::poll_once; mod iter; + pub use iter::ParallelIterator; pub use futures_lite; @@ -52,25 +89,41 @@ pub use futures_lite; /// /// This includes the most common types in this crate, re-exported for your convenience. pub mod prelude { + #[cfg(feature = "std")] + #[doc(hidden)] + pub use crate::block_on; + #[doc(hidden)] pub use crate::{ - block_on, iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, }; } -use core::num::NonZero; - /// Gets the logical CPU core count available to the current process. /// /// This is identical to [`std::thread::available_parallelism`], except /// it will return a default value of 1 if it internally errors out. 
/// /// This will always return at least 1. +#[cfg(feature = "std")] pub fn available_parallelism() -> usize { std::thread::available_parallelism() - .map(NonZero::::get) + .map(core::num::NonZero::::get) .unwrap_or(1) } + +/// Marker trait indicating [`Task`] requirements. +#[cfg(feature = "std")] +pub trait TaskBounds: Send + 'static {} + +#[cfg(feature = "std")] +impl TaskBounds for T {} + +/// Marker trait indicating [`Task`] requirements. +#[cfg(not(feature = "std"))] +pub trait TaskBounds: Send + Sync + 'static {} + +#[cfg(not(feature = "std"))] +impl TaskBounds for T {} diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index d7f994026a6785..bd5346fea73abd 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,10 +1,10 @@ use alloc::{rc::Rc, sync::Arc}; use core::{cell::RefCell, future::Future, marker::PhantomData, mem}; -use crate::Task; +use crate::{executor::LocalExecutor, Task}; thread_local! { - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; + static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; } /// Used to create a [`TaskPool`]. @@ -114,10 +114,9 @@ impl TaskPool { // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor = &async_executor::LocalExecutor::new(); + let executor = &LocalExecutor::new(); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env async_executor::LocalExecutor<'env> = - unsafe { mem::transmute(executor) }; + let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) }; let results: RefCell>>>> = RefCell::new(Vec::new()); // SAFETY: As above, all futures must complete in this function so we can change the lifetime @@ -192,7 +191,7 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&async_executor::LocalExecutor) -> R, + F: FnOnce(&LocalExecutor) -> R, { LOCAL_EXECUTOR.with(f) } @@ -203,7 +202,7 @@ impl TaskPool { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::LocalExecutor<'scope>, + executor: &'scope LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run results: &'env RefCell>>>>, diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index a8a87c9ce80a01..712126ffe254fb 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,3 +1,5 @@ +use alloc::vec::Vec; + use super::TaskPool; /// Provides functions for mapping read-only slices across a provided [`TaskPool`]. 
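// A downstream usage sketch for the slice helpers whose bounds change in
// the hunks below, not part of the patch; the function name is
// hypothetical. The per-chunk result type now only needs to satisfy the
// `TaskBounds` alias: `Send + 'static` on `std` builds and
// `Send + Sync + 'static` on `no_std` builds.

fn par_chunk_map_sketch(pool: &bevy_tasks::TaskPool) -> u64 {
    use bevy_tasks::ParallelSlice;

    let data = [1u64; 100];
    // Sum each chunk of 25 elements on the pool, then fold the partial sums.
    let partials: Vec<u64> =
        data.par_chunk_map(pool, 25, |_chunk_index, chunk| chunk.iter().sum::<u64>());

    partials.into_iter().sum()
}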
@@ -36,7 +38,7 @@ pub trait ParallelSlice: AsRef<[T]> { fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(usize, &[T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::TaskBounds, { let slice = self.as_ref(); let f = &f; @@ -83,7 +85,7 @@ pub trait ParallelSlice: AsRef<[T]> { fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec where F: Fn(usize, &[T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::TaskBounds, { let slice = self.as_ref(); let chunk_size = core::cmp::max( @@ -139,7 +141,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(usize, &mut [T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::TaskBounds, { let slice = self.as_mut(); let f = &f; @@ -194,7 +196,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { ) -> Vec where F: Fn(usize, &mut [T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::TaskBounds, { let mut slice = self.as_mut(); let chunk_size = core::cmp::max( diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index 53292c7574f445..e1c252bea13889 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -4,7 +4,7 @@ use core::{ task::{Context, Poll}, }; -/// Wraps `async_executor::Task`, a spawned future. +/// Wraps `async_task::Task`, a spawned future. /// /// Tasks are also futures themselves and yield the output of the spawned future. /// @@ -14,16 +14,16 @@ use core::{ /// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. #[derive(Debug)] #[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."] -pub struct Task(async_executor::Task); +pub struct Task(async_task::Task); impl Task { - /// Creates a new task from a given `async_executor::Task` - pub fn new(task: async_executor::Task) -> Self { + /// Creates a new task from a given `async_task::Task` + pub fn new(task: async_task::Task) -> Self { Self(task) } /// Detaches the task to let it keep running in the background. See - /// `async_executor::Task::detach` + /// `async_task::Task::detach` pub fn detach(self) { self.0.detach(); } @@ -36,7 +36,7 @@ impl Task { /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of /// canceling because it also waits for the task to stop running. /// - /// See `async_executor::Task::cancel` + /// See `async_task::Task::cancel` pub async fn cancel(self) -> Option { self.0.cancel().await } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9fab3fbbe83173..b882ecc03038f5 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,25 +2,19 @@ use alloc::sync::Arc; use core::{future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe}; use std::thread::{self, JoinHandle}; -use async_executor::FallibleTask; +use async_task::FallibleTask; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; +use crate::executor::{Executor, LocalExecutor}; + use crate::{ block_on, thread_executor::{ThreadExecutor, ThreadExecutorTicker}, Task, }; -struct CallOnDrop(Option>); - -impl Drop for CallOnDrop { - fn drop(&mut self) { - if let Some(call) = self.0.as_ref() { - call(); - } - } -} +use bevy_utils::OnDrop; /// Used to create a [`TaskPool`] #[derive(Default)] @@ -102,7 +96,7 @@ impl TaskPoolBuilder { #[derive(Debug)] pub struct TaskPool { /// The executor for the pool. 
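// A usage sketch for the `Task` wrapper re-pointed at `async_task` in the
// hunks above, not part of the patch; assumes the `std` feature so
// `block_on` is re-exported, and the function name is hypothetical.

fn task_handle_sketch(pool: &bevy_tasks::TaskPool) {
    use bevy_tasks::block_on;

    // Detaching lets a task keep running without holding onto its handle.
    pool.spawn(async { /* background work */ }).detach();

    // `cancel` stops a pending task and waits for it to wind down; it only
    // yields an output if the task had already finished.
    let task = pool.spawn(async { 7 });
    let output = block_on(task.cancel());
    assert!(output.is_none() || output == Some(7));
}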
- executor: Arc>, + executor: Arc>, // The inner state of the pool. threads: Vec>, @@ -111,7 +105,7 @@ pub struct TaskPool { impl TaskPool { thread_local! { - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; + static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); } @@ -128,7 +122,7 @@ impl TaskPool { fn new_internal(builder: TaskPoolBuilder) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(async_executor::Executor::new()); + let executor = Arc::new(Executor::new()); let num_threads = builder .num_threads @@ -160,7 +154,11 @@ impl TaskPool { on_thread_spawn(); drop(on_thread_spawn); } - let _destructor = CallOnDrop(on_thread_destroy); + let _destructor = OnDrop::new(move || { + if let Some(f) = on_thread_destroy { + (f)(); + } + }); loop { let res = std::panic::catch_unwind(|| { let tick_forever = async move { @@ -344,9 +342,9 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor: &async_executor::Executor = &self.executor; + let executor: &Executor = &self.executor; // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; + let executor: &'env Executor = unsafe { mem::transmute(executor) }; // SAFETY: As above, all futures must complete in this function so we can change the lifetime let external_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(external_executor) }; @@ -432,7 +430,7 @@ impl TaskPool { #[inline] async fn execute_global_external_scope<'scope, 'ticker, T>( - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope Executor<'scope>, external_ticker: ThreadExecutorTicker<'scope, 'ticker>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, @@ -478,7 +476,7 @@ impl TaskPool { #[inline] async fn execute_global_scope<'scope, 'ticker, T>( - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope Executor<'scope>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { @@ -562,7 +560,7 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&async_executor::LocalExecutor) -> R, + F: FnOnce(&LocalExecutor) -> R, { Self::LOCAL_EXECUTOR.with(f) } @@ -593,7 +591,7 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. 
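// A usage sketch of the scope API that the lifetime plumbing above exists
// to support, not part of the patch; assumes the `std` and `multi_threaded`
// features, and the function name is hypothetical. Borrowed local data can
// be used inside the closure because `scope` drives every spawned task to
// completion before returning.

fn scope_usage_sketch() {
    let pool = bevy_tasks::TaskPool::new();
    let values = [1, 2, 3, 4];

    let doubled: Vec<i32> = pool.scope(|scope| {
        for &value in &values {
            scope.spawn(async move { value * 2 });
        }
    });

    assert_eq!(doubled.iter().sum::<i32>(), 20);
}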
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope Executor<'scope>, external_executor: &'scope ThreadExecutor<'scope>, scope_executor: &'scope ThreadExecutor<'scope>, spawned: &'scope ConcurrentQueue>>>, diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs index b25811b5593413..c29940cbd5fa89 100644 --- a/crates/bevy_tasks/src/thread_executor.rs +++ b/crates/bevy_tasks/src/thread_executor.rs @@ -1,9 +1,11 @@ use core::marker::PhantomData; use std::thread::{self, ThreadId}; -use async_executor::{Executor, Task}; +use async_task::Task; use futures_lite::Future; +use crate::executor::Executor; + /// An executor that can only be ticked on the thread it was instantiated on. But /// can spawn `Send` tasks from other threads. ///
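// An end-to-end sketch of the `no_std` path this patch adds, not part of
// the patch; the function name is hypothetical. With
// `default-features = false`, `TaskPool` resolves to the new
// global-executor-backed pool, and `scope` drives its tasks to completion
// inline before returning the collected results.

fn no_std_scope_sketch() -> Vec<u32> {
    let pool = bevy_tasks::TaskPool::new();

    pool.scope(|scope| {
        for i in 0..4u32 {
            scope.spawn(async move { i * i });
        }
    })
}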