diff --git a/Cargo.lock b/Cargo.lock index aeab295f..a08cb1a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1279,6 +1279,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jod-thread" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b23360e99b8717f20aaa4598f5a6541efbe30630039fbc7706cf954a87947ae" + [[package]] name = "js-sys" version = "0.3.91" @@ -2724,6 +2730,18 @@ dependencies = [ "squawk-parser", ] +[[package]] +name = "squawk-thread" +version = "2.43.0" +dependencies = [ + "crossbeam-channel", + "crossbeam-utils", + "jod-thread", + "libc", + "salsa", + "tracing", +] + [[package]] name = "squawk-wasm" version = "2.43.0" @@ -3050,9 +3068,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "tracing-core" version = "0.1.34" diff --git a/Cargo.toml b/Cargo.toml index 585d3592..3386dce9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,11 @@ smallvec = "1.13.2" tabled = "0.17.0" etcetera = "0.11.0" url = "2.5.4" +crossbeam-channel = "0.5.15" +crossbeam-utils = "0.8.21" +jod-thread = "0.1.2" +libc = "0.2.171" +tracing = "0.1.40" # local # we have to make the versions explicit otherwise `cargo publish` won't work @@ -74,6 +79,7 @@ squawk-parser = { path = "./crates/squawk_parser", version = "2.43.0" } squawk-syntax = { path = "./crates/squawk_syntax", version = "2.43.0" } squawk-linter = { path = "./crates/squawk_linter", version = "2.43.0" } squawk-server = { path = "./crates/squawk_server", version = "2.43.0" } +squawk-thread = { path = "./crates/squawk_thread", version = "2.43.0" } [workspace.lints.clippy] collapsible_else_if = "allow" diff --git a/crates/squawk_thread/Cargo.toml b/crates/squawk_thread/Cargo.toml new file mode 100644 index 00000000..b1e17846 --- /dev/null +++ b/crates/squawk_thread/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "squawk-thread" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +description = "Thread pool and related utilities for Squawk" +documentation.workspace = true +homepage.workspace = true +repository.workspace = true + +[lib] +doctest = false + +[dependencies] +crossbeam-channel.workspace = true +crossbeam-utils.workspace = true +jod-thread.workspace = true +libc.workspace = true +salsa.workspace = true +tracing.workspace = true + +[lints] +workspace = true diff --git a/crates/squawk_thread/LICENSE-APACHE b/crates/squawk_thread/LICENSE-APACHE new file mode 100644 index 00000000..c15f28bc --- /dev/null +++ b/crates/squawk_thread/LICENSE-APACHE @@ -0,0 +1,178 @@ +from: https://github.com/rust-lang/rust-analyzer/blob/2efc80078029894eec0699f62ec8d5c1a56af763/LICENSE-APACHE#L1C31-L1C31 + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS diff --git a/crates/squawk_thread/LICENSE-MIT b/crates/squawk_thread/LICENSE-MIT new file mode 100644 index 00000000..c09a39ba --- /dev/null +++ b/crates/squawk_thread/LICENSE-MIT @@ -0,0 +1,25 @@ +from: https://github.com/rust-lang/rust-analyzer/blob/2efc80078029894eec0699f62ec8d5c1a56af763/LICENSE-MIT#L1-L1 + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/crates/squawk_thread/README.md b/crates/squawk_thread/README.md new file mode 100644 index 00000000..4ba94127 --- /dev/null +++ b/crates/squawk_thread/README.md @@ -0,0 +1,5 @@ +# squawk_thread + +> From Rust-Analyzer's [crates/stdx/src/thread.rs](https://github.com/rust-lang/rust-analyzer/blob/2efc80078029894eec0699f62ec8d5c1a56af763/crates/stdx/src/thread.rs#L12C3-L12C3) & related modules. + +Ty also has a version of Rust-Analyzer's [thread utils that's worth checking out](https://github.com/astral-sh/ruff/blob/5011b253c1aca2a1906762cf45414d0fda1a088a/crates/ty_server/src/server/schedule/thread.rs#L14C50-L14C50). diff --git a/crates/squawk_thread/src/intent.rs b/crates/squawk_thread/src/intent.rs new file mode 100644 index 00000000..1203bfc3 --- /dev/null +++ b/crates/squawk_thread/src/intent.rs @@ -0,0 +1,288 @@ +//! An opaque façade around platform-specific `QoS` APIs. + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +// Please maintain order from least to most priority for the derived `Ord` impl. +pub enum ThreadIntent { + /// Any thread which does work that isn't in the critical path of the user typing + /// (e.g. processing Go To Definition). + Worker, + + /// Any thread which does work caused by the user typing + /// (e.g. processing syntax highlighting). + LatencySensitive, +} + +impl ThreadIntent { + // These APIs must remain private; + // we only want consumers to set thread intent + // either during thread creation or using our pool impl. + + pub(super) fn apply_to_current_thread(self) { + let class = thread_intent_to_qos_class(self); + set_current_thread_qos_class(class); + } + + pub(super) fn assert_is_used_on_current_thread(self) { + if IS_QOS_AVAILABLE { + let class = thread_intent_to_qos_class(self); + assert_eq!(get_current_thread_qos_class(), Some(class)); + } + } +} + +use imp::QoSClass; + +const IS_QOS_AVAILABLE: bool = imp::IS_QOS_AVAILABLE; + +#[expect(clippy::semicolon_if_nothing_returned, reason = "thin wrapper")] +fn set_current_thread_qos_class(class: QoSClass) { + imp::set_current_thread_qos_class(class) +} + +fn get_current_thread_qos_class() -> Option { + imp::get_current_thread_qos_class() +} + +fn thread_intent_to_qos_class(intent: ThreadIntent) -> QoSClass { + imp::thread_intent_to_qos_class(intent) +} + +// All Apple platforms use XNU as their kernel +// and thus have the concept of QoS. +#[cfg(target_vendor = "apple")] +mod imp { + use super::ThreadIntent; + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + // Please maintain order from least to most priority for the derived `Ord` impl. + pub(super) enum QoSClass { + // Documentation adapted from https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/include/sys/qos.h#L55 + // + /// TLDR: invisible maintenance tasks + /// + /// Contract: + /// + /// * **You do not care about how long it takes for work to finish.** + /// * **You do not care about work being deferred temporarily.** + /// (e.g. if the device's battery is in a critical state) + /// + /// Examples: + /// + /// * in a video editor: + /// creating periodic backups of project files + /// * in a browser: + /// cleaning up cached sites which have not been accessed in a long time + /// * in a collaborative word processor: + /// creating a searchable index of all documents + /// + /// Use this QoS class for background tasks + /// which the user did not initiate themselves + /// and which are invisible to the user. + /// It is expected that this work will take significant time to complete: + /// minutes or even hours. + /// + /// This QoS class provides the most energy and thermally-efficient execution possible. + /// All other work is prioritized over background tasks. + Background, + + /// TLDR: tasks that don't block using your app + /// + /// Contract: + /// + /// * **Your app remains useful even as the task is executing.** + /// + /// Examples: + /// + /// * in a video editor: + /// exporting a video to disk – + /// the user can still work on the timeline + /// * in a browser: + /// automatically extracting a downloaded zip file – + /// the user can still switch tabs + /// * in a collaborative word processor: + /// downloading images embedded in a document – + /// the user can still make edits + /// + /// Use this QoS class for tasks which + /// may or may not be initiated by the user, + /// but whose result is visible. + /// It is expected that this work will take a few seconds to a few minutes. + /// Typically your app will include a progress bar + /// for tasks using this class. + /// + /// This QoS class provides a balance between + /// performance, responsiveness, and efficiency. + Utility, + + /// TLDR: tasks that block using your app + /// + /// Contract: + /// + /// * **You need this work to complete + /// before the user can keep interacting with your app.** + /// * **Your work will not take more than a few seconds to complete.** + /// + /// Examples: + /// + /// * in a video editor: + /// opening a saved project + /// * in a browser: + /// loading a list of the user's bookmarks and top sites + /// when a new tab is created + /// * in a collaborative word processor: + /// running a search on the document's content + /// + /// Use this QoS class for tasks which were initiated by the user + /// and block the usage of your app while they are in progress. + /// It is expected that this work will take a few seconds or less to complete; + /// not long enough to cause the user to switch to something else. + /// Your app will likely indicate progress on these tasks + /// through the display of placeholder content or modals. + /// + /// This QoS class is not energy-efficient. + /// Rather, it provides responsiveness + /// by prioritizing work above other tasks on the system + /// except for critical user-interactive work. + UserInitiated, + + /// TLDR: render loops and nothing else + /// + /// Contract: + /// + /// * **You absolutely need this work to complete immediately + /// or your app will appear to freeze.** + /// * **Your work will always complete virtually instantaneously.** + /// + /// Examples: + /// + /// * the main thread in a GUI application + /// * the update & render loop in a game + /// * a secondary thread which progresses an animation + /// + /// Use this QoS class for any work which, if delayed, + /// will make your user interface unresponsive. + /// It is expected that this work will be virtually instantaneous. + /// + /// This QoS class is not energy-efficient. + /// Specifying this class is a request to run with + /// nearly all available system CPU and I/O bandwidth even under contention. + UserInteractive, + } + + pub(super) const IS_QOS_AVAILABLE: bool = true; + + pub(super) fn set_current_thread_qos_class(class: QoSClass) { + let c = match class { + QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, + QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED, + QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY, + QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND, + }; + + let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) }; + + if code == 0 { + return; + } + + let errno = unsafe { *libc::__error() }; + + match errno { + libc::EPERM => { + // This thread has been excluded from the QoS system + // due to a previous call to a function such as `pthread_setschedparam` + // which is incompatible with QoS. + // + // Panic instead of returning an error + // to maintain the invariant that we only use QoS APIs. + panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})") + } + + libc::EINVAL => { + // This is returned if we pass something other than a qos_class_t + // to `pthread_set_qos_class_self_np`. + // + // This is impossible, so again panic. + unreachable!( + "invalid qos_class_t value was passed to pthread_set_qos_class_self_np" + ) + } + + _ => { + // `pthread_set_qos_class_self_np`'s documentation + // does not mention any other errors. + unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}") + } + } + } + + pub(super) fn get_current_thread_qos_class() -> Option { + let current_thread = unsafe { libc::pthread_self() }; + let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED; + let code = unsafe { + libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut()) + }; + + if code != 0 { + // `pthread_get_qos_class_np`'s documentation states that + // an error value is placed into errno if the return code is not zero. + // However, it never states what errors are possible. + // Inspecting the source[0] shows that, as of this writing, it always returns zero. + // + // Whatever errors the function could report in future are likely to be + // ones which we cannot handle anyway + // + // 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177 + let errno = unsafe { *libc::__error() }; + unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})"); + } + + match qos_class_raw { + libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive), + libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated), + libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set + libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility), + libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background), + + libc::qos_class_t::QOS_CLASS_UNSPECIFIED => { + // Using manual scheduling APIs causes threads to “opt out” of QoS. + // At this point they become incompatible with QoS, + // and as such have the “unspecified” QoS class. + // + // Panic instead of returning an error + // to maintain the invariant that we only use QoS APIs. + panic!("tried to get QoS of thread which has opted out of QoS") + } + } + } + + pub(super) fn thread_intent_to_qos_class(intent: ThreadIntent) -> QoSClass { + match intent { + ThreadIntent::Worker => QoSClass::Utility, + ThreadIntent::LatencySensitive => QoSClass::UserInitiated, + } + } +} + +// FIXME: Windows has QoS APIs, we should use them! +#[cfg(not(target_vendor = "apple"))] +mod imp { + use super::ThreadIntent; + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub(super) enum QoSClass { + Default, + } + + pub(super) const IS_QOS_AVAILABLE: bool = false; + + pub(super) fn set_current_thread_qos_class(_: QoSClass) {} + + pub(super) fn get_current_thread_qos_class() -> Option { + None + } + + pub(super) fn thread_intent_to_qos_class(_: ThreadIntent) -> QoSClass { + QoSClass::Default + } +} diff --git a/crates/squawk_thread/src/lib.rs b/crates/squawk_thread/src/lib.rs new file mode 100644 index 00000000..3ea73143 --- /dev/null +++ b/crates/squawk_thread/src/lib.rs @@ -0,0 +1,120 @@ +//! A utility module for working with threads that automatically joins threads upon drop +//! and abstracts over operating system quality of service (`QoS`) APIs +//! through the concept of a "thread intent". +//! +//! The intent of a thread is frozen at thread creation time, +//! i.e. there is no API to change the intent of a thread once it has been spawned. +//! +//! As a system, rust-analyzer should have the property that +//! old manual scheduling APIs are replaced entirely by `QoS`. +//! To maintain this invariant, we panic when it is clear that +//! old scheduling APIs have been used. +//! +//! Moreover, we also want to ensure that every thread has an intent set explicitly +//! to force a decision about its importance to the system. +//! Thus, [`ThreadIntent`] has no default value +//! and every entry point to creating a thread requires a [`ThreadIntent`] upfront. + +use std::fmt; + +pub use crate::intent::ThreadIntent; +pub use crate::pool::Pool; + +mod intent; +mod pool; + +/// # Panics +/// +/// Panics if failed to spawn the thread. +pub fn spawn(intent: ThreadIntent, name: String, f: F) -> JoinHandle +where + F: (FnOnce() -> T) + Send + 'static, + T: Send + 'static, +{ + Builder::new(intent, name) + .spawn(f) + .expect("failed to spawn thread") +} + +pub struct Builder { + intent: ThreadIntent, + inner: jod_thread::Builder, + allow_leak: bool, +} + +impl Builder { + #[must_use] + pub fn new(intent: ThreadIntent, name: impl Into) -> Self { + Self { + intent, + inner: jod_thread::Builder::new().name(name.into()), + allow_leak: false, + } + } + + #[must_use] + pub fn stack_size(self, size: usize) -> Self { + Self { + inner: self.inner.stack_size(size), + ..self + } + } + + /// Whether dropping should detach the thread + /// instead of joining it. + #[must_use] + pub fn allow_leak(self, allow_leak: bool) -> Self { + Self { allow_leak, ..self } + } + + pub fn spawn(self, f: F) -> std::io::Result> + where + F: (FnOnce() -> T) + Send + 'static, + T: Send + 'static, + { + let inner_handle = self.inner.spawn(move || { + self.intent.apply_to_current_thread(); + f() + })?; + + Ok(JoinHandle { + inner: Some(inner_handle), + allow_leak: self.allow_leak, + }) + } +} + +pub struct JoinHandle { + // `inner` is an `Option` so that we can + // take ownership of the contained `JoinHandle`. + inner: Option>, + allow_leak: bool, +} + +impl JoinHandle { + /// # Panics + /// + /// Panics if there is no thread to join. + #[must_use] + pub fn join(mut self) -> T { + self.inner.take().unwrap().join() + } +} + +impl Drop for JoinHandle { + fn drop(&mut self) { + if !self.allow_leak { + return; + } + + if let Some(join_handle) = self.inner.take() { + join_handle.detach(); + } + } +} + +impl fmt::Debug for JoinHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("JoinHandle { .. }") + } +} diff --git a/crates/squawk_thread/src/pool.rs b/crates/squawk_thread/src/pool.rs new file mode 100644 index 00000000..a2b08334 --- /dev/null +++ b/crates/squawk_thread/src/pool.rs @@ -0,0 +1,189 @@ +//! [`Pool`] implements a basic custom thread pool +//! inspired by the [`threadpool` crate](http://docs.rs/threadpool). +//! When you spawn a task you specify a thread intent +//! so the pool can schedule it to run on a thread with that intent. +//! rust-analyzer uses this to prioritize work based on latency requirements. +//! +//! The thread pool is implemented entirely using +//! the threading utilities in [`crate`]. + +use std::{ + marker::PhantomData, + num::NonZeroUsize, + panic::{AssertUnwindSafe, UnwindSafe}, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, +}; + +use crossbeam_channel::{Receiver, Sender}; +use crossbeam_utils::sync::WaitGroup; + +use crate::{Builder, JoinHandle, ThreadIntent}; + +pub struct Pool { + // `_handles` is never read: the field is present + // only for its `Drop` impl. + + // The worker threads exit once the channel closes; + // make sure to keep `job_sender` above `handles` + // so that the channel is actually closed + // before we join the worker threads! + job_sender: Sender, + _handles: Box<[JoinHandle]>, + extant_tasks: Arc, +} + +struct Job { + requested_intent: ThreadIntent, + f: Box, +} + +impl Pool { + /// # Panics + /// + /// Panics if job panics + #[must_use] + pub fn new(threads: NonZeroUsize) -> Self { + const STACK_SIZE: usize = 8 * 1024 * 1024; + const INITIAL_INTENT: ThreadIntent = ThreadIntent::Worker; + + let (job_sender, job_receiver) = crossbeam_channel::unbounded(); + let extant_tasks = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::with_capacity(threads.into()); + for idx in 0..threads.into() { + let handle = Builder::new(INITIAL_INTENT, format!("squawk:worker:{idx}",)) + .stack_size(STACK_SIZE) + .allow_leak(true) + .spawn({ + let extant_tasks = Arc::clone(&extant_tasks); + let job_receiver: Receiver = job_receiver.clone(); + move || { + let mut current_intent = INITIAL_INTENT; + for job in job_receiver { + if job.requested_intent != current_intent { + job.requested_intent.apply_to_current_thread(); + current_intent = job.requested_intent; + } + extant_tasks.fetch_add(1, Ordering::SeqCst); + + // SAFETY: it's safe to assume that `job.f` is unwind safe because we always + // abort the process if it panics. + // Panicking here ensures that we don't swallow errors and is the same as + // what rayon does. + // Any recovery should be implemented outside the thread pool (e.g. when + // dispatching requests/notifications etc). + if let Err(error) = std::panic::catch_unwind(AssertUnwindSafe(job.f)) { + if let Some(msg) = error.downcast_ref::() { + tracing::error!("Worker thread panicked with: {msg}; aborting"); + } else if let Some(msg) = error.downcast_ref::<&str>() { + tracing::error!("Worker thread panicked with: {msg}; aborting"); + } else if let Some(cancelled) = + error.downcast_ref::() + { + tracing::error!( + "Worker thread got cancelled: {cancelled}; aborting" + ); + } else { + tracing::error!( + "Worker thread panicked with: {error:?}; aborting" + ); + } + + std::process::abort(); + } + + extant_tasks.fetch_sub(1, Ordering::SeqCst); + } + } + }) + .expect("failed to spawn thread"); + + handles.push(handle); + } + + Self { + _handles: handles.into_boxed_slice(), + extant_tasks, + job_sender, + } + } + + pub fn spawn(&self, intent: ThreadIntent, f: F) + where + F: FnOnce() + Send + /* UnwindSafe + */ 'static, + { + let f = Box::new(move || { + if cfg!(debug_assertions) { + intent.assert_is_used_on_current_thread(); + } + f(); + }); + + let job = Job { + requested_intent: intent, + f, + }; + self.job_sender.send(job).unwrap(); + } + + pub fn scoped<'pool, 'scope, F, R>(&'pool self, f: F) -> R + where + F: FnOnce(&Scope<'pool, 'scope>) -> R, + { + let wg = WaitGroup::new(); + let scope = Scope { + pool: self, + wg, + _marker: PhantomData, + }; + let r = f(&scope); + scope.wg.wait(); + r + } + + #[must_use] + pub fn len(&self) -> usize { + self.extant_tasks.load(Ordering::SeqCst) + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +pub struct Scope<'pool, 'scope> { + pool: &'pool Pool, + wg: WaitGroup, + _marker: PhantomData &'scope ()>, +} + +impl<'scope> Scope<'_, 'scope> { + pub fn spawn(&self, intent: ThreadIntent, f: F) + where + F: 'scope + FnOnce() + Send + UnwindSafe, + { + let wg = self.wg.clone(); + let f = Box::new(move || { + if cfg!(debug_assertions) { + intent.assert_is_used_on_current_thread(); + } + f(); + drop(wg); + }); + + let job = Job { + requested_intent: intent, + f: unsafe { + std::mem::transmute::< + Box, + Box, + >(f) + }, + }; + self.pool.job_sender.send(job).unwrap(); + } +}