From 13bc0bb5ee55d0d7809de8f4ffd99e2b3fdee2fb Mon Sep 17 00:00:00 2001 From: NthTensor Date: Mon, 18 Aug 2025 21:02:05 -0400 Subject: [PATCH] refactor: unify spawn functions --- Cargo.lock | 7 - Cargo.toml | 2 +- rayon-compat/Cargo.toml | 13 -- rayon-compat/README.md | 14 -- rayon-compat/src/lib.rs | 186 ------------------ src/latch.rs | 61 +++++- src/lib.rs | 4 +- src/scope.rs | 410 +++++++++++++++++++++++++++------------- src/thread_pool.rs | 358 +++++++++++++++-------------------- 9 files changed, 493 insertions(+), 562 deletions(-) delete mode 100644 rayon-compat/Cargo.toml delete mode 100644 rayon-compat/README.md delete mode 100644 rayon-compat/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 3efbd92..f11b1a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,13 +497,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "forte-rayon-compat" -version = "1.12.0-dev" -dependencies = [ - "forte", -] - [[package]] name = "funty" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index 583149d..ce86295 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/NthTensor/Forte" [workspace] resolver = "2" -members = ["ci", "rayon-compat"] +members = ["ci"] [dependencies] arraydeque = "0.5.1" diff --git a/rayon-compat/Cargo.toml b/rayon-compat/Cargo.toml deleted file mode 100644 index d2e4000..0000000 --- a/rayon-compat/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "forte-rayon-compat" -version = "1.12.0-dev" -edition = "2024" -license = "MIT OR Apache-2.0" -description = "A shim that allows replacing rayon-core with forte" -repository = "https://github.com/NthTensor/Forte" - -[dependencies] -forte = { version = "1.0.0-dev", path = ".." } - -[features] -web_spin_lock = [] diff --git a/rayon-compat/README.md b/rayon-compat/README.md deleted file mode 100644 index da70597..0000000 --- a/rayon-compat/README.md +++ /dev/null @@ -1,14 +0,0 @@ -# Rayon Compat - -This is a way to run `rayon` on top of `forte`! 
The `rayon-compat` crate mocks the important bits of the api of `rayon_core` in a pretty simple and crude way, which is none-the-less enough to support most of what `rayon` needs. - -To use this crate, apply the following cargo patch like one of these: -``` -// If you want to clone forte and use it locally -[patch.crates-io] -rayon-core = { path = "path to this repo", package = "rayon-compat" } - -// If you want to use the latest published version of forte -[patch.crates-io] -rayon-core = { path = "https://github.com/NthTensor/Forte", package = "rayon-compat" } -``` diff --git a/rayon-compat/src/lib.rs b/rayon-compat/src/lib.rs deleted file mode 100644 index 995e518..0000000 --- a/rayon-compat/src/lib.rs +++ /dev/null @@ -1,186 +0,0 @@ -use std::sync::atomic::{AtomicBool, Ordering}; - -pub static THREAD_POOL: forte::ThreadPool = const { forte::ThreadPool::new() }; - -pub static STARTED: AtomicBool = const { AtomicBool::new(false) }; - -#[inline(always)] -fn ensure_started() { - if !STARTED.load(Ordering::Relaxed) && !STARTED.swap(true, Ordering::Relaxed) { - THREAD_POOL.resize_to_available(); - } -} - -#[inline(always)] -pub fn current_num_threads() -> usize { - 64 // Forte prefers smaller tasks, so it's better to lie to rayon about the size of the pool -} - -#[inline(always)] -pub fn current_thread_index() -> Option { - forte::Worker::map_current(|worker| worker.index()) -} - -#[inline(always)] -pub fn max_num_threads() -> usize { - usize::MAX // The number of forte workers is only bounded by the size of a vector. -} - -// ----------------------------------------------------------------------------- -// Join - -#[derive(Debug)] -pub struct FnContext { - /// True if the task was migrated. 
- migrated: bool, -} - -impl FnContext { - #[inline(always)] - pub fn migrated(&self) -> bool { - self.migrated - } -} - -#[inline(always)] -pub fn join_context(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce(FnContext) -> RA + Send, - B: FnOnce(FnContext) -> RB + Send, - RA: Send, - RB: Send, -{ - ensure_started(); - THREAD_POOL.join( - |worker| { - let migrated = worker.migrated(); - let ctx = FnContext { migrated }; - oper_a(ctx) - }, - |worker| { - let migrated = worker.migrated(); - let ctx = FnContext { migrated }; - oper_b(ctx) - }, - ) -} - -#[inline(always)] -pub fn join(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, -{ - ensure_started(); - THREAD_POOL.join(|_| oper_a(), |_| oper_b()) -} - -// ----------------------------------------------------------------------------- -// Scope - -pub struct Scope<'scope, 'env> { - inner_scope: &'scope forte::Scope<'scope, 'env>, -} - -impl<'scope, 'env> Scope<'scope, 'env> { - pub fn spawn(self, f: F) - where - F: FnOnce(Scope<'scope, 'env>) + Send + 'scope, - { - forte::Worker::with_current(|worker| { - let worker = worker.unwrap(); - let inner_scope = self.inner_scope; - inner_scope.spawn_on(worker, |_| f(Scope { inner_scope })) - }); - } -} - -#[inline(always)] -pub fn scope<'env, OP, R>(op: OP) -> R -where - OP: for<'scope> FnOnce(Scope<'scope, 'env>) -> R + Send, - R: Send, -{ - ensure_started(); - forte::scope(|inner_scope| op(Scope { inner_scope })) -} - -#[inline(always)] -pub fn in_place_scope<'env, OP, R>(op: OP) -> R -where - OP: for<'scope> FnOnce(Scope<'scope, 'env>) -> R, -{ - ensure_started(); - forte::scope(|inner_scope| op(Scope { inner_scope })) -} - -// ----------------------------------------------------------------------------- -// Spawn - -#[inline(always)] -pub fn spawn(func: F) -where - F: FnOnce() + Send + 'static, -{ - ensure_started(); - THREAD_POOL.spawn(|_| func()) -} - -// 
----------------------------------------------------------------------------- -// Yield - -pub use forte::Yield; - -pub fn yield_local() -> Yield { - let result = forte::Worker::map_current(forte::Worker::yield_local); - match result { - Some(status) => status, - _ => Yield::Idle, - } -} - -pub fn yield_now() -> Yield { - let result = forte::Worker::map_current(forte::Worker::yield_now); - match result { - Some(status) => status, - _ => Yield::Idle, - } -} - -// ----------------------------------------------------------------------------- -// Fake stuff that dosn't work. These are here only so so that rayon can export -// them. - -pub struct ThreadBuilder; - -pub struct ThreadPool; - -pub struct ThreadPoolBuildError; - -pub struct ThreadPoolBuilder; - -pub struct BroadcastContext; - -pub struct ScopeFifo; - -pub fn broadcast() { - unimplemented!() -} - -pub fn spawn_broadcast() { - unimplemented!() -} - -pub fn scope_fifo() { - unimplemented!() -} - -pub fn in_place_scope_fifo() { - unimplemented!() -} - -pub fn spawn_fifo() { - unimplemented!() -} diff --git a/src/latch.rs b/src/latch.rs index 7a65c19..2fc545f 100644 --- a/src/latch.rs +++ b/src/latch.rs @@ -24,9 +24,9 @@ const ASLEEP: u32 = 0b10; // ----------------------------------------------------------------------------- // Latch -/// A [Latch] is a signaling mechanism used to indicate when an event has +/// A Latch is a signaling mechanism used to indicate when an event has /// occurred. The latch begins as *unset* (In the `LOCKED` state), and can later -/// be *set* by any thread (entering the *SIGNAL*) state. +/// be *set* by any thread (entering the `SIGNAL`) state. /// /// Each latch is associated with one *owner thread*. This is the thread that /// may be blocking, waiting for the latch to complete. @@ -62,10 +62,21 @@ impl Latch { /// Returns true if the latch signal was received, and false otherwise. #[inline(always)] pub fn wait(&self) -> bool { + // First, check if the latch has been set. 
+ // + // In the event of a race with `set`: + // + If this happens before the store, then we will go to sleep. + // + If this happens after the store, then we notice and return. if self.state.load(Ordering::Relaxed) == SIGNAL { return true; } + // If it has not been set, go to sleep. + // + // In the event of a race with `set`, the `wake` will always cause this + // to return regardless of memory ordering. let slept = self.sleep_controller.sleep(); + // If we actually slept, check the status again to see if it has + // changed. Otherwise assume it hasn't. if slept { self.state.load(Ordering::Relaxed) == SIGNAL } else { @@ -86,8 +97,16 @@ impl Latch { pub unsafe fn set(latch: *const Latch) { // SAFETY: At this point, the latch must still be valid to dereference. let sleep_controller = unsafe { (*latch).sleep_controller }; + // First we set the state to SIGNAL. + // + // In the event of a race with `wait`, this may cause `wait` to return. + // Otherwise the other thread will sleep within `wait`. + // // SAFETY: At this point, the latch must still be valid to dereference. unsafe { (*latch).state.store(SIGNAL, Ordering::Relaxed) }; + // We must try to wake the other thread, just in case it missed the + // notification and went to sleep. This guarantees that the other thread + // will make progress. sleep_controller.wake(); } @@ -120,21 +139,59 @@ impl Default for SleepController { } impl SleepController { + // Attempt to wake the thread to which this belongs. + // + // Returns true if this allows the thread to make progress (by waking it up + // or catching it before it goes to sleep) and false if the thread was + // running. pub fn wake(&self) -> bool { + // Set the state to SIGNAL and read the current state, which must be + // either LOCKED or ASLEEP. let sleep_state = self.state.swap(SIGNAL, Ordering::Relaxed); let asleep = sleep_state == ASLEEP; if asleep { + // If the state was ASLEEP, the thread is either asleep or about to + // go to sleep. 
+ // + // + If it is about to go to sleep (but has not yet called + // `atomic_wait::wait`) then setting the state to SIGNAL above + // should prevent it from going to sleep. + // + // + If it is already waiting, the following notification will wake + // it up. + // + // Either way, after this call the other thread must make progress. atomic_wait::wake_one(&self.state); } asleep } + // Attempt to send the thread to sleep. This should only be called on a + // single thread, and we say that this controller "belongs" to that thread. + // + // Returns true if this thread makes a syscall to suspend the thread, and + // false if the thread was already woken (letting us skip the syscall). pub fn sleep(&self) -> bool { + // Set the state to ASLEEP and read the current state, which must be + // either LOCKED or SIGNAL. let state = self.state.swap(ASLEEP, Ordering::Relaxed); + // If the state is LOCKED, then we have not yet received a signal, and + // we should try to put the thread to sleep. Otherwise we should return + // early. let sleep = state == LOCKED; if sleep { + // If we have received a signal since entering the sleep state + // (meaning the state is no longer set to ASLEEP) then this will + // return immediately. + // + // If the state is still ASLEEP, then the next call to `wake` will + // register that and call `wake_one`. + // + // Either way, there is no way we can fail to receive a `wake`. atomic_wait::wait(&self.state, ASLEEP); } + // Set the state back to LOCKED so that we are ready to receive new + // signals. 
self.state.store(LOCKED, Ordering::Relaxed); sleep } diff --git a/src/lib.rs b/src/lib.rs index 16e1f14..89277a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,8 @@ mod util; // Top-level exports pub use scope::Scope; +pub use scope::ScopedSpawn; +pub use thread_pool::Spawn; pub use thread_pool::ThreadPool; pub use thread_pool::Worker; pub use thread_pool::Yield; @@ -53,8 +55,6 @@ pub use thread_pool::block_on; pub use thread_pool::join; pub use thread_pool::scope; pub use thread_pool::spawn; -pub use thread_pool::spawn_async; -pub use thread_pool::spawn_future; // ----------------------------------------------------------------------------- // Platform Support diff --git a/src/scope.rs b/src/scope.rs index eaa6b5d..e64d348 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -11,7 +11,6 @@ use async_task::Runnable; use async_task::Task; use scope_ptr::ScopePtr; -use crate::ThreadPool; use crate::job::HeapJob; use crate::job::JobRef; use crate::latch::Latch; @@ -24,6 +23,9 @@ use crate::unwind::AbortOnDrop; // Scope /// A scope which can spawn a number of non-static jobs and async tasks. +/// +/// For an explanation of the lifetimes 'scope and 'env, refer to the +/// documentation on the `with_scope` function. pub struct Scope<'scope, 'env: 'scope> { /// Number of active references to the scope (including the owning /// allocation). This is incremented each time a new `ScopePtr` is created, @@ -34,14 +36,57 @@ pub struct Scope<'scope, 'env: 'scope> { completed: Latch, /// If any job panics, we store the result here to propagate it. panic: AtomicPtr>, - /// Makes `Scope` invariant over 'scope + /// This adds invariance over 'scope, to make sure 'scope cannot shrink, + /// which is necessary for soundness. 
+ /// + /// Without invariance, this would compile fine but be unsound: + /// + /// ```compile_fail + /// # use forte::ThreadPool; + /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); + /// # THREAD_POOL.populate(); + /// # THREAD_POOL.with_worker(|worker| { + /// worker.scope(|scope| { + /// scope.spawn_on(worker, |worker: &Worker| { + /// let a = String::from("abcd"); + /// scope.spawn_on(worker, |_: &Worker| println!("{a:?}")); // might run after `a` is dropped + /// }); + /// }); + /// # }); + /// ``` _scope: PhantomData<&'scope mut &'scope ()>, - /// Makes `Scope` invantiant over 'env + /// This adds invariance over 'env _env: PhantomData<&'env mut &'env ()>, } -/// Crates a new scope on a worker. [`Worker::scope`] is just an alias for this -/// function. +/// Executes a new scope on a worker. [`Worker::scope`], +/// [`ThreadPool::scope`][crate::ThreadPool::scope] and [`scope`][crate::scope()] are all just +/// aliases for this function. +/// +/// # Lifetimes +/// +/// This implementation of scopes is heavily based on `std::thread::scope`, and +/// this section is ported from the excellent stdlib docs. +/// +/// A scope has two lifetimes: `'scope` and `'env`. +/// +/// The `'scope` lifetime represents the lifetime of the scope itself. That is: +/// the time during which new scoped jobs may be spawned, and also the time +/// during which they might still be running. This lifetime starts within the +/// `with_scope` function, before the closure `f` (the argument to `with_scope`) +/// is executed. It ends after the closure `f` returns and after all scoped work +/// is complete, but before `with_scope` returns. +/// +/// The `'env` lifetime represents the lifetime of whatever is borrowed by the +/// scoped jobs. This lifetime must outlast the call to `with_scope`, and thus +/// cannot be smaller than `'scope`. 
It can be as small as the call to +/// `with_scope`, meaning that anything that outlives this call, such as local +/// variables defined right before the scope, can be borrowed by the scoped +/// jobs. +/// +/// The `'env: 'scope` bound is part of the definition of the `Scope` type. The +/// requirement that scoped work outlive `'scope` is part of the definition of +/// the `ScopedSpawn` trait. #[inline] pub fn with_scope<'env, F, T>(worker: &Worker, f: F) -> T where @@ -63,7 +108,7 @@ where None } }; - // Now that the user has (presuamably) spawnd some work onto the scope, we + // Now that the user has (presumably) spawned some work onto the scope, we // must wait for it to complete. // // SAFETY: This is called only once, and we provide the same worker used to @@ -97,128 +142,39 @@ impl<'scope, 'env> Scope<'scope, 'env> { } } - /// Spawns a scoped job onto the local worker. This job will execute - /// sometime before the scope completes. - /// - /// # Returns + /// Spawns a scoped job onto the current worker. Refer to [`spawn_on`] for + /// more extensive documentation. /// - /// Nothing. The spawned closures cannot pass back values to the caller - /// directly, though they can write to local variables on the stack (if - /// those variables outlive the scope) or communicate through shared - /// channels. + /// # Panics /// - /// If you need to return a value, spawn a `Future` instead with - /// [`Scope::spawn_future_on`]. - /// - pub fn spawn_on(&self, worker: &Worker, f: F) - where - F: FnOnce(&Worker) + Send + 'scope, - { - // Create a job to execute the spawned function in the scope. - let scope_ptr = ScopePtr::new(self); - let job = HeapJob::new(move |worker| { - // Catch any panics and store them on the scope. - let result = unwind::halt_unwinding(|| f(worker)); - if let Err(err) = result { - scope_ptr.store_panic(err); - }; - drop(scope_ptr); - }); - - // SAFETY: We must ensure that the heap job does not outlive the data it - // closes over. 
In effect, this means it must not outlive `'scope`. - // - // This is ensured by the `scope_ptr` and the scope rules, which will - // keep the calling stack frame alive until this job completes, - // effectively extending the lifetime of `'scope` for as long as is - // nessicary. - let job_ref = unsafe { job.into_job_ref() }; - - // Send the job to a queue to be executed. - worker.queue.push_back(job_ref); + /// If not in a worker, this panics. + pub fn spawn>(&'scope self, scoped_work: S) -> T { + Worker::with_current(|worker| scoped_work.spawn_on(worker.unwrap(), self)) } - /// Spawns a future onto the scope. This future will be asynchronously - /// polled to completion some time before the scope completes. - /// - /// # Returns - /// - /// This returns a task, which represents a handle to the async computation - /// and is itself a future that can be awaited to receive the output of the - /// future. There's four ways to interact with a task: + /// Spawns a scoped job onto the provided worker. This job will execute + /// sometime before the scope completes. /// - /// 1. Await the task. This will eventually produce the output of the - /// provided future. The scope will not complete until the output is - /// returned to the awaiting logic. + /// This function can be passed either a closure (for serial work) or a + /// future (for async work): + /// + If passed a closure, this returns nothing. + /// + If passed a future, this returns a `Task` handle that can be used + /// to await the result. /// - /// 2. Drop the task. This will stop execution of the future and potentially - /// allow the scope to complete immediately. + /// # See also /// - /// 3. Cancel the task. This has the same effect as dropping the task, but - /// waits until the futures stops running (which in the worst-case means - /// waiting for the scope to complete). + /// The [`ThreadPool::scope`](crate::ThreadPool::scope) function has more + /// extensive documentation about task spawning. 
/// - /// 4. Detach the task. This will allow the future to continue executing - /// even after the task itself is dropped. The scope will only complete - /// after the future polls to completion. Detaching a task with an - /// infinite loop will prevent the scope from completing, and is not - /// recommended. + /// # Panics /// - pub fn spawn_future_on(&self, thread_pool: &'static ThreadPool, future: F) -> Task - where - F: Future + Send + 'scope, - T: Send, - { - // Embed the scope pointer into the future. - let scope_ptr = ScopePtr::new(self); - let future = async move { - let result = future.await; - drop(scope_ptr); - result - }; - - // The schedule function will turn the future into a job when woken. - let schedule = move |runnable: Runnable| { - // Turn the runnable into a job-ref that we can send to a worker. - - // SAFETY: We provide a pointer to a non-null runnable, and we turn - // it back into a non-null runnable. The runnable will remain valid - // until the task is run. - let job_ref = unsafe { - JobRef::new_raw(runnable.into_raw(), |this, _| { - let runnable = Runnable::<()>::from_raw(this); - // Poll the task. - runnable.run(); - }) - }; - - // Send this job off to be executed. When this schedule function is - // called on a worker thread this re-schedules it onto the worker's - // local queue, which will generally cause tasks to stick to the - // same thread instead of jumping around randomly. This is also - // faster than injecting into the global queue. - thread_pool.with_worker(|worker| { - worker.queue.push_back(job_ref); - }); - }; - - // SAFETY: We must ensure that the runnable does not outlive the data it - // closes over. In effect, this means it must not outlive `'scope`. - // - // This is ensured by the `scope_ptr` and the scope rules, which will - // keep the calling stack frame alive until the runnable is dropped, - // effectively extending the lifetime of `'scope` for as long as is - // nessicary. 
- // - // We have to use `spawn_unchecked` here instead of `spawn` because the - // future is non-static. - let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; - - // Call the schedule function once to create the initial job. - runnable.schedule(); - - // Return the task handle. - task + /// Panics if not called from within a worker. + pub fn spawn_on>( + &'scope self, + worker: &Worker, + scoped_work: S, + ) -> T { + scoped_work.spawn_on(worker, self) } /// Adds an additional reference to the scope's reference counter. @@ -312,6 +268,109 @@ impl<'scope, 'env> Scope<'scope, 'env> { } } +// ----------------------------------------------------------------------------- +// Generalized scoped spawn trait + +/// Logic for spawning scoped work onto a thread pool. +/// +/// This trait defines the behavior of [`Scope::spawn`] for various types. +pub trait ScopedSpawn<'scope, T>: 'scope { + /// Spawns scoped work onto the thread pool. + fn spawn_on<'env>(self, worker: &Worker, scope: &'scope Scope<'scope, 'env>) -> T; +} + +impl<'scope, F> ScopedSpawn<'scope, ()> for F +where + F: FnOnce(&Worker) + Send + 'scope, +{ + #[inline] + fn spawn_on<'env>(self, worker: &Worker, scope: &'scope Scope<'scope, 'env>) { + // Create a job to execute the spawned function in the scope. + let scope_ptr = ScopePtr::new(scope); + let job = HeapJob::new(move |worker| { + // Catch any panics and store them on the scope. + let result = unwind::halt_unwinding(|| self(worker)); + if let Err(err) = result { + scope_ptr.store_panic(err); + }; + drop(scope_ptr); + }); + + // SAFETY: We must ensure that the heap job does not outlive the data it + // closes over. In effect, this means it must not outlive `'scope`. + // + // This is ensured by the `scope_ptr` and the scope rules, which will + // keep the calling stack frame alive until this job completes, + // effectively extending the lifetime of `'scope` for as long as is + // necessary. 
+ let job_ref = unsafe { job.into_job_ref() }; + + // Send the job to a queue to be executed. + worker.enqueue(job_ref); + } +} + +impl<'scope, Fut, T> ScopedSpawn<'scope, Task> for Fut +where + Fut: Future + Send + 'scope, + T: Send, +{ + #[inline] + fn spawn_on<'env>(self, worker: &Worker, scope: &'scope Scope<'scope, 'env>) -> Task { + // Embed the scope pointer into the future. + let scope_ptr = ScopePtr::new(scope); + let future = async move { + let result = self.await; + drop(scope_ptr); + result + }; + + // The schedule function will turn the future into a job when woken. + let thread_pool = worker.thread_pool(); + let schedule = move |runnable: Runnable| { + // Turn the runnable into a job-ref that we can send to a worker. + + // SAFETY: We provide a pointer to a non-null runnable, and we turn + // it back into a non-null runnable. The runnable will remain valid + // until the task is run. + let job_ref = unsafe { + JobRef::new_raw(runnable.into_raw(), |this, _| { + let runnable = Runnable::<()>::from_raw(this); + // Poll the task. + runnable.run(); + }) + }; + + // Send this job off to be executed. When this schedule function is + // called on a worker thread this re-schedules it onto the worker's + // local queue, which will generally cause tasks to stick to the + // same thread instead of jumping around randomly. This is also + // faster than injecting into the global queue. + thread_pool.with_worker(|worker| { + worker.enqueue(job_ref); + }); + }; + + // SAFETY: We must ensure that the runnable does not outlive the data it + // closes over. In effect, this means it must not outlive `'scope`. + // + // This is ensured by the `scope_ptr` and the scope rules, which will + // keep the calling stack frame alive until the runnable is dropped, + // effectively extending the lifetime of `'scope` for as long as is + // necessary. + // + // We have to use `spawn_unchecked` here instead of `spawn` because the + // future is non-static. 
+ let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; + + // Call the schedule function once to create the initial job. + runnable.schedule(); + + // Return the task handle. + task + } +} + // ----------------------------------------------------------------------------- // Scope pointer @@ -377,12 +436,19 @@ mod scope_ptr { #[cfg(all(test, not(feature = "shuttle")))] mod tests { + use core::pin::Pin; use core::sync::atomic::AtomicU8; use core::sync::atomic::Ordering; + use core::task::Context; + use core::task::Poll; use crate::ThreadPool; + use crate::Worker; use crate::scope; + /// Test that it is possible to borrow local data within a scope, modify it, + /// and then read it later. This is mostly here to ensure stuff like this + /// compiles. #[test] fn scoped_borrow() { static THREAD_POOL: ThreadPool = ThreadPool::new(); @@ -391,39 +457,96 @@ mod tests { let mut string = "a"; THREAD_POOL.with_worker(|worker| { scope(|scope| { - scope.spawn_on(worker, |_| { + scope.spawn_on(worker, |_: &Worker| { string = "b"; - }) + }); }); }); assert_eq!(string, "b"); + + THREAD_POOL.depopulate(); } + /// Test that it is possible to borrow local data immutably within deeply + /// nested scopes. This is also mostly here to ensure stuff like this + /// compiles. 
#[test] fn scoped_borrow_twice() { static THREAD_POOL: ThreadPool = ThreadPool::new(); THREAD_POOL.populate(); - let mut string = "a"; + let counter = AtomicU8::new(0); THREAD_POOL.with_worker(|worker| { scope(|scope| { - scope.spawn_on(worker, |worker| { - string = "b"; - scope.spawn_on(worker, |_| { - string = "c"; + scope.spawn_on(worker, |_: &Worker| { + counter.fetch_add(1, Ordering::Relaxed); + scope.spawn(|_: &Worker| { + counter.fetch_add(1, Ordering::Relaxed); + }); + }); + scope.spawn_on(worker, |worker: &Worker| { + counter.fetch_add(1, Ordering::Relaxed); + scope.spawn_on(worker, |_: &Worker| { + counter.fetch_add(1, Ordering::Relaxed); }) }) }); }); - assert_eq!(string, "c"); + assert_eq!(counter.load(Ordering::Relaxed), 4); + + THREAD_POOL.depopulate(); + } + + /// This is a handy future that needs to be polled repeatedly before + /// resolving. + /// + /// Each time it is polled, it wakes itself (so it will be polled again) and + /// yields. It does this until it has been polled 128 times. + /// + /// This lets us test the behavior of scopes for sleeping tasks, to ensure + /// we do not return from the scope while tasks are still pending. + #[derive(Default)] + struct CountFuture { + /// The number of times the future has been polled. + count: usize, + } + + impl Future for CountFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.count == 128 { + Poll::Ready(()) + } else { + self.count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + /// Tests that we can spawn futures onto a scope, and that the scope really + /// does poll wait for the future to complete before returning. 
+ #[test] + fn scoped_future() { + static THREAD_POOL: ThreadPool = ThreadPool::new(); + THREAD_POOL.resize_to_available(); + + THREAD_POOL.with_worker(|worker| { + let task = worker.scope(|scope| scope.spawn_on(worker, CountFuture::default())); + assert!(task.is_finished()); + }); + + THREAD_POOL.depopulate(); } + /// Tests that blocking functions like `join` can be nested within scopes. #[test] fn scoped_concurrency() { const NUM_JOBS: u8 = 128; static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.resize_to(4); + THREAD_POOL.resize_to_available(); let a = AtomicU8::new(0); let b = AtomicU8::new(0); @@ -431,7 +554,7 @@ mod tests { THREAD_POOL.with_worker(|worker| { scope(|scope| { for _ in 0..NUM_JOBS { - scope.spawn_on(worker, |_| { + scope.spawn_on(worker, |_: &Worker| { THREAD_POOL.join( |_| a.fetch_add(1, Ordering::Relaxed), |_| b.fetch_add(1, Ordering::Relaxed), @@ -446,4 +569,33 @@ mod tests { THREAD_POOL.depopulate(); } + + /// Tests that nesting two scopes on different workers will not deadlock. + #[test] + fn scoped_nesting() { + static THREAD_POOL: ThreadPool = ThreadPool::new(); + THREAD_POOL.resize_to_available(); + + let mut string = "a"; + + THREAD_POOL.with_worker(|worker| { + worker.scope(|scope| { + scope.spawn_on(worker, |_: &Worker| { + // Creating a new worker instead of reusing the old one is + // bad form, but we may as well test it. + THREAD_POOL.with_worker(|worker| { + worker.scope(|scope| { + scope.spawn_on(worker, |_: &Worker| { + string = "b"; + }); + }); + }); + }); + }); + }); + + assert_eq!(string, "b"); + + THREAD_POOL.depopulate(); + } } diff --git a/src/thread_pool.rs b/src/thread_pool.rs index dc7304e..6a789e2 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -54,6 +54,7 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(5); /// /// ```rust,no_run /// # use forte::ThreadPool; +/// # use forte::Worker; /// // Allocate a new thread pool. 
/// static THREAD_POOL: ThreadPool = ThreadPool::new(); /// @@ -67,20 +68,20 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(5); /// // Spawn a job onto the pool. The closure also accepts a worker, because the /// // job may be executed on a different thread. This will be the worker for whatever /// // thread it executes on. -/// worker.spawn(|worker| { +/// worker.spawn(|worker: &Worker| { /// // Spawn another job after this one runs, using the provided local worker. -/// worker.spawn(|_| { }); +/// worker.spawn(|_: &Worker| { }); /// // Spawn another job using the thread pool directly (this will be slower). -/// THREAD_POOL.spawn(|_| { }); +/// THREAD_POOL.spawn(|_: &Worker| { }); /// // Spawn a third job, which will automatically use the parent thread pool. /// // This will also be slower than using the worker. -/// forte::spawn(|_| { }); +/// forte::spawn(|_: &Worker| { }); /// }); /// /// // Spawn an async job, which can return a value through a `Future`. This does not /// // provide access to a worker, because futures may move between threads while they /// // are suspended. -/// let task = THREAD_POOL.spawn_async(async || { "Hello World" }); +/// let task = THREAD_POOL.spawn(async { "Hello World" }); /// /// // Do two operations in parallel, and await the result of each. This is the most /// // efficient and hyper-optimized thread pool operation. @@ -112,13 +113,15 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(5); /// space. More granular control is possible through other methods such as /// [`ThreadPool::grow`], [`ThreadPool::shrink`], or [`ThreadPool::resize_to`]. pub struct ThreadPool { - /// The internal state of the thread pool. This mutex should only be - /// accessed infrequently. + /// The internal state of the thread pool + /// + /// This should only be locked infrequently for short periods of time in + /// cold functions. state: Mutex, /// A queue used for cooperatively sharing jobs between workers. 
shared_jobs: SegQueue, /// A condvar that is used to signal a new worker taking a lease on a seat. - new_participant: Condvar, + start_heartbeat: Condvar, } /// The internal state of a thread pool. @@ -285,14 +288,15 @@ impl ThreadPool { }, }), shared_jobs: SegQueue::new(), - new_participant: Condvar::new(), + start_heartbeat: Condvar::new(), } } /// Claims a lease on the thread pool which can be occupied by a worker /// (using [`Worker::occupy`]), allowing a thread to participate in the pool. + #[cold] pub fn claim_lease(&'static self) -> Lease { - self.new_participant.notify_one(); + self.start_heartbeat.notify_one(); let mut state = self.state.lock().unwrap(); state.claim_lease(self) } @@ -440,36 +444,45 @@ impl ThreadPool { } // The size decreased cmp::Ordering::Less => { - // Halt the heartbeat thread when scaling to zero. - if let Some(control) = state.managed_threads.heartbeat.take() { - control.halt.store(true, Ordering::Relaxed); - let _ = control.handle.join(); - } - // Pull the workers we intend to halt out of the thread manager. let terminating_workers = state.managed_threads.workers.split_off(new_size); - drop(state); + // Halt the heartbeat thread when scaling to zero. + let heartbeat_control = if new_size == 0 { + state.managed_threads.heartbeat.take() + } else { + None + }; - // Terminate the workers. + // Terminate and wake the workers. for worker in &terminating_workers { // Tell the worker to halt. worker.control.halt.store(true, Ordering::Relaxed); + // Wake the worker up. + state.seats[worker.index].data.sleep_controller.wake(); } - // Wake any sleeping workers to ensure they will eventually see the termination notice. - // self.job_is_ready.notify_all(); + // Drop the lock on the state so as not to block the workers or heartbeat. + drop(state); - let own_lease = Worker::map_current(|worker| worker.lease.index); + // Determine our seat index. 
+ let own_seat = Worker::map_current(|worker| worker.lease.index); - // Wait for the workers to fully halt. + // Wait for the other workers to fully halt. for worker in terminating_workers { // It's possible we may be trying to terminate ourselves, in // which case we can skip the thread-join. - if Some(worker.index) != own_lease { + if Some(worker.index) != own_seat { let _ = worker.control.handle.join(); } } + + // If we took control of the heartbeat, halt it after the workers. + if let Some(control) = heartbeat_control { + control.halt.store(true, Ordering::Relaxed); + self.start_heartbeat.notify_one(); + let _ = control.handle.join(); + } } } @@ -521,29 +534,60 @@ impl ThreadPool { } // ----------------------------------------------------------------------------- -// Thread pool scheduling api +// Generalized spawn trait -impl ThreadPool { - /// Spawns a job into the thread pool. - /// - /// See also: [`Worker::spawn`] and [`spawn`]. - #[inline(always)] - pub fn spawn(&'static self, f: F) - where - F: FnOnce(&Worker) + Send + 'static, - { - self.with_worker(|worker| worker.spawn(f)); +/// Logic for spawning work onto a thread pool. +/// +/// This trait defines the behavior of [`ThreadPool::spawn`] for various types. +pub trait Spawn { + /// Spawns work onto the thread pool. + fn spawn(self, thread_pool: &'static ThreadPool, worker: Option<&Worker>) -> T; +} + +impl Spawn<()> for F +where + F: for<'worker> FnOnce(&'worker Worker) + Send + 'static, +{ + #[inline] + fn spawn(self, thread_pool: &'static ThreadPool, worker: Option<&Worker>) { + // Allocate a new job on the heap to store the closure. + let job = HeapJob::new(self); + + // Turn the job into an "owning" `JobRef` so it can be queued. + // + // SAFETY: All jobs added to the queue are guaranteed to be executed + // eventually, this is one of the core invariants of the thread pool. 
+ // The closure `f` has a static lifetime, meaning it only closes over + // data that lasts for the duration of the program, so it's not possible + // for this job to outlive the data `f` closes over. + let job_ref = unsafe { job.into_job_ref() }; + + // Queue the job for evaluation + if let Some(worker) = worker { + worker.enqueue(job_ref); + } else { + thread_pool.shared_jobs.push(job_ref); + } } +} - /// Spawns a future onto the thread pool. - /// - /// See also: [`Worker::spawn_future`] and [`spawn_future`]. - #[inline(always)] - pub fn spawn_future(&'static self, future: F) -> Task - where - F: Future + Send + 'static, - T: Send + 'static, - { +// Executes a raw pointer to a runnable as a job. +#[inline(always)] +fn execute_runnable(this: NonNull<()>, _worker: &Worker) { + // SAFETY: This pointer was created by the call to `Runnable::into_raw` just above. + let runnable = unsafe { Runnable::<()>::from_raw(this) }; + // Poll the task. This will drop the future if the task is + // canceled or the future completes. + runnable.run(); +} + +impl Spawn> for Fut +where + Fut: Future + Send + 'static, + T: Send + 'static, +{ + #[inline] + fn spawn(self, thread_pool: &'static ThreadPool, _worker: Option<&Worker>) -> Task { // This function "schedules" work on the future, which in this case // pushing a `JobRef` that knows how to run it onto the local work queue. let schedule = move |runnable: Runnable| { @@ -553,28 +597,18 @@ impl ThreadPool { // second allocation. let job_pointer = runnable.into_raw(); - // Define a function to run the runnable that will be compatible with `JobRef`. - #[inline(always)] - fn execute_runnable(this: NonNull<()>, _worker: &Worker) { - // SAFETY: This pointer was created by the call to `Runnable::into_raw` just above. - let runnable = unsafe { Runnable::<()>::from_raw(this) }; - // Poll the task. This will drop the future if the task is - // canceled or the future completes. 
- runnable.run(); - } - // SAFETY: The raw runnable pointer will remain valid until it is // used by `execute_runnable`, after which it will be dropped. let job_ref = unsafe { JobRef::new_raw(job_pointer, execute_runnable) }; // Send this job off to be executed. - self.with_worker(|worker| { + thread_pool.with_worker(|worker| { worker.enqueue(job_ref); }); }; // Creates a task from the future and schedule. - let (runnable, task) = async_task::spawn(future, schedule); + let (runnable, task) = async_task::spawn(self, schedule); // This calls the schedule function, pushing a `JobRef` for the future // onto the local work queue. If the future dosn't complete, it will @@ -591,21 +625,18 @@ impl ThreadPool { // Return the task, which acts as a handle for this series of jobs. task } +} + +// ----------------------------------------------------------------------------- +// Thread pool scheduling api - /// Spawns an async closure onto the thread pool. +impl ThreadPool { + /// Spawns a job into the thread pool. /// - /// See also: [`Worker::spawn_async`] and [`spawn_async`]. - #[inline(always)] - pub fn spawn_async(&'static self, f: Fn) -> Task - where - Fn: FnOnce() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - T: Send + 'static, - { - // Wrap the function into a future using an async block. - let future = async move { f().await }; - // We just pass this future to `spawn_future`. - self.spawn_future(future) + /// See also: [`Worker::spawn`] and [`spawn`]. + #[inline] + pub fn spawn>(&'static self, work: S) -> T { + work.spawn(self, None) } /// Blocks the thread waiting for a future to complete. @@ -677,7 +708,7 @@ thread_local! { pub struct Worker { migrated: Cell, lease: Lease, - pub(crate) queue: JobQueue, + queue: JobQueue, rng: XorShift64Star, // Make non-send _phantom: PhantomData<*const ()>, @@ -803,6 +834,12 @@ impl Worker { self.lease.index } + /// Returns the index of the threadpool of the worker. 
+ #[inline(always)] + pub fn thread_pool(&self) -> &'static ThreadPool { + self.lease.thread_pool + } + /// Pushes a job onto the local queue, overflowing to the shared queue when /// full. #[inline(always)] @@ -876,7 +913,7 @@ impl Worker { // workers deques, and finally to injected jobs from the outside. The // idea is to finish what we started before we take on something new. self.queue - .pop_back() + .pop_front() .map(|job| (job, false)) .or_else(|| self.claim_shared_job().map(|job| (job, true))) } @@ -894,7 +931,7 @@ impl Worker { /// worker. It will never claim shared work. #[inline(always)] pub fn yield_local(&self) -> Yield { - match self.queue.pop_back() { + match self.queue.pop_front() { Some(job_ref) => { self.execute(job_ref, false); Yield::Executed @@ -906,7 +943,7 @@ impl Worker { /// Cooperatively yields execution to the threadpool, allowing it to execute /// some work. /// - /// Tis function may execute either local or shared work: work already + /// This function may execute either local or shared work: work already /// queued on the worker, or work off-loaded by a different worker. If there /// is no work on the pool, this will lock the thread-pool mutex, so it /// should not be called within a hot loop. Consider using @@ -943,91 +980,26 @@ impl Worker { // ----------------------------------------------------------------------------- // Worker scheduling api +/// # Scheduling API impl Worker { - /// Spawns a new closure onto the thread pool. Just like a standard thread, - /// this task is not tied to the current stack frame, and hence it cannot - /// hold any references other than those with 'static lifetime. If you want - /// to spawn a task that references stack data, use the - /// [`Worker::scope()`] function to create a scope. + /// Spawns work (a closure or future) onto the thread pool. Just like a + /// standard thread, this work executes concurrently (and potentially in + /// parallel) to the place where it is spawned. 
It is not tied to the + /// current stack frame, and hence it cannot hold any references other than + /// those with `'static` lifetime. If you want to spawn a task that + /// references stack data, use the [`scope`], [`ThreadPool::scope`] or + /// [`Worker::scope`] functions. /// /// Since tasks spawned with this function cannot hold references into the /// enclosing stack frame, you almost certainly want to use a move closure /// as their argument (otherwise, the closure will typically hold references /// to any variables from the enclosing function that you happen to use). /// - /// To spawn an async closure or future, use [`Worker::spawn_async`] or - /// [`Worker::spawn_future`]. To spawn a non-static closure, use - /// [`ThreadPool::scope`]. - /// /// If you do not have access to a [`Worker`], you may call /// [`ThreadPool::spawn`] or simply [`spawn`]. - #[inline(always)] - pub fn spawn(&self, f: F) - where - F: FnOnce(&Worker) + Send + 'static, - { - // Allocate a new job on the heap to store the closure. - let job = HeapJob::new(f); - - // Turn the job into an "owning" `JobRef` so it can be queued. - // - // SAFETY: All jobs added to the queue are guaranteed to be executed - // eventually, this is one of the core invariants of the thread pool. - // The closure `f` has a static lifetime, meaning it only closes over - // data that lasts for the duration of the program, so it's not possible - // for this job to outlive the data `f` closes over. - let job_ref = unsafe { job.into_job_ref() }; - - // Queue the `JobRef` on the worker so that it will be evaluated. - self.enqueue(job_ref); - } - - /// Spawns a future onto the thread pool. See [`Worker::spawn`] for more - /// information about spawning jobs. Only static futures are supported - /// through this function, but you can use [`Worker::scope`] to get a scope - /// on which non-static futures and async tasks can be spawned. 
- /// - /// # Returns - /// - /// Spawning a future returns a [`Task`], which represents a handle to the async - /// computation and is itself a future that can be awaited to receive the - /// return value. There's four ways to interact with a task: - /// - /// 1. Await the task. This will eventually produce the output of the - /// provided future. - /// - /// 2. Drop the task. This will stop execution of the future. - /// - /// 3. Cancel the task. This has the same effect as dropping the task, but - /// waits until the future stops running (which can take a while). - /// - /// 4. Detach the task. This will allow the future to continue executing - /// even after the task itself is dropped. - /// - /// If you do not have access to a [`Worker`], you may call - /// [`ThreadPool::spawn_future`] or simply [`spawn_future`]. - #[inline(always)] - pub fn spawn_future(&self, future: F) -> Task - where - F: Future + Send + 'static, - T: Send + 'static, - { - self.lease.thread_pool.spawn_future(future) - } - - /// Spawns an async closure onto the task pool. This is a simple wrapper - /// around [`Worker::spawn_future`]. - /// - /// If you do not have access to a [`Worker`], you may call - /// [`ThreadPool::spawn_async`] or simply [`spawn_async`]. - #[inline(always)] - pub fn spawn_async(&self, f: Fn) -> Task - where - Fn: FnOnce() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - T: Send + 'static, - { - self.lease.thread_pool.spawn_async(f) + #[inline] + pub fn spawn>(&self, work: S) -> T { + work.spawn(self.lease.thread_pool, Some(self)) } /// Polls a future to completion, then returns the outcome. This function @@ -1178,49 +1150,11 @@ impl Worker { /// If there is no current thread pool, this panics. /// /// See also: [`Worker::spawn`] and [`ThreadPool::spawn`]. 
-pub fn spawn(f: F) -where - F: FnOnce(&Worker) + Send + 'static, -{ +pub fn spawn>(work: S) -> T { Worker::with_current(|worker| { worker .expect("attempt to call `forte::spawn` from outside a thread pool") - .spawn(f); - }); -} - -/// Spawns a future onto the current thread pool. -/// -/// If there is no current thread pool, this panics. -/// -/// See also: [`Worker::spawn_future`] and [`ThreadPool::spawn_future`]. -pub fn spawn_future(future: F) -> Task -where - F: Future + Send + 'static, - T: Send + 'static, -{ - Worker::with_current(|worker| { - worker - .expect("attempt to call `forte::spawn_future` from outside a thread pool") - .spawn_future(future) - }) -} - -/// Spawns an async closure onto the current thread pool. -/// -/// If there is no current thread pool, this panics. -/// -/// See also: [`Worker::spawn_async`] and [`ThreadPool::spawn_async`]. -pub fn spawn_async(f: Fn) -> Task -where - Fn: FnOnce() -> Fut + Send + 'static, - Fut: Future + Send + 'static, - T: Send + 'static, -{ - Worker::with_current(|worker| { - worker - .expect("attempt to call `forte::spawn_async` from outside a thread pool") - .spawn_async(f) + .spawn(work) }) } @@ -1228,7 +1162,7 @@ where /// /// If there is no current thread pool, this panics. /// -/// See also: [`Worker::spawn_future`] and [`ThreadPool::spawn_future`]. +/// See also: [`Worker::block_on`] and [`ThreadPool::block_on`]. pub fn block_on(future: F) -> T where F: Future + Send, @@ -1292,13 +1226,14 @@ where /// /// ``` /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let ok: Vec = vec![1, 2, 3]; /// forte::scope(|scope| { /// let bad: Vec = vec![4, 5, 6]; -/// scope.spawn_on(worker, |_| { +/// scope.spawn_on(worker, |_: &Worker| { /// // Transfer ownership of `bad` into a local variable (also named `bad`). 
/// // This will force the closure to take ownership of `bad` from the environment. /// let bad = bad; @@ -1306,7 +1241,7 @@ where /// println!("bad: {:?}", bad); // refers to our local variable, above. /// }); /// -/// scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// scope.spawn_on(worker, |_: &Worker| println!("ok: {:?}", ok)); // we too can borrow `ok` /// }); /// # }); /// ``` @@ -1318,20 +1253,21 @@ where /// /// ```rust /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let ok: Vec = vec![1, 2, 3]; /// forte::scope(|scope| { /// let bad: Vec = vec![4, 5, 6]; -/// scope.spawn_on(worker, move |_| { +/// scope.spawn_on(worker, move |_: &Worker| { /// println!("ok: {:?}", ok); /// println!("bad: {:?}", bad); /// }); /// /// // That closure is fine, but now we can't use `ok` anywhere else, /// // since it is owned by the previous task: -/// // scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); +/// // scope.spawn_on(worker, |_: &Worker| println!("ok: {:?}", ok)); /// }); /// # }); /// ``` @@ -1343,6 +1279,7 @@ where /// /// ```rust /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { @@ -1350,7 +1287,7 @@ where /// forte::scope(|scope| { /// let bad: Vec = vec![4, 5, 6]; /// let ok: &Vec = &ok; // shadow the original `ok` -/// scope.spawn_on(worker, move |_| { +/// scope.spawn_on(worker, move |_: &Worker| { /// println!("ok: {:?}", ok); // captures the shadowed version /// println!("bad: {:?}", bad); /// }); @@ -1359,7 +1296,7 @@ where /// // can be shared freely. Note that we need a `move` closure here though, /// // because otherwise we'd be trying to borrow the shadowed `ok`, /// // and that doesn't outlive `scope`. 
-/// scope.spawn_on(worker, move |_| println!("ok: {:?}", ok)); +/// scope.spawn_on(worker, move |_: &Worker| println!("ok: {:?}", ok)); /// }); /// # }); /// ``` @@ -1369,13 +1306,14 @@ where /// /// ```rust /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let ok: Vec = vec![1, 2, 3]; /// forte::scope(|scope| { /// let bad: Vec = vec![4, 5, 6]; -/// scope.spawn_on(worker, |_| { +/// scope.spawn_on(worker, |_: &Worker| { /// // Transfer ownership of `bad` into a local variable (also named `bad`). /// // This will force the closure to take ownership of `bad` from the environment. /// let bad = bad; @@ -1383,7 +1321,7 @@ where /// println!("bad: {:?}", bad); // refers to our local variable, above. /// }); /// -/// scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// scope.spawn_on(worker, |_: &Worker| println!("ok: {:?}", ok)); // we too can borrow `ok` /// }); /// # }); /// ``` @@ -1395,6 +1333,7 @@ where /// /// ```compile_fail /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { @@ -1411,16 +1350,18 @@ where /// /// ``` /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let mut counter = 0; +/// let counter_ref = &mut counter; /// forte::scope(|scope| { -/// scope.spawn_on(worker, |worker| { -/// counter += 1; +/// scope.spawn_on(worker, |worker: &Worker| { +/// *counter_ref += 1; /// // Note: we borrow the scope again here. 
-/// scope.spawn_on(worker, |_| { -/// counter += 1; +/// scope.spawn_on(worker, move |_: &Worker| { +/// *counter_ref += 1; /// }); /// }); /// }); @@ -1434,6 +1375,7 @@ where /// /// ```compile_fail /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// THREAD_POOL.with_worker(|worker| { @@ -1490,7 +1432,7 @@ fn managed_worker(lease: Lease, halt: Arc, barrier: Arc) { // Register as the indicated worker, and work until we are told to halt. Worker::occupy(lease, |worker| { while !halt.load(Ordering::Relaxed) { - if let Some(job) = worker.queue.pop_back() { + if let Some(job) = worker.queue.pop_front() { worker.execute(job, false); continue; } @@ -1564,7 +1506,7 @@ fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc) { thread::sleep(sleep_interval); state = thread_pool.state.lock().unwrap(); } else { - state = thread_pool.new_participant.wait(state).unwrap(); + state = thread_pool.start_heartbeat.wait(state).unwrap(); } } } @@ -1581,7 +1523,7 @@ mod tests { #[test] fn join_basic() { static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.populate(); + THREAD_POOL.resize_to_available(); let mut a = 0; let mut b = 0; @@ -1608,7 +1550,7 @@ mod tests { } static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.populate(); + THREAD_POOL.resize_to_available(); let mut vals = [0; 1_024]; THREAD_POOL.with_worker(|worker| increment(worker, &mut vals)); @@ -1636,11 +1578,11 @@ mod tests { } static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.populate(); + THREAD_POOL.resize_to_available(); - let mut vals = vec![0; 1_024 * 1_024]; + let mut vals = vec![0; 512 * 64]; THREAD_POOL.with_worker(|worker| increment(worker, &mut vals)); - assert_eq!(vals, vec![1; 1_024 * 1_024]); + assert_eq!(vals, vec![1; 512 * 64]); THREAD_POOL.depopulate(); }