diff --git a/Cargo.toml b/Cargo.toml index 4c19b6ec..a4f364e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "src/health_monitoring_lib", "examples/rust_supervised_app", ] +default-members = ["src/health_monitoring_lib"] [workspace.package] edition = "2021" diff --git a/src/health_monitoring_lib/BUILD b/src/health_monitoring_lib/BUILD index c473903d..69533160 100644 --- a/src/health_monitoring_lib/BUILD +++ b/src/health_monitoring_lib/BUILD @@ -42,6 +42,7 @@ CC_HDRS = [ rust_library( name = "health_monitoring_lib", srcs = glob(["rust/**/*.rs"]), + crate_features = ["score_supervisor_api_client"], crate_root = "rust/lib.rs", proc_macro_deps = PROC_MACRO_DEPS, visibility = ["//visibility:public"], @@ -65,6 +66,7 @@ cc_library( rust_static_library( name = "health_monitoring_lib_ffi", srcs = glob(["rust/**/*.rs"]), + crate_features = ["score_supervisor_api_client"], crate_name = "health_monitoring_lib", crate_root = "rust/lib.rs", proc_macro_deps = [ @@ -100,6 +102,7 @@ cc_library( rust_test( name = "tests", crate = ":health_monitoring_lib", + crate_features = ["stub_supervisor_api_client"], rustc_flags = [ "-C", "link-arg=-lm", diff --git a/src/health_monitoring_lib/Cargo.toml b/src/health_monitoring_lib/Cargo.toml index 71b38d46..073a234f 100644 --- a/src/health_monitoring_lib/Cargo.toml +++ b/src/health_monitoring_lib/Cargo.toml @@ -7,7 +7,6 @@ edition.workspace = true authors.workspace = true license-file.workspace = true - [lib] path = "rust/lib.rs" @@ -18,11 +17,12 @@ workspace = true score_log.workspace = true score_testing_macros.workspace = true containers.workspace = true -monitor_rs.workspace = true +monitor_rs = { workspace = true, optional = true } [dev-dependencies] stdout_logger.workspace = true [features] -default = [] +default = ["stub_supervisor_api_client"] stub_supervisor_api_client = [] +score_supervisor_api_client = ["monitor_rs"] diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index 35a8ffa5..8042a391 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -39,6 +39,9 @@ pub(crate) enum MonitorEvaluationError { /// Trait for evaluating monitors and reporting errors to be used by HealthMonitor. pub(crate) trait MonitorEvaluator { + /// Run monitor evaluation. + /// + /// - `on_error` - error handling, containing tag of failing object and error code. fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)); } diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 492e6378..05db89a5 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -15,6 +15,7 @@ mod common; mod ffi; mod log; mod protected_memory; +mod supervisor_api_client; mod tag; mod worker; @@ -217,12 +218,13 @@ impl HealthMonitor { let monitoring_logic = worker::MonitoringLogic::new( monitors, self.supervisor_api_cycle, - // Currently only `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. - // The later is meant to be used for testing purposes. - #[cfg(not(any(test, feature = "stub_supervisor_api_client")))] - worker::ScoreSupervisorAPIClient::new(), - #[cfg(any(test, feature = "stub_supervisor_api_client"))] - worker::StubSupervisorAPIClient {}, + #[cfg(all(not(test), feature = "score_supervisor_api_client"))] + supervisor_api_client::score_supervisor_api_client::ScoreSupervisorAPIClient::new(), + #[cfg(any( + test, + all(feature = "stub_supervisor_api_client", not(feature = "score_supervisor_api_client")) + ))] + supervisor_api_client::stub_supervisor_api_client::StubSupervisorAPIClient::new(), ); self.worker.start(monitoring_logic) diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs new file mode 100644 index 00000000..195d167a --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs @@ -0,0 +1,28 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Module providing [`SupervisorAPIClient`] implementations. +//! Currently `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. +//! The latter is meant for testing purposes. + +/// An abstraction over the API used to notify the supervisor about process liveness. +pub trait SupervisorAPIClient { + fn notify_alive(&self); +} + +// NOTE: various implementations are not mutually exclusive. + +#[cfg(feature = "score_supervisor_api_client")] +pub mod score_supervisor_api_client; +#[cfg(feature = "stub_supervisor_api_client")] +pub mod stub_supervisor_api_client; diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs new file mode 100644 index 00000000..a198f9ad --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs @@ -0,0 +1,40 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +#![allow(dead_code)] + +use crate::log::debug; +use crate::supervisor_api_client::SupervisorAPIClient; +use crate::worker::Checks; + +pub struct ScoreSupervisorAPIClient { + supervisor_link: monitor_rs::Monitor, +} + +unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution + +impl ScoreSupervisorAPIClient { + pub fn new() -> Self { + let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); + debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); + // This is only temporary usage so unwrap is fine here. + let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); + Self { supervisor_link } + } +} + +impl SupervisorAPIClient for ScoreSupervisorAPIClient { + fn notify_alive(&self) { + self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); + } +} diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs new file mode 100644 index 00000000..e98f4909 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs @@ -0,0 +1,32 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +#![allow(dead_code)] + +use crate::log::warn; +use crate::supervisor_api_client::SupervisorAPIClient; + +/// A stub implementation of the SupervisorAPIClient that logs alive notifications. +pub struct StubSupervisorAPIClient; + +impl StubSupervisorAPIClient { + pub fn new() -> Self { + Self + } +} + +impl SupervisorAPIClient for StubSupervisorAPIClient { + fn notify_alive(&self) { + warn!("StubSupervisorAPIClient: notify_alive called"); + } +} diff --git a/src/health_monitoring_lib/rust/tag.rs b/src/health_monitoring_lib/rust/tag.rs index eca868df..204902b7 100644 --- a/src/health_monitoring_lib/rust/tag.rs +++ b/src/health_monitoring_lib/rust/tag.rs @@ -247,13 +247,41 @@ mod tests { } #[test] - fn tag_hash() { - let example_str = "EXAMPLE"; - let tag = Tag::from(example_str.to_string()); - let mut hasher = DefaultHasher::new(); - tag.hash(&mut hasher); - let hash = hasher.finish(); - assert_eq!(hash, 14738755424381306335); + fn tag_hash_is_eq() { + let tag1 = Tag::from("same"); + let hash1 = { + let mut hasher = DefaultHasher::new(); + tag1.hash(&mut hasher); + hasher.finish() + }; + + let tag2 = Tag::from("same"); + let hash2 = { + let mut hasher = DefaultHasher::new(); + tag2.hash(&mut hasher); + hasher.finish() + }; + + assert_eq!(hash1, hash2); + } + + #[test] + fn tag_hash_is_ne() { + let tag1 = Tag::from("first"); + let hash1 = { + let mut hasher = DefaultHasher::new(); + tag1.hash(&mut hasher); + hasher.finish() + }; + + let tag2 = Tag::from("second"); + let hash2 = { + let mut hasher = DefaultHasher::new(); + tag2.hash(&mut hasher); + hasher.finish() + }; + + assert_ne!(hash1, hash2); } #[test] diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index 8830e153..7be07321 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -11,19 +11,19 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* use crate::common::{MonitorEvalHandle, MonitorEvaluator}; -use crate::log::{debug, info, warn}; +use crate::log::{info, warn}; +use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; - -/// An abstraction over the API used to notify the supervisor about process liveness. -pub(super) trait SupervisorAPIClient { - fn notify_alive(&self); -} +use core::sync::atomic::{AtomicBool, Ordering}; +use core::time::Duration; +use std::sync::Arc; +use std::time::Instant; pub(super) struct MonitoringLogic { monitors: FixedCapacityVec, client: T, - last_notification: std::time::Instant, - supervisor_api_cycle: core::time::Duration, + last_notification: Instant, + supervisor_api_cycle: Duration, } impl MonitoringLogic { @@ -34,14 +34,14 @@ impl MonitoringLogic { /// * `client` - An implementation of the SupervisorAPIClient trait. pub(super) fn new( monitors: FixedCapacityVec, - supervisor_api_cycle: core::time::Duration, + supervisor_api_cycle: Duration, client: T, ) -> Self { Self { monitors, client, supervisor_api_cycle, - last_notification: std::time::Instant::now(), + last_notification: Instant::now(), } } @@ -58,7 +58,7 @@ impl MonitoringLogic { if !has_any_error { if self.last_notification.elapsed() > self.supervisor_api_cycle { - self.last_notification = std::time::Instant::now(); + self.last_notification = Instant::now(); self.client.notify_alive(); } } else { @@ -73,15 +73,15 @@ impl MonitoringLogic { /// A struct that manages a unique thread for running monitoring logic periodically. pub struct UniqueThreadRunner { handle: Option>, - should_stop: std::sync::Arc, - internal_duration_cycle: core::time::Duration, + should_stop: Arc, + internal_duration_cycle: Duration, } impl UniqueThreadRunner { - pub(super) fn new(internal_duration_cycle: core::time::Duration) -> Self { + pub(super) fn new(internal_duration_cycle: Duration) -> Self { Self { handle: None, - should_stop: std::sync::Arc::new(core::sync::atomic::AtomicBool::new(false)), + should_stop: Arc::new(AtomicBool::new(false)), internal_duration_cycle, } } @@ -99,10 +99,10 @@ impl UniqueThreadRunner { let mut next_sleep_time = interval; // TODO Add some checks and log if cyclicly here is not met. - while !should_stop.load(core::sync::atomic::Ordering::Relaxed) { + while !should_stop.load(Ordering::Relaxed) { std::thread::sleep(next_sleep_time); - let now = std::time::Instant::now(); + let now = Instant::now(); if !monitoring_logic.run() { info!("Monitoring logic failed, stopping thread."); @@ -118,7 +118,7 @@ impl UniqueThreadRunner { } pub fn join(&mut self) { - self.should_stop.store(true, core::sync::atomic::Ordering::Relaxed); + self.should_stop.store(true, Ordering::Relaxed); if let Some(handle) = self.handle.take() { let _ = handle.join(); } @@ -131,20 +131,9 @@ impl Drop for UniqueThreadRunner { } } -/// A stub implementation of the SupervisorAPIClient that logs alive notifications. -#[allow(dead_code)] -pub(super) struct StubSupervisorAPIClient; - -#[allow(dead_code)] -impl SupervisorAPIClient for StubSupervisorAPIClient { - fn notify_alive(&self) { - warn!("StubSupervisorAPIClient: notify_alive called"); - } -} - #[allow(dead_code)] #[derive(Copy, Clone)] -enum Checks { +pub(crate) enum Checks { WorkerCheckpoint, } @@ -156,59 +145,40 @@ impl From for u32 { } } -#[allow(dead_code)] -pub(super) struct ScoreSupervisorAPIClient { - supervisor_link: monitor_rs::Monitor, -} - -unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution - -#[allow(dead_code)] -impl ScoreSupervisorAPIClient { - pub fn new() -> Self { - let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); - debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); - // This is only temporary usage so unwrap is fine here. - let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); - Self { supervisor_link } - } -} -impl SupervisorAPIClient for ScoreSupervisorAPIClient { - fn notify_alive(&self) { - self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); - } -} - #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; use crate::protected_memory::ProtectedMemoryAllocator; + use crate::supervisor_api_client::SupervisorAPIClient; use crate::tag::{DeadlineTag, MonitorTag}; + use crate::worker::{MonitoringLogic, UniqueThreadRunner}; use crate::TimeRange; - - use super::*; + use containers::fixed_capacity::FixedCapacityVec; + use core::sync::atomic::{AtomicUsize, Ordering}; + use core::time::Duration; + use std::sync::Arc; #[derive(Clone)] struct MockSupervisorAPIClient { - pub notify_called: std::sync::Arc, + pub notify_called: Arc, } impl MockSupervisorAPIClient { pub fn new() -> Self { Self { - notify_called: std::sync::Arc::new(core::sync::atomic::AtomicUsize::new(0)), + notify_called: Arc::new(AtomicUsize::new(0)), } } fn get_notify_count(&self) -> usize { - self.notify_called.load(core::sync::atomic::Ordering::Acquire) + self.notify_called.load(Ordering::Acquire) } } impl SupervisorAPIClient for MockSupervisorAPIClient { fn notify_alive(&self) { - self.notify_called.fetch_add(1, core::sync::atomic::Ordering::AcqRel); + self.notify_called.fetch_add(1, Ordering::AcqRel); } } @@ -218,14 +188,11 @@ mod tests { DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("deadline_long"), - TimeRange::new(core::time::Duration::from_secs(1), core::time::Duration::from_secs(50)), + TimeRange::new(Duration::from_secs(1), Duration::from_secs(50)), ) .add_deadline( DeadlineTag::from("deadline_fast"), - TimeRange::new( - core::time::Duration::from_millis(0), - core::time::Duration::from_millis(50), - ), + TimeRange::new(Duration::from_millis(0), Duration::from_millis(50)), ) .build(monitor_tag, &allocator) } @@ -241,7 +208,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_secs(1), + Duration::from_secs(1), alive_mock.clone(), ); @@ -267,7 +234,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_nanos(0), // Make sure each call notifies alive + Duration::from_nanos(0), // Make sure each call notifies alive alive_mock.clone(), ); @@ -296,7 +263,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_millis(30), + Duration::from_millis(30), alive_mock.clone(), ); @@ -305,19 +272,19 @@ mod tests { .unwrap(); let _handle = deadline.start().unwrap(); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); assert_eq!(alive_mock.get_notify_count(), 5); @@ -335,11 +302,11 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_nanos(0), // Make sure each call notifies alive + Duration::from_nanos(0), // Make sure each call notifies alive alive_mock.clone(), ); - let mut worker = UniqueThreadRunner::new(core::time::Duration::from_millis(10)); + let mut worker = UniqueThreadRunner::new(Duration::from_millis(10)); worker.start(logic); let mut deadline = deadline_monitor @@ -348,7 +315,7 @@ mod tests { let handle = deadline.start().unwrap(); - std::thread::sleep(core::time::Duration::from_millis(70)); + std::thread::sleep(Duration::from_millis(70)); let current_count = alive_mock.get_notify_count(); assert!( @@ -358,7 +325,7 @@ mod tests { ); // We shall not get any new alive calls. - std::thread::sleep(core::time::Duration::from_millis(50)); + std::thread::sleep(Duration::from_millis(50)); assert_eq!(alive_mock.get_notify_count(), current_count); handle.stop(); }