From 253527734d423897ce21f924e37badf1403003c8 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 16 Oct 2025 16:34:22 -0300 Subject: [PATCH 1/4] Added spawn_on_thread and send_message_on --- concurrency/src/tasks/gen_server.rs | 67 +++++++++++++++++++++++++++-- concurrency/src/tasks/mod.rs | 4 +- examples/blocking_genserver/main.rs | 2 +- 3 files changed, 67 insertions(+), 6 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index d516b1d..809d861 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -5,7 +5,10 @@ use crate::{ tasks::InitResult::{NoSuccess, Success}, }; use futures::future::FutureExt as _; -use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken}; +use spawned_rt::{ + tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken, JoinHandle}, + threads, +}; use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5); @@ -27,7 +30,7 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(gen_server: G) -> Self { + fn new(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -51,7 +54,7 @@ impl GenServerHandle { handle_clone } - pub(crate) fn new_blocking(gen_server: G) -> Self { + fn new_blocking(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -70,6 +73,25 @@ impl GenServerHandle { handle_clone } + fn new_on_thread(gen_server: G) -> Self { + let (tx, mut rx) = mpsc::channel::>(); + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; + let handle_clone = handle.clone(); + // Ignore the JoinHandle for now. Maybe we'll use it in the future + let _join_handle = threads::spawn(|| { + threads::block_on(async move { + if let Err(error) = gen_server.run(&handle, &mut rx).await { + tracing::trace!(%error, "GenServer crashed") + }; + }) + }); + handle_clone + } + pub fn sender(&self) -> mpsc::Sender> { self.tx.clone() } @@ -153,6 +175,15 @@ pub trait GenServer: Send + Sized { GenServerHandle::new_blocking(self) } + /// For some "singleton" GenServers that run througout the whole execution of the + /// program, it makes sense to run in their own dedicated thread to avoid interference + /// with the rest of the tasks' runtime. + /// The use of tokio::task::spawm_blocking is not recommended for these scenarios + /// as it is a limited thread pool better suited for blocking IO tasks that eventually end + fn start_on_thread(self) -> GenServerHandle { + GenServerHandle::new_on_thread(self) + } + fn run( self, handle: &GenServerHandle, @@ -300,6 +331,36 @@ pub trait GenServer: Send + Sized { } } +/// Spawns a task that awaits on a future and sends messages to a GenServer. +/// +/// This function returns a handle to the spawned task. +pub fn send_message_on( + handle: GenServerHandle, + future: U, + message: T::CastMsg, +) -> JoinHandle<()> +where + T: GenServer, + U: Future + Send + 'static, + ::Output: Send, +{ + let cancelation_token = handle.cancellation_token(); + let mut handle_clone = handle.clone(); + let join_handle = spawned_rt::tasks::spawn(async move { + tracing::info!("Ctrl+C listener started"); + let is_cancelled = core::pin::pin!(cancelation_token.cancelled()); + let signal = core::pin::pin!(future); + match futures::future::select(is_cancelled, signal).await { + futures::future::Either::Left(_) => tracing::error!("GenServer stopped"), + futures::future::Either::Right(_) => { + tracing::info!("Sending shutdown to PeerTable Server"); + handle_clone.cast(message).await.unwrap(); + } + } + }); + join_handle +} + #[cfg(debug_assertions)] mod warn_on_block { use super::*; diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 65ce84b..6936162 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -12,8 +12,8 @@ mod stream_tests; mod timer_tests; pub use gen_server::{ - CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult, - InitResult::NoSuccess, InitResult::Success, + send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, + InitResult, InitResult::NoSuccess, InitResult::Success, }; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index ca954a7..981f5ab 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -99,7 +99,7 @@ impl GenServer for WellBehavedTask { pub fn main() { rt::run(async move { // If we change BadlyBehavedTask to start instead, it can stop the entire program - let mut badboy = BadlyBehavedTask::new().start_blocking(); + let mut badboy = BadlyBehavedTask::new().start_on_thread(); let _ = badboy.cast(()).await; let mut goodboy = WellBehavedTask::new(0).start(); let _ = goodboy.cast(()).await; From 7102a6c3fbcbf74d9264ead53cc9ffdd4eb71dd2 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 16 Oct 2025 17:07:02 -0300 Subject: [PATCH 2/4] Improved error handling --- concurrency/src/tasks/gen_server.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 809d861..9415d7a 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -4,7 +4,8 @@ use crate::{ error::GenServerError, tasks::InitResult::{NoSuccess, Success}, }; -use futures::future::FutureExt as _; +use core::pin::pin; +use futures::future::{self, FutureExt as _}; use spawned_rt::{ tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken, JoinHandle}, threads, @@ -346,15 +347,15 @@ where { let cancelation_token = handle.cancellation_token(); let mut handle_clone = handle.clone(); - let join_handle = spawned_rt::tasks::spawn(async move { - tracing::info!("Ctrl+C listener started"); - let is_cancelled = core::pin::pin!(cancelation_token.cancelled()); - let signal = core::pin::pin!(future); - match futures::future::select(is_cancelled, signal).await { - futures::future::Either::Left(_) => tracing::error!("GenServer stopped"), - futures::future::Either::Right(_) => { - tracing::info!("Sending shutdown to PeerTable Server"); - handle_clone.cast(message).await.unwrap(); + let join_handle = rt::spawn(async move { + let is_cancelled = pin!(cancelation_token.cancelled()); + let signal = pin!(future); + match future::select(is_cancelled, signal).await { + future::Either::Left(_) => tracing::error!("GenServer stopped"), + future::Either::Right(_) => { + if let Err(e) = handle_clone.cast(message).await { + tracing::error!("Failed to send message: {e:?}") + } } } }); From 59bca89057c5c52b4c0d45ec4e57fd731f5fa0ec Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 16 Oct 2025 17:14:30 -0300 Subject: [PATCH 3/4] Corrected comment --- concurrency/src/tasks/gen_server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 9415d7a..7a54957 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -332,8 +332,8 @@ pub trait GenServer: Send + Sized { } } -/// Spawns a task that awaits on a future and sends messages to a GenServer. -/// +/// Spawns a task that awaits on a future and sends a message to a GenServer +/// on completion. /// This function returns a handle to the spawned task. pub fn send_message_on( handle: GenServerHandle, From 22a50386ab2052f60f104511ef6fa3b2969d1f0d Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Fri, 17 Oct 2025 12:28:54 -0300 Subject: [PATCH 4/4] Moved a log message from error to debug and updated version --- Cargo.lock | 4 ++-- Cargo.toml | 6 +++--- concurrency/src/tasks/gen_server.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9eb9ad..efef144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.4.1" +version = "0.4.2" dependencies = [ "futures", "pin-project-lite", @@ -1210,7 +1210,7 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.4.1" +version = "0.4.2" dependencies = [ "crossbeam", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 2b55e5e..667b687 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,12 @@ members = [ ] [workspace.dependencies] -spawned-rt = { path = "rt", version = "0.4.1" } -spawned-concurrency = { path = "concurrency", version = "0.4.1" } +spawned-rt = { path = "rt", version = "0.4.2" } +spawned-concurrency = { path = "concurrency", version = "0.4.2" } tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } [workspace.package] -version = "0.4.1" +version = "0.4.2" license = "MIT" edition = "2021" diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 7a54957..15108a1 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -351,7 +351,7 @@ where let is_cancelled = pin!(cancelation_token.cancelled()); let signal = pin!(future); match future::select(is_cancelled, signal).await { - future::Either::Left(_) => tracing::error!("GenServer stopped"), + future::Either::Left(_) => tracing::debug!("GenServer stopped"), future::Either::Right(_) => { if let Err(e) = handle_clone.cast(message).await { tracing::error!("Failed to send message: {e:?}")