From 3cb438ac95b5356ba4352b3c9f3fd9216152a779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Tue, 24 Feb 2026 10:35:07 +0100 Subject: [PATCH 1/5] hmon: fix Cargo build, small code changes - Fix Cargo for `health_monitoring_lib`. - Fix leftover test changes from previous PR. - Small code changes. --- .github/workflows/lint_clippy.yml | 10 ++- .vscode/settings.json | 3 + Cargo.toml | 1 + src/health_monitoring_lib/BUILD | 3 + src/health_monitoring_lib/Cargo.toml | 6 +- src/health_monitoring_lib/rust/lib.rs | 14 +-- .../rust/supervisor_api_client/mod.rs | 38 ++++++++ .../score_supervisor_api_client.rs | 40 +++++++++ .../stub_supervisor_api_client.rs | 32 +++++++ src/health_monitoring_lib/rust/tag.rs | 42 +++++++-- src/health_monitoring_lib/rust/worker.rs | 90 ++++++------------- 11 files changed, 193 insertions(+), 86 deletions(-) create mode 100644 src/health_monitoring_lib/rust/supervisor_api_client/mod.rs create mode 100644 src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs create mode 100644 src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs diff --git a/.github/workflows/lint_clippy.yml b/.github/workflows/lint_clippy.yml index 6070de95..44a66910 100644 --- a/.github/workflows/lint_clippy.yml +++ b/.github/workflows/lint_clippy.yml @@ -48,8 +48,14 @@ jobs: override: true components: clippy - - name: check clippy errors + - name: check clippy errors (with "--features stub_supervisor_api_client") uses: actions-rs/cargo@v1 with: command: clippy - args: --all-features --all-targets --workspace -- -D warnings + args: --features stub_supervisor_api_client --all-targets -- -D warnings + + - name: check clippy errors (with "--features score_supervisor_api_client") + uses: actions-rs/cargo@v1 + with: + command: clippy + args: --features score_supervisor_api_client --no-default-features --all-targets -- -D warnings diff --git a/.vscode/settings.json b/.vscode/settings.json index 30718b6f..4761cb3f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,6 +9,9 @@ "rust-analyzer.cargo.cfgs": [ "!miri" ], + "rust-analyzer.cargo.features": [ + "stub_supervisor_api_client" + ], "rust-analyzer.check.command": "clippy", "rust-analyzer.rustfmt.overrideCommand": [ "${workspaceFolder}/.vscode/rustfmt.sh" diff --git a/Cargo.toml b/Cargo.toml index 4c19b6ec..a4f364e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "src/health_monitoring_lib", "examples/rust_supervised_app", ] +default-members = ["src/health_monitoring_lib"] [workspace.package] edition = "2021" diff --git a/src/health_monitoring_lib/BUILD b/src/health_monitoring_lib/BUILD index c473903d..69533160 100644 --- a/src/health_monitoring_lib/BUILD +++ b/src/health_monitoring_lib/BUILD @@ -42,6 +42,7 @@ CC_HDRS = [ rust_library( name = "health_monitoring_lib", srcs = glob(["rust/**/*.rs"]), + crate_features = ["score_supervisor_api_client"], crate_root = "rust/lib.rs", proc_macro_deps = PROC_MACRO_DEPS, visibility = ["//visibility:public"], @@ -65,6 +66,7 @@ cc_library( rust_static_library( name = "health_monitoring_lib_ffi", srcs = glob(["rust/**/*.rs"]), + crate_features = ["score_supervisor_api_client"], crate_name = "health_monitoring_lib", crate_root = "rust/lib.rs", proc_macro_deps = [ @@ -100,6 +102,7 @@ cc_library( rust_test( name = "tests", crate = ":health_monitoring_lib", + crate_features = ["stub_supervisor_api_client"], rustc_flags = [ "-C", "link-arg=-lm", diff --git a/src/health_monitoring_lib/Cargo.toml b/src/health_monitoring_lib/Cargo.toml index 71b38d46..073a234f 100644 --- a/src/health_monitoring_lib/Cargo.toml +++ b/src/health_monitoring_lib/Cargo.toml @@ -7,7 +7,6 @@ edition.workspace = true authors.workspace = true license-file.workspace = true - [lib] path = "rust/lib.rs" @@ -18,11 +17,12 @@ workspace = true score_log.workspace = true score_testing_macros.workspace = true containers.workspace = true -monitor_rs.workspace = true +monitor_rs = { workspace = true, optional = true } [dev-dependencies] stdout_logger.workspace = true [features] -default = [] +default = ["stub_supervisor_api_client"] stub_supervisor_api_client = [] +score_supervisor_api_client = ["monitor_rs"] diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 492e6378..844183d2 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -15,12 +15,14 @@ mod common; mod ffi; mod log; mod protected_memory; +mod supervisor_api_client; mod tag; mod worker; pub mod deadline; use crate::common::MonitorEvalHandle; +use crate::supervisor_api_client::SupervisorAPIClientImpl; pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; @@ -214,16 +216,8 @@ impl HealthMonitor { } pub(crate) fn start_internal(&mut self, monitors: FixedCapacityVec) { - let monitoring_logic = worker::MonitoringLogic::new( - monitors, - self.supervisor_api_cycle, - // Currently only `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. - // The later is meant to be used for testing purposes. - #[cfg(not(any(test, feature = "stub_supervisor_api_client")))] - worker::ScoreSupervisorAPIClient::new(), - #[cfg(any(test, feature = "stub_supervisor_api_client"))] - worker::StubSupervisorAPIClient {}, - ); + let monitoring_logic = + worker::MonitoringLogic::new(monitors, self.supervisor_api_cycle, SupervisorAPIClientImpl::new()); self.worker.start(monitoring_logic) } diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs new file mode 100644 index 00000000..095e3d01 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs @@ -0,0 +1,38 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Module for selecting [`SupervisorAPIClient`] implementation. +//! Currently `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. +//! The latter is meant for testing purposes. + +/// An abstraction over the API used to notify the supervisor about process liveness. +pub trait SupervisorAPIClient { + fn notify_alive(&self); +} + +// Disallow both and none features. +#[cfg(any( + all(feature = "score_supervisor_api_client", feature = "stub_supervisor_api_client"), + not(any(feature = "score_supervisor_api_client", feature = "stub_supervisor_api_client")) +))] +compile_error!("Either 'score_supervisor_api_client' or 'stub_supervisor_api_client' must be enabled!"); + +#[cfg(feature = "score_supervisor_api_client")] +mod score_supervisor_api_client; +#[cfg(feature = "stub_supervisor_api_client")] +mod stub_supervisor_api_client; + +#[cfg(feature = "score_supervisor_api_client")] +pub use score_supervisor_api_client::ScoreSupervisorAPIClient as SupervisorAPIClientImpl; +#[cfg(feature = "stub_supervisor_api_client")] +pub use stub_supervisor_api_client::StubSupervisorAPIClient as SupervisorAPIClientImpl; diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs new file mode 100644 index 00000000..73872372 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs @@ -0,0 +1,40 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::log::debug; +use crate::supervisor_api_client::SupervisorAPIClient; +use crate::worker::Checks; + +#[allow(dead_code)] +pub struct ScoreSupervisorAPIClient { + supervisor_link: monitor_rs::Monitor, +} + +unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution + +#[allow(dead_code)] +impl ScoreSupervisorAPIClient { + pub fn new() -> Self { + let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); + debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); + // This is only temporary usage so unwrap is fine here. + let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); + Self { supervisor_link } + } +} + +impl SupervisorAPIClient for ScoreSupervisorAPIClient { + fn notify_alive(&self) { + self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); + } +} diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs new file mode 100644 index 00000000..a948c819 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs @@ -0,0 +1,32 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::log::warn; +use crate::supervisor_api_client::SupervisorAPIClient; + +/// A stub implementation of the SupervisorAPIClient that logs alive notifications. +#[allow(dead_code)] +pub struct StubSupervisorAPIClient; + +impl StubSupervisorAPIClient { + pub fn new() -> Self { + Self + } +} + +#[allow(dead_code)] +impl SupervisorAPIClient for StubSupervisorAPIClient { + fn notify_alive(&self) { + warn!("StubSupervisorAPIClient: notify_alive called"); + } +} diff --git a/src/health_monitoring_lib/rust/tag.rs b/src/health_monitoring_lib/rust/tag.rs index eca868df..204902b7 100644 --- a/src/health_monitoring_lib/rust/tag.rs +++ b/src/health_monitoring_lib/rust/tag.rs @@ -247,13 +247,41 @@ mod tests { } #[test] - fn tag_hash() { - let example_str = "EXAMPLE"; - let tag = Tag::from(example_str.to_string()); - let mut hasher = DefaultHasher::new(); - tag.hash(&mut hasher); - let hash = hasher.finish(); - assert_eq!(hash, 14738755424381306335); + fn tag_hash_is_eq() { + let tag1 = Tag::from("same"); + let hash1 = { + let mut hasher = DefaultHasher::new(); + tag1.hash(&mut hasher); + hasher.finish() + }; + + let tag2 = Tag::from("same"); + let hash2 = { + let mut hasher = DefaultHasher::new(); + tag2.hash(&mut hasher); + hasher.finish() + }; + + assert_eq!(hash1, hash2); + } + + #[test] + fn tag_hash_is_ne() { + let tag1 = Tag::from("first"); + let hash1 = { + let mut hasher = DefaultHasher::new(); + tag1.hash(&mut hasher); + hasher.finish() + }; + + let tag2 = Tag::from("second"); + let hash2 = { + let mut hasher = DefaultHasher::new(); + tag2.hash(&mut hasher); + hasher.finish() + }; + + assert_ne!(hash1, hash2); } #[test] diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index 8830e153..e5b4c0ac 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -11,19 +11,16 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* use crate::common::{MonitorEvalHandle, MonitorEvaluator}; -use crate::log::{debug, info, warn}; +use crate::log::{info, warn}; +use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; - -/// An abstraction over the API used to notify the supervisor about process liveness. -pub(super) trait SupervisorAPIClient { - fn notify_alive(&self); -} +use core::time::Duration; pub(super) struct MonitoringLogic { monitors: FixedCapacityVec, client: T, last_notification: std::time::Instant, - supervisor_api_cycle: core::time::Duration, + supervisor_api_cycle: Duration, } impl MonitoringLogic { @@ -34,7 +31,7 @@ impl MonitoringLogic { /// * `client` - An implementation of the SupervisorAPIClient trait. pub(super) fn new( monitors: FixedCapacityVec, - supervisor_api_cycle: core::time::Duration, + supervisor_api_cycle: Duration, client: T, ) -> Self { Self { @@ -74,11 +71,11 @@ impl MonitoringLogic { pub struct UniqueThreadRunner { handle: Option>, should_stop: std::sync::Arc, - internal_duration_cycle: core::time::Duration, + internal_duration_cycle: Duration, } impl UniqueThreadRunner { - pub(super) fn new(internal_duration_cycle: core::time::Duration) -> Self { + pub(super) fn new(internal_duration_cycle: Duration) -> Self { Self { handle: None, should_stop: std::sync::Arc::new(core::sync::atomic::AtomicBool::new(false)), @@ -131,20 +128,9 @@ impl Drop for UniqueThreadRunner { } } -/// A stub implementation of the SupervisorAPIClient that logs alive notifications. -#[allow(dead_code)] -pub(super) struct StubSupervisorAPIClient; - -#[allow(dead_code)] -impl SupervisorAPIClient for StubSupervisorAPIClient { - fn notify_alive(&self) { - warn!("StubSupervisorAPIClient: notify_alive called"); - } -} - #[allow(dead_code)] #[derive(Copy, Clone)] -enum Checks { +pub(crate) enum Checks { WorkerCheckpoint, } @@ -156,38 +142,17 @@ impl From for u32 { } } -#[allow(dead_code)] -pub(super) struct ScoreSupervisorAPIClient { - supervisor_link: monitor_rs::Monitor, -} - -unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution - -#[allow(dead_code)] -impl ScoreSupervisorAPIClient { - pub fn new() -> Self { - let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); - debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); - // This is only temporary usage so unwrap is fine here. - let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); - Self { supervisor_link } - } -} -impl SupervisorAPIClient for ScoreSupervisorAPIClient { - fn notify_alive(&self) { - self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); - } -} - #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; use crate::protected_memory::ProtectedMemoryAllocator; + use crate::supervisor_api_client::SupervisorAPIClient; use crate::tag::{DeadlineTag, MonitorTag}; + use crate::worker::{MonitoringLogic, UniqueThreadRunner}; use crate::TimeRange; - - use super::*; + use containers::fixed_capacity::FixedCapacityVec; + use core::time::Duration; #[derive(Clone)] struct MockSupervisorAPIClient { @@ -218,14 +183,11 @@ mod tests { DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("deadline_long"), - TimeRange::new(core::time::Duration::from_secs(1), core::time::Duration::from_secs(50)), + TimeRange::new(Duration::from_secs(1), Duration::from_secs(50)), ) .add_deadline( DeadlineTag::from("deadline_fast"), - TimeRange::new( - core::time::Duration::from_millis(0), - core::time::Duration::from_millis(50), - ), + TimeRange::new(Duration::from_millis(0), Duration::from_millis(50)), ) .build(monitor_tag, &allocator) } @@ -241,7 +203,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_secs(1), + Duration::from_secs(1), alive_mock.clone(), ); @@ -267,7 +229,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_nanos(0), // Make sure each call notifies alive + Duration::from_nanos(0), // Make sure each call notifies alive alive_mock.clone(), ); @@ -296,7 +258,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_millis(30), + Duration::from_millis(30), alive_mock.clone(), ); @@ -305,19 +267,19 @@ mod tests { .unwrap(); let _handle = deadline.start().unwrap(); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); assert_eq!(alive_mock.get_notify_count(), 5); @@ -335,11 +297,11 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_nanos(0), // Make sure each call notifies alive + Duration::from_nanos(0), // Make sure each call notifies alive alive_mock.clone(), ); - let mut worker = UniqueThreadRunner::new(core::time::Duration::from_millis(10)); + let mut worker = UniqueThreadRunner::new(Duration::from_millis(10)); worker.start(logic); let mut deadline = deadline_monitor @@ -348,7 +310,7 @@ mod tests { let handle = deadline.start().unwrap(); - std::thread::sleep(core::time::Duration::from_millis(70)); + std::thread::sleep(Duration::from_millis(70)); let current_count = alive_mock.get_notify_count(); assert!( @@ -358,7 +320,7 @@ mod tests { ); // We shall not get any new alive calls. - std::thread::sleep(core::time::Duration::from_millis(50)); + std::thread::sleep(Duration::from_millis(50)); assert_eq!(alive_mock.get_notify_count(), current_count); handle.stop(); } From 35c7c0dcfb3049ec2d1fa6c708e4a7be6d11da22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Wed, 25 Feb 2026 12:35:16 +0100 Subject: [PATCH 2/5] hmon: Cargo fix post-review fixes - Less restrictive feature flags. - Additional small code changes. --- .github/workflows/lint_clippy.yml | 10 ++----- .vscode/settings.json | 3 -- src/health_monitoring_lib/rust/common.rs | 3 ++ src/health_monitoring_lib/rust/lib.rs | 14 +++++++-- .../rust/supervisor_api_client/mod.rs | 18 +++--------- .../score_supervisor_api_client.rs | 4 +-- .../stub_supervisor_api_client.rs | 4 +-- src/health_monitoring_lib/rust/worker.rs | 29 +++++++++++-------- 8 files changed, 41 insertions(+), 44 deletions(-) diff --git a/.github/workflows/lint_clippy.yml b/.github/workflows/lint_clippy.yml index 44a66910..6070de95 100644 --- a/.github/workflows/lint_clippy.yml +++ b/.github/workflows/lint_clippy.yml @@ -48,14 +48,8 @@ jobs: override: true components: clippy - - name: check clippy errors (with "--features stub_supervisor_api_client") + - name: check clippy errors uses: actions-rs/cargo@v1 with: command: clippy - args: --features stub_supervisor_api_client --all-targets -- -D warnings - - - name: check clippy errors (with "--features score_supervisor_api_client") - uses: actions-rs/cargo@v1 - with: - command: clippy - args: --features score_supervisor_api_client --no-default-features --all-targets -- -D warnings + args: --all-features --all-targets --workspace -- -D warnings diff --git a/.vscode/settings.json b/.vscode/settings.json index 4761cb3f..30718b6f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,9 +9,6 @@ "rust-analyzer.cargo.cfgs": [ "!miri" ], - "rust-analyzer.cargo.features": [ - "stub_supervisor_api_client" - ], "rust-analyzer.check.command": "clippy", "rust-analyzer.rustfmt.overrideCommand": [ "${workspaceFolder}/.vscode/rustfmt.sh" diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index 35a8ffa5..8042a391 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -39,6 +39,9 @@ pub(crate) enum MonitorEvaluationError { /// Trait for evaluating monitors and reporting errors to be used by HealthMonitor. pub(crate) trait MonitorEvaluator { + /// Run monitor evaluation. + /// + /// - `on_error` - error handling, containing tag of failing object and error code. fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)); } diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 844183d2..05db89a5 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -22,7 +22,6 @@ mod worker; pub mod deadline; use crate::common::MonitorEvalHandle; -use crate::supervisor_api_client::SupervisorAPIClientImpl; pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; @@ -216,8 +215,17 @@ impl HealthMonitor { } pub(crate) fn start_internal(&mut self, monitors: FixedCapacityVec) { - let monitoring_logic = - worker::MonitoringLogic::new(monitors, self.supervisor_api_cycle, SupervisorAPIClientImpl::new()); + let monitoring_logic = worker::MonitoringLogic::new( + monitors, + self.supervisor_api_cycle, + #[cfg(all(not(test), feature = "score_supervisor_api_client"))] + supervisor_api_client::score_supervisor_api_client::ScoreSupervisorAPIClient::new(), + #[cfg(any( + test, + all(feature = "stub_supervisor_api_client", not(feature = "score_supervisor_api_client")) + ))] + supervisor_api_client::stub_supervisor_api_client::StubSupervisorAPIClient::new(), + ); self.worker.start(monitoring_logic) } diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs index 095e3d01..195d167a 100644 --- a/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs +++ b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs @@ -11,7 +11,7 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -//! Module for selecting [`SupervisorAPIClient`] implementation. +//! Module providing [`SupervisorAPIClient`] implementations. //! Currently `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. //! The latter is meant for testing purposes. @@ -20,19 +20,9 @@ pub trait SupervisorAPIClient { fn notify_alive(&self); } -// Disallow both and none features. -#[cfg(any( - all(feature = "score_supervisor_api_client", feature = "stub_supervisor_api_client"), - not(any(feature = "score_supervisor_api_client", feature = "stub_supervisor_api_client")) -))] -compile_error!("Either 'score_supervisor_api_client' or 'stub_supervisor_api_client' must be enabled!"); +// NOTE: various implementations are not mutually exclusive. #[cfg(feature = "score_supervisor_api_client")] -mod score_supervisor_api_client; +pub mod score_supervisor_api_client; #[cfg(feature = "stub_supervisor_api_client")] -mod stub_supervisor_api_client; - -#[cfg(feature = "score_supervisor_api_client")] -pub use score_supervisor_api_client::ScoreSupervisorAPIClient as SupervisorAPIClientImpl; -#[cfg(feature = "stub_supervisor_api_client")] -pub use stub_supervisor_api_client::StubSupervisorAPIClient as SupervisorAPIClientImpl; +pub mod stub_supervisor_api_client; diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs index 73872372..a198f9ad 100644 --- a/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs +++ b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs @@ -11,18 +11,18 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* +#![allow(dead_code)] + use crate::log::debug; use crate::supervisor_api_client::SupervisorAPIClient; use crate::worker::Checks; -#[allow(dead_code)] pub struct ScoreSupervisorAPIClient { supervisor_link: monitor_rs::Monitor, } unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution -#[allow(dead_code)] impl ScoreSupervisorAPIClient { pub fn new() -> Self { let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs index a948c819..e98f4909 100644 --- a/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs +++ b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs @@ -11,11 +11,12 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* +#![allow(dead_code)] + use crate::log::warn; use crate::supervisor_api_client::SupervisorAPIClient; /// A stub implementation of the SupervisorAPIClient that logs alive notifications. -#[allow(dead_code)] pub struct StubSupervisorAPIClient; impl StubSupervisorAPIClient { @@ -24,7 +25,6 @@ impl StubSupervisorAPIClient { } } -#[allow(dead_code)] impl SupervisorAPIClient for StubSupervisorAPIClient { fn notify_alive(&self) { warn!("StubSupervisorAPIClient: notify_alive called"); diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index e5b4c0ac..7be07321 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -14,12 +14,15 @@ use crate::common::{MonitorEvalHandle, MonitorEvaluator}; use crate::log::{info, warn}; use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; +use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; +use std::sync::Arc; +use std::time::Instant; pub(super) struct MonitoringLogic { monitors: FixedCapacityVec, client: T, - last_notification: std::time::Instant, + last_notification: Instant, supervisor_api_cycle: Duration, } @@ -38,7 +41,7 @@ impl MonitoringLogic { monitors, client, supervisor_api_cycle, - last_notification: std::time::Instant::now(), + last_notification: Instant::now(), } } @@ -55,7 +58,7 @@ impl MonitoringLogic { if !has_any_error { if self.last_notification.elapsed() > self.supervisor_api_cycle { - self.last_notification = std::time::Instant::now(); + self.last_notification = Instant::now(); self.client.notify_alive(); } } else { @@ -70,7 +73,7 @@ impl MonitoringLogic { /// A struct that manages a unique thread for running monitoring logic periodically. pub struct UniqueThreadRunner { handle: Option>, - should_stop: std::sync::Arc, + should_stop: Arc, internal_duration_cycle: Duration, } @@ -78,7 +81,7 @@ impl UniqueThreadRunner { pub(super) fn new(internal_duration_cycle: Duration) -> Self { Self { handle: None, - should_stop: std::sync::Arc::new(core::sync::atomic::AtomicBool::new(false)), + should_stop: Arc::new(AtomicBool::new(false)), internal_duration_cycle, } } @@ -96,10 +99,10 @@ impl UniqueThreadRunner { let mut next_sleep_time = interval; // TODO Add some checks and log if cyclicly here is not met. - while !should_stop.load(core::sync::atomic::Ordering::Relaxed) { + while !should_stop.load(Ordering::Relaxed) { std::thread::sleep(next_sleep_time); - let now = std::time::Instant::now(); + let now = Instant::now(); if !monitoring_logic.run() { info!("Monitoring logic failed, stopping thread."); @@ -115,7 +118,7 @@ impl UniqueThreadRunner { } pub fn join(&mut self) { - self.should_stop.store(true, core::sync::atomic::Ordering::Relaxed); + self.should_stop.store(true, Ordering::Relaxed); if let Some(handle) = self.handle.take() { let _ = handle.join(); } @@ -152,28 +155,30 @@ mod tests { use crate::worker::{MonitoringLogic, UniqueThreadRunner}; use crate::TimeRange; use containers::fixed_capacity::FixedCapacityVec; + use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; + use std::sync::Arc; #[derive(Clone)] struct MockSupervisorAPIClient { - pub notify_called: std::sync::Arc, + pub notify_called: Arc, } impl MockSupervisorAPIClient { pub fn new() -> Self { Self { - notify_called: std::sync::Arc::new(core::sync::atomic::AtomicUsize::new(0)), + notify_called: Arc::new(AtomicUsize::new(0)), } } fn get_notify_count(&self) -> usize { - self.notify_called.load(core::sync::atomic::Ordering::Acquire) + self.notify_called.load(Ordering::Acquire) } } impl SupervisorAPIClient for MockSupervisorAPIClient { fn notify_alive(&self) { - self.notify_called.fetch_add(1, core::sync::atomic::Ordering::AcqRel); + self.notify_called.fetch_add(1, Ordering::AcqRel); } } From 168d203e2cb343030d9dcfac46c588943a73819b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Mon, 23 Feb 2026 14:09:26 +0100 Subject: [PATCH 3/5] hmon: approach to monitors rework - Remove unnecessary abstraction from monitors. - Improve docs. - Move `FFIBorrowed` and `FFIHandle` to main `ffi` module. - Small esthetics fixes. --- src/health_monitoring_lib/rust/common.rs | 2 - .../rust/deadline/deadline_monitor.rs | 172 +++++++++--------- .../rust/deadline/mod.rs | 1 + src/health_monitoring_lib/rust/ffi.rs | 4 +- src/health_monitoring_lib/rust/lib.rs | 150 ++++++++------- src/health_monitoring_lib/rust/worker.rs | 31 ++-- 6 files changed, 180 insertions(+), 180 deletions(-) diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index 8042a391..51381114 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -11,10 +11,8 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use crate::tag::MonitorTag; use core::hash::Hash; use core::time::Duration; -use std::sync::Arc; /// Range of accepted time. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] diff --git a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs index 995b851d..e7eba800 100644 --- a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs +++ b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs @@ -10,20 +10,25 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use super::common::DeadlineTemplate; -use crate::common::{MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator, TimeRange}; +use crate::deadline::common::{DeadlineTemplate, StateIndex}; +use crate::deadline::deadline_state::{DeadlineState, DeadlineStateSnapshot}; +use crate::log::{error, warn, ScoreDebug}; +use crate::protected_memory::ProtectedMemoryAllocator; use crate::tag::{DeadlineTag, MonitorTag}; -use crate::{ - deadline::{ - common::StateIndex, - deadline_state::{DeadlineState, DeadlineStateSnapshot}, - }, - protected_memory::ProtectedMemoryAllocator, -}; +use crate::TimeRange; use core::hash::Hash; -use std::{collections::HashMap, sync::Arc, time::Instant}; - -use crate::log::*; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +/// Deadline evaluation errors. +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, ScoreDebug)] +pub(crate) enum DeadlineEvalError { + /// Finished too early. + TooEarly, + /// Finished too late. + TooLate, +} /// /// Errors that can occur when working with DeadlineMonitor @@ -64,8 +69,12 @@ impl DeadlineMonitorBuilder { } /// Builds the DeadlineMonitor with the configured deadlines. - pub(crate) fn build(self, monitor_tag: MonitorTag, _allocator: &ProtectedMemoryAllocator) -> DeadlineMonitor { - DeadlineMonitor::new(monitor_tag, self.deadlines) + pub(crate) fn build( + self, + monitor_tag: MonitorTag, + _allocator: &ProtectedMemoryAllocator, + ) -> Arc { + Arc::new(DeadlineMonitorInner::new(monitor_tag, self.deadlines)) } // Used by FFI and config parsing code which prefer not to move builder instance @@ -80,27 +89,9 @@ pub struct DeadlineMonitor { } impl DeadlineMonitor { - fn new(monitor_tag: MonitorTag, deadlines: HashMap) -> Self { - let mut active_deadlines = vec![]; - - let deadlines = deadlines - .into_iter() - .enumerate() - .map(|(index, (deadline_tag, range))| { - active_deadlines.push((deadline_tag, DeadlineState::new())); - (deadline_tag, DeadlineTemplate::new(range, StateIndex::new(index))) - }) - .collect(); - - Self { - #[allow(clippy::arc_with_non_send_sync)] // This will be fixed once we add background thread - inner: Arc::new(DeadlineMonitorInner { - monitor_tag, - deadlines, - active_deadlines: active_deadlines.into(), - monitor_starting_point: Instant::now(), - }), - } + /// Create a new [`DeadlineMonitor`] instance. + pub(crate) fn new(inner: Arc) -> Self { + Self { inner } } /// Acquires a deadline instance for the given tag. @@ -109,27 +100,7 @@ impl DeadlineMonitor { /// - Err(DeadlineMonitorError::DeadlineInUse) - if the deadline is already in use /// - Err(DeadlineMonitorError::DeadlineNotFound) - if the deadline tag is not registered pub fn get_deadline(&self, deadline_tag: DeadlineTag) -> Result { - if let Some(template) = self.inner.deadlines.get(&deadline_tag) { - match template.acquire_deadline() { - Some(range) => Ok(Deadline { - range, - deadline_tag, - monitor: Arc::clone(&self.inner), - state_index: template.assigned_state_index, - }), - None => Err(DeadlineMonitorError::DeadlineInUse), - } - } else { - Err(DeadlineMonitorError::DeadlineNotFound) - } - } - - /// Handle for evaluation of all active deadlines and reporting any missed deadlines or underruns. - /// - /// # NOTE - /// This function is intended to be called from a background thread periodically. - pub(crate) fn get_eval_handle(&self) -> MonitorEvalHandle { - MonitorEvalHandle::new(Arc::clone(&self.inner)) + self.inner.get_deadline(deadline_tag) } } @@ -220,7 +191,7 @@ impl Deadline { let expected = current.timestamp_ms(); if expected < now { - possible_err = (Some(MonitorEvaluationError::TooLate), now - expected); + possible_err = (Some(DeadlineEvalError::TooLate), now - expected); return None; // Deadline missed, let state as is for BG thread to report } @@ -231,7 +202,7 @@ impl Deadline { // Finished too early, leave it for reporting by BG thread current.set_underrun(); - possible_err = (Some(MonitorEvaluationError::TooEarly), earliest_time - now); + possible_err = (Some(DeadlineEvalError::TooEarly), earliest_time - now); return Some(current); } @@ -239,10 +210,10 @@ impl Deadline { }); match possible_err { - (Some(MonitorEvaluationError::TooEarly), val) => { + (Some(DeadlineEvalError::TooEarly), val) => { error!("Deadline {:?} stopped too early by {} ms", self.deadline_tag, val); }, - (Some(MonitorEvaluationError::TooLate), val) => { + (Some(DeadlineEvalError::TooLate), val) => { error!("Deadline {:?} stopped too late by {} ms", self.deadline_tag, val); }, (None, _) => {}, @@ -268,7 +239,7 @@ impl Drop for Deadline { } } -struct DeadlineMonitorInner { +pub(crate) struct DeadlineMonitorInner { /// Tag of this monitor. monitor_tag: MonitorTag, @@ -284,13 +255,28 @@ struct DeadlineMonitorInner { active_deadlines: Arc<[(DeadlineTag, DeadlineState)]>, } -impl MonitorEvaluator for DeadlineMonitorInner { - fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) { - self.evaluate(on_error); +impl DeadlineMonitorInner { + fn new(monitor_tag: MonitorTag, deadlines: HashMap) -> Self { + let mut active_deadlines = vec![]; + + let deadlines = deadlines + .into_iter() + .enumerate() + .map(|(index, (deadline_tag, range))| { + active_deadlines.push((deadline_tag, DeadlineState::new())); + (deadline_tag, DeadlineTemplate::new(range, StateIndex::new(index))) + }) + .collect(); + + #[allow(clippy::arc_with_non_send_sync)] // This will be fixed once we add background thread + Self { + monitor_tag, + deadlines, + active_deadlines: active_deadlines.into(), + monitor_starting_point: Instant::now(), + } } -} -impl DeadlineMonitorInner { fn release_deadline(&self, deadline_tag: DeadlineTag) { if let Some(template) = self.deadlines.get(&deadline_tag) { template.release_deadline(); @@ -299,6 +285,22 @@ impl DeadlineMonitorInner { } } + pub(crate) fn get_deadline(self: &Arc, deadline_tag: DeadlineTag) -> Result { + if let Some(template) = self.deadlines.get(&deadline_tag) { + match template.acquire_deadline() { + Some(range) => Ok(Deadline { + range, + deadline_tag, + monitor: self.clone(), + state_index: template.assigned_state_index, + }), + None => Err(DeadlineMonitorError::DeadlineInUse), + } + } else { + Err(DeadlineMonitorError::DeadlineNotFound) + } + } + fn now(&self) -> u32 { let duration = self.monitor_starting_point.elapsed(); // As u32 can hold up to ~49 days in milliseconds, this should be sufficient for our use case @@ -306,7 +308,7 @@ impl DeadlineMonitorInner { u32::try_from(duration.as_millis()).expect("Monitor running for too long") } - fn evaluate(&self, mut on_failed: impl FnMut(&MonitorTag, MonitorEvaluationError)) { + pub(crate) fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, DeadlineEvalError)) { for (deadline_tag, deadline) in self.active_deadlines.iter() { let snapshot = deadline.snapshot(); if snapshot.is_underrun() { @@ -314,7 +316,7 @@ impl DeadlineMonitorInner { warn!("Deadline ({:?}) finished too early!", deadline_tag); // Here we would normally report the underrun to the monitoring system - on_failed(&self.monitor_tag, MonitorEvaluationError::TooEarly); + on_error(&self.monitor_tag, DeadlineEvalError::TooEarly); } else if snapshot.is_running() { debug_assert!( snapshot.is_stopped(), @@ -331,7 +333,7 @@ impl DeadlineMonitorInner { ); // Here we would normally report the missed deadline to the monitoring system - on_failed(&self.monitor_tag, MonitorEvaluationError::TooLate); + on_error(&self.monitor_tag, DeadlineEvalError::TooLate); } } } @@ -346,7 +348,7 @@ mod tests { fn create_monitor_with_deadlines() -> DeadlineMonitor { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("deadline_monitor"); - DeadlineMonitorBuilder::new() + let inner = DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("deadline_long"), TimeRange::new(core::time::Duration::from_secs(1), core::time::Duration::from_secs(50)), @@ -358,13 +360,14 @@ mod tests { core::time::Duration::from_millis(50), ), ) - .build(monitor_tag, &allocator) + .build(monitor_tag, &allocator); + DeadlineMonitor::new(inner) } fn create_monitor_with_multiple_running_deadlines() -> DeadlineMonitor { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("deadline_monitor"); - DeadlineMonitorBuilder::new() + let inner = DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("slow"), TimeRange::new(core::time::Duration::from_secs(0), core::time::Duration::from_secs(50)), @@ -390,7 +393,8 @@ mod tests { core::time::Duration::from_millis(10), ), ) - .build(monitor_tag, &allocator) + .build(monitor_tag, &allocator); + DeadlineMonitor::new(inner) } #[test] @@ -410,7 +414,7 @@ mod tests { drop(handle); // stop the deadline - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { panic!( "Deadline {:?} should not have failed or underrun({:?})", monitor_tag, deadline_failure @@ -426,10 +430,10 @@ mod tests { drop(handle); // stop the deadline - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooEarly, + DeadlineEvalError::TooEarly, "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -444,10 +448,10 @@ mod tests { // So deadline stop happens after evaluate, still it should be reported as failed - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooEarly, + DeadlineEvalError::TooEarly, "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -470,10 +474,10 @@ mod tests { let handle = deadline.start(); assert_eq!(handle.err(), Some(DeadlineError::DeadlineAlreadyFailed)); - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooEarly, + DeadlineEvalError::TooEarly, "Deadline {:?} should not have failed ({:?})", monitor_tag, deadline_failure @@ -489,10 +493,10 @@ mod tests { drop(handle); // stop the deadline - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooLate, + DeadlineEvalError::TooLate, "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -517,11 +521,11 @@ mod tests { let mut cnt = 0; - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { cnt += 1; assert_eq!( deadline_failure, - MonitorEvaluationError::TooLate, + DeadlineEvalError::TooLate, "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure diff --git a/src/health_monitoring_lib/rust/deadline/mod.rs b/src/health_monitoring_lib/rust/deadline/mod.rs index d903c412..922b5c9f 100644 --- a/src/health_monitoring_lib/rust/deadline/mod.rs +++ b/src/health_monitoring_lib/rust/deadline/mod.rs @@ -15,6 +15,7 @@ mod common; mod deadline_monitor; mod deadline_state; +pub(crate) use deadline_monitor::DeadlineMonitorInner; pub use deadline_monitor::{ DeadlineError, DeadlineHandle, DeadlineMonitor, DeadlineMonitorBuilder, DeadlineMonitorError, }; diff --git a/src/health_monitoring_lib/rust/ffi.rs b/src/health_monitoring_lib/rust/ffi.rs index f08b4c08..0bdf650c 100644 --- a/src/health_monitoring_lib/rust/ffi.rs +++ b/src/health_monitoring_lib/rust/ffi.rs @@ -211,12 +211,12 @@ pub extern "C" fn health_monitor_start(health_monitor_handle: FFIHandle) -> FFIC return FFICode::WrongState; } - let monitors = match health_monitor.collect_monitors_internal() { + let deadline_monitors = match health_monitor.collect_deadline_monitors_internal() { Ok(m) => m, Err(_) => return FFICode::WrongState, }; - health_monitor.start_internal(monitors); + health_monitor.start_internal(deadline_monitors); FFICode::Success } diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 05db89a5..0b1f0fab 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -26,11 +26,13 @@ pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; use std::collections::HashMap; +use std::sync::Arc; pub use tag::{DeadlineTag, MonitorTag}; +/// Builder for the [`HealthMonitor`]. #[derive(Default)] pub struct HealthMonitorBuilder { - deadline_monitor_builders: HashMap, + deadline_monitor_builders: HashMap, supervisor_api_cycle: Duration, internal_processing_cycle: Duration, } @@ -45,26 +47,31 @@ impl HealthMonitorBuilder { } } - /// Adds a deadline monitor for a specific identifier tag. - /// # Arguments - /// * `monitor_tag` - The unique identifier for the deadline monitor. - /// * `monitor` - The builder for the deadline monitor. + /// Add a [`DeadlineMonitor`] for the given [`MonitorTag`]. + /// + /// - `monitor_tag` - unique tag for the [`DeadlineMonitor`]. + /// - `monitor_builder` - monitor builder to finalize. + /// /// # Note - /// If a monitor with the same tag already exists, it will be overwritten. - pub fn add_deadline_monitor(mut self, monitor_tag: MonitorTag, monitor: deadline::DeadlineMonitorBuilder) -> Self { - self.add_deadline_monitor_internal(monitor_tag, monitor); + /// + /// If a deadline monitor with the same tag already exists, it will be overwritten. + pub fn add_deadline_monitor(mut self, monitor_tag: MonitorTag, monitor_builder: DeadlineMonitorBuilder) -> Self { + self.add_deadline_monitor_internal(monitor_tag, monitor_builder); self } - /// Sets the cycle duration for supervisor API notifications. - /// This duration determines how often the health monitor notifies the supervisor that the system is alive. + /// Set the interval between supervisor API notifications. + /// This duration determines how often the health monitor notifies the supervisor about system liveness. + /// + /// - `cycle_duration` - interval between notifications. pub fn with_supervisor_api_cycle(mut self, cycle_duration: Duration) -> Self { self.with_supervisor_api_cycle_internal(cycle_duration); self } - /// Sets the internal processing cycle duration. - /// This duration determines how often the health monitor checks deadlines. + /// Set the internal interval between health monitor evaluations. + /// + /// - `cycle_duration` - interval between evaluations. pub fn with_internal_processing_cycle(mut self, cycle_duration: Duration) -> Self { self.with_internal_processing_cycle_internal(cycle_duration); self @@ -85,9 +92,9 @@ impl HealthMonitorBuilder { pub(crate) fn add_deadline_monitor_internal( &mut self, monitor_tag: MonitorTag, - monitor: deadline::DeadlineMonitorBuilder, + monitor_builder: DeadlineMonitorBuilder, ) { - self.deadline_monitor_builders.insert(monitor_tag, monitor); + self.deadline_monitor_builders.insert(monitor_tag, monitor_builder); } pub(crate) fn with_supervisor_api_cycle_internal(&mut self, cycle_duration: Duration) { @@ -110,10 +117,7 @@ impl HealthMonitorBuilder { // Create deadline monitors. let mut deadline_monitors = HashMap::new(); for (tag, builder) in self.deadline_monitor_builders { - deadline_monitors.insert( - tag, - Some(DeadlineMonitorState::Available(builder.build(tag, &allocator))), - ); + deadline_monitors.insert(tag, (MonitorState::Available, builder.build(tag, &allocator))); } HealthMonitor { @@ -124,94 +128,93 @@ impl HealthMonitorBuilder { } } -enum DeadlineMonitorState { - Available(deadline::DeadlineMonitor), - Taken(common::MonitorEvalHandle), +/// Monitor ownership state in the [`HealthMonitor`]. +enum MonitorState { + /// Monitor is available. + Available, + /// Monitor is already taken. + Taken, } +/// Health monitor. pub struct HealthMonitor { - deadline_monitors: HashMap>, + deadline_monitors: HashMap)>, worker: worker::UniqueThreadRunner, supervisor_api_cycle: Duration, } impl HealthMonitor { - /// Retrieves and removes (hand over to user) a deadline monitor associated with the given identifier tag. - /// # Arguments - /// * `monitor_tag` - The unique identifier for the deadline monitor. - /// # Returns - /// An Option containing the DeadlineMonitor if found, or None if - /// - no monitor exists for the given tag or was already obtained + /// Get and pass ownership of a [`DeadlineMonitor`] for the given [`MonitorTag`]. /// - pub fn get_deadline_monitor(&mut self, monitor_tag: MonitorTag) -> Option { - let monitor = self.deadline_monitors.get_mut(&monitor_tag)?; - - match monitor.take() { - Some(DeadlineMonitorState::Available(deadline_monitor)) => { - monitor.replace(DeadlineMonitorState::Taken(deadline_monitor.get_eval_handle())); - - Some(deadline_monitor) - }, - Some(DeadlineMonitorState::Taken(v)) => { - monitor.replace(DeadlineMonitorState::Taken(v)); // Insert back - None + /// - `monitor_tag` - unique tag for the [`DeadlineMonitor`]. + /// + /// Returns [`Some`] containing [`DeadlineMonitor`] if found and not taken. + /// Otherwise returns [`None`]. + pub fn get_deadline_monitor(&mut self, monitor_tag: MonitorTag) -> Option { + let (state, inner) = self.deadline_monitors.get_mut(&monitor_tag)?; + match state { + MonitorState::Available => { + *state = MonitorState::Taken; + Some(DeadlineMonitor::new(inner.clone())) }, - None => None, + MonitorState::Taken => None, } } - /// Starts the health monitoring logic in a separate thread. + /// Start the health monitoring logic in a separate thread. /// /// From this point, the health monitor will periodically check monitors and notify the supervisor about system liveness. /// - /// # Note - /// - This function shall be called before Lifecycle.running() otherwise the supervisor might consider the process not alive. - /// - Stops when the HealthMonitor instance is dropped. + /// # Notes + /// + /// This method shall be called before `Lifecycle.running()`. + /// Otherwise the supervisor might consider the process not alive. + /// + /// Health monitoring logic stop when the [`HealthMonitor`] is dropped. + /// + /// # Panics /// - /// Panics if no monitors have been added. + /// Method panics if no monitors have been added. pub fn start(&mut self) { + // Check number of monitors. assert!( self.check_monitors_exist_internal(), - "No deadline monitors have been added. HealthMonitor cannot start without any monitors." + "No monitors have been added. HealthMonitor cannot start without any monitors." ); - let monitors = match self.collect_monitors_internal() { + let deadline_monitors = match self.collect_deadline_monitors_internal() { Ok(m) => m, Err(e) => panic!("{}", e), }; - self.start_internal(monitors); + self.start_internal(deadline_monitors); } pub(crate) fn check_monitors_exist_internal(&self) -> bool { !self.deadline_monitors.is_empty() } - pub(crate) fn collect_monitors_internal(&mut self) -> Result, String> { - let mut monitors = FixedCapacityVec::new(self.deadline_monitors.len()); - for (tag, monitor) in self.deadline_monitors.iter_mut() { - match monitor.take() { - Some(DeadlineMonitorState::Taken(handle)) => { - if monitors.push(handle).is_err() { - // Should not fail since we preallocated enough capacity + pub(crate) fn collect_deadline_monitors_internal( + &self, + ) -> Result>, String> { + let mut collected_monitors = FixedCapacityVec::new(self.deadline_monitors.len()); + for (tag, (state, inner)) in self.deadline_monitors.iter() { + match state { + MonitorState::Taken => { + if collected_monitors.push(inner.clone()).is_err() { + // Should not fail - capacity was preallocated. return Err("Failed to push monitor handle".to_string()); } }, - Some(DeadlineMonitorState::Available(_)) => { + MonitorState::Available => { return Err(format!( "All monitors must be taken before starting HealthMonitor but {:?} is not taken.", tag )); }, - None => { - return Err(format!( - "Invalid monitor ({:?}) state encountered while starting HealthMonitor.", - tag - )); - }, - } + }; } - Ok(monitors) + Ok(collected_monitors) } pub(crate) fn start_internal(&mut self, monitors: FixedCapacityVec) { @@ -239,7 +242,7 @@ mod tests { use super::*; #[test] - #[should_panic(expected = "No deadline monitors have been added. HealthMonitor cannot start without any monitors.")] + #[should_panic(expected = "No monitors have been added. HealthMonitor cannot start without any monitors.")] fn hm_with_no_monitors_shall_panic_on_start() { let health_monitor_builder = super::HealthMonitorBuilder::new(); health_monitor_builder.build().start(); @@ -256,10 +259,7 @@ mod tests { #[test] fn hm_with_taken_monitors_starts() { let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor( - MonitorTag::from("test_monitor"), - deadline::DeadlineMonitorBuilder::new(), - ) + .add_deadline_monitor(MonitorTag::from("test_monitor"), DeadlineMonitorBuilder::new()) .build(); let _monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); @@ -272,10 +272,7 @@ mod tests { )] fn hm_with_monitors_shall_not_start_with_not_taken_monitors() { let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor( - MonitorTag::from("test_monitor"), - deadline::DeadlineMonitorBuilder::new(), - ) + .add_deadline_monitor(MonitorTag::from("test_monitor"), DeadlineMonitorBuilder::new()) .build(); health_monitor.start(); @@ -284,10 +281,7 @@ mod tests { #[test] fn hm_get_deadline_monitor_works() { let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor( - MonitorTag::from("test_monitor"), - deadline::DeadlineMonitorBuilder::new(), - ) + .add_deadline_monitor(MonitorTag::from("test_monitor"), DeadlineMonitorBuilder::new()) .build(); { diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index 7be07321..03318080 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -10,7 +10,7 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use crate::common::{MonitorEvalHandle, MonitorEvaluator}; +use crate::deadline::DeadlineMonitorInner; use crate::log::{info, warn}; use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; @@ -20,7 +20,7 @@ use std::sync::Arc; use std::time::Instant; pub(super) struct MonitoringLogic { - monitors: FixedCapacityVec, + deadline_monitors: FixedCapacityVec>, client: T, last_notification: Instant, supervisor_api_cycle: Duration, @@ -29,16 +29,16 @@ pub(super) struct MonitoringLogic { impl MonitoringLogic { /// Creates a new MonitoringLogic instance. /// # Arguments - /// * `monitors` - A vector of monitor evaluation handles. + /// * `deadline_monitors` - A vector of monitor evaluation handles. /// * `supervisor_api_cycle` - Duration between alive notifications to the supervisor. /// * `client` - An implementation of the SupervisorAPIClient trait. pub(super) fn new( - monitors: FixedCapacityVec, + deadline_monitors: FixedCapacityVec>, supervisor_api_cycle: Duration, client: T, ) -> Self { Self { - monitors, + deadline_monitors, client, supervisor_api_cycle, last_notification: Instant::now(), @@ -48,11 +48,14 @@ impl MonitoringLogic { fn run(&mut self) -> bool { let mut has_any_error = false; - for monitor in self.monitors.iter() { + // Evaluate deadline monitors. + for monitor in self.deadline_monitors.iter() { monitor.evaluate(&mut |monitor_tag, error| { has_any_error = true; - // TODO: monitor type should be mentioned. - warn!("Monitor with tag {:?} reported error: {:?}.", monitor_tag, error); + warn!( + "Deadline monitor with tag {:?} reported error: {:?}.", + monitor_tag, error + ); }); } @@ -148,7 +151,7 @@ impl From for u32 { #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { - use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; + use crate::deadline::{DeadlineMonitorBuilder, DeadlineMonitorInner}; use crate::protected_memory::ProtectedMemoryAllocator; use crate::supervisor_api_client::SupervisorAPIClient; use crate::tag::{DeadlineTag, MonitorTag}; @@ -182,7 +185,7 @@ mod tests { } } - fn create_monitor_with_deadlines() -> DeadlineMonitor { + fn create_monitor_with_deadlines() -> Arc { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("deadline_monitor"); DeadlineMonitorBuilder::new() @@ -205,7 +208,7 @@ mod tests { let mut logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.get_eval_handle()).unwrap(); + vec.push(deadline_monitor.clone()).unwrap(); vec }, Duration::from_secs(1), @@ -231,7 +234,7 @@ mod tests { let mut logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.get_eval_handle()).unwrap(); + vec.push(deadline_monitor.clone()).unwrap(); vec }, Duration::from_nanos(0), // Make sure each call notifies alive @@ -260,7 +263,7 @@ mod tests { let mut logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.get_eval_handle()).unwrap(); + vec.push(deadline_monitor.clone()).unwrap(); vec }, Duration::from_millis(30), @@ -299,7 +302,7 @@ mod tests { let logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.get_eval_handle()).unwrap(); + vec.push(deadline_monitor.clone()).unwrap(); vec }, Duration::from_nanos(0), // Make sure each call notifies alive From c85301cc0f42ce78c0f49f719304df4bba4cf766 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Tue, 24 Feb 2026 12:37:48 +0100 Subject: [PATCH 4/5] hmon: approach to monitors rework - Restore some parts. - Separate errors into groups. --- src/health_monitoring_lib/rust/common.rs | 29 ++++- .../rust/deadline/deadline_monitor.rs | 114 +++++++++--------- .../rust/deadline/mod.rs | 2 +- src/health_monitoring_lib/rust/ffi.rs | 4 +- src/health_monitoring_lib/rust/lib.rs | 85 ++++++++----- src/health_monitoring_lib/rust/worker.rs | 41 ++++--- 6 files changed, 169 insertions(+), 106 deletions(-) diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index 51381114..9cbb9ccc 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -11,8 +11,12 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* +use crate::deadline::DeadlineEvaluationError; +use crate::log::ScoreDebug; +use crate::tag::MonitorTag; use core::hash::Hash; use core::time::Duration; +use std::sync::Arc; /// Range of accepted time. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] @@ -28,11 +32,30 @@ impl TimeRange { } } +/// The monitor has an evaluation handle available. +pub(crate) trait HasEvalHandle { + /// Get an evaluation handle for this monitor. + /// + /// # NOTE + /// + /// This method is intended to be called from a background thread periodically. + fn get_eval_handle(&self) -> MonitorEvalHandle; +} + /// Errors that can occur during monitor evaluation. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, crate::log::ScoreDebug)] +/// Contains failing monitor type. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, ScoreDebug)] +#[allow(dead_code)] pub(crate) enum MonitorEvaluationError { - TooEarly, - TooLate, + Deadline(DeadlineEvaluationError), + Heartbeat, + Logic, +} + +impl From for MonitorEvaluationError { + fn from(value: DeadlineEvaluationError) -> Self { + MonitorEvaluationError::Deadline(value) + } } /// Trait for evaluating monitors and reporting errors to be used by HealthMonitor. diff --git a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs index e7eba800..24413079 100644 --- a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs +++ b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs @@ -10,6 +10,7 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* +use crate::common::{HasEvalHandle, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; use crate::deadline::common::{DeadlineTemplate, StateIndex}; use crate::deadline::deadline_state::{DeadlineState, DeadlineStateSnapshot}; use crate::log::{error, warn, ScoreDebug}; @@ -23,7 +24,7 @@ use std::time::Instant; /// Deadline evaluation errors. #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, ScoreDebug)] -pub(crate) enum DeadlineEvalError { +pub(crate) enum DeadlineEvaluationError { /// Finished too early. TooEarly, /// Finished too late. @@ -69,12 +70,9 @@ impl DeadlineMonitorBuilder { } /// Builds the DeadlineMonitor with the configured deadlines. - pub(crate) fn build( - self, - monitor_tag: MonitorTag, - _allocator: &ProtectedMemoryAllocator, - ) -> Arc { - Arc::new(DeadlineMonitorInner::new(monitor_tag, self.deadlines)) + pub(crate) fn build(self, monitor_tag: MonitorTag, _allocator: &ProtectedMemoryAllocator) -> DeadlineMonitor { + let inner = Arc::new(DeadlineMonitorInner::new(monitor_tag, self.deadlines)); + DeadlineMonitor::new(inner) } // Used by FFI and config parsing code which prefer not to move builder instance @@ -90,7 +88,7 @@ pub struct DeadlineMonitor { impl DeadlineMonitor { /// Create a new [`DeadlineMonitor`] instance. - pub(crate) fn new(inner: Arc) -> Self { + fn new(inner: Arc) -> Self { Self { inner } } @@ -104,6 +102,12 @@ impl DeadlineMonitor { } } +impl HasEvalHandle for DeadlineMonitor { + fn get_eval_handle(&self) -> MonitorEvalHandle { + MonitorEvalHandle::new(Arc::clone(&self.inner)) + } +} + /// Represents a deadline that can be started and stopped. pub struct Deadline { range: TimeRange, @@ -191,7 +195,7 @@ impl Deadline { let expected = current.timestamp_ms(); if expected < now { - possible_err = (Some(DeadlineEvalError::TooLate), now - expected); + possible_err = (Some(DeadlineEvaluationError::TooLate), now - expected); return None; // Deadline missed, let state as is for BG thread to report } @@ -202,7 +206,7 @@ impl Deadline { // Finished too early, leave it for reporting by BG thread current.set_underrun(); - possible_err = (Some(DeadlineEvalError::TooEarly), earliest_time - now); + possible_err = (Some(DeadlineEvaluationError::TooEarly), earliest_time - now); return Some(current); } @@ -210,10 +214,10 @@ impl Deadline { }); match possible_err { - (Some(DeadlineEvalError::TooEarly), val) => { + (Some(DeadlineEvaluationError::TooEarly), val) => { error!("Deadline {:?} stopped too early by {} ms", self.deadline_tag, val); }, - (Some(DeadlineEvalError::TooLate), val) => { + (Some(DeadlineEvaluationError::TooLate), val) => { error!("Deadline {:?} stopped too late by {} ms", self.deadline_tag, val); }, (None, _) => {}, @@ -239,7 +243,7 @@ impl Drop for Deadline { } } -pub(crate) struct DeadlineMonitorInner { +struct DeadlineMonitorInner { /// Tag of this monitor. monitor_tag: MonitorTag, @@ -255,6 +259,39 @@ pub(crate) struct DeadlineMonitorInner { active_deadlines: Arc<[(DeadlineTag, DeadlineState)]>, } +impl MonitorEvaluator for DeadlineMonitorInner { + fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) { + for (deadline_tag, deadline) in self.active_deadlines.iter() { + let snapshot = deadline.snapshot(); + if snapshot.is_underrun() { + // Deadline finished too early, report + warn!("Deadline ({:?}) finished too early!", deadline_tag); + + // Here we would normally report the underrun to the monitoring system + on_error(&self.monitor_tag, DeadlineEvaluationError::TooEarly.into()); + } else if snapshot.is_running() { + debug_assert!( + snapshot.is_stopped(), + "Deadline snapshot cannot be both running and stopped" + ); + + let now = self.now(); + let expected = snapshot.timestamp_ms(); + if now > expected { + // Deadline missed, report + warn!( + "Deadline ({:?}) missed! Expected: {}, now: {}", + deadline_tag, expected, now + ); + + // Here we would normally report the missed deadline to the monitoring system + on_error(&self.monitor_tag, DeadlineEvaluationError::TooLate.into()); + } + } + } + } +} + impl DeadlineMonitorInner { fn new(monitor_tag: MonitorTag, deadlines: HashMap) -> Self { let mut active_deadlines = vec![]; @@ -307,37 +344,6 @@ impl DeadlineMonitorInner { // We still have a room up to 60bits timestamp if needed in future u32::try_from(duration.as_millis()).expect("Monitor running for too long") } - - pub(crate) fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, DeadlineEvalError)) { - for (deadline_tag, deadline) in self.active_deadlines.iter() { - let snapshot = deadline.snapshot(); - if snapshot.is_underrun() { - // Deadline finished too early, report - warn!("Deadline ({:?}) finished too early!", deadline_tag); - - // Here we would normally report the underrun to the monitoring system - on_error(&self.monitor_tag, DeadlineEvalError::TooEarly); - } else if snapshot.is_running() { - debug_assert!( - snapshot.is_stopped(), - "Deadline snapshot cannot be both running and stopped" - ); - - let now = self.now(); - let expected = snapshot.timestamp_ms(); - if now > expected { - // Deadline missed, report - warn!( - "Deadline ({:?}) missed! Expected: {}, now: {}", - deadline_tag, expected, now - ); - - // Here we would normally report the missed deadline to the monitoring system - on_error(&self.monitor_tag, DeadlineEvalError::TooLate); - } - } - } - } } #[score_testing_macros::test_mod_with_log] @@ -348,7 +354,7 @@ mod tests { fn create_monitor_with_deadlines() -> DeadlineMonitor { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("deadline_monitor"); - let inner = DeadlineMonitorBuilder::new() + DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("deadline_long"), TimeRange::new(core::time::Duration::from_secs(1), core::time::Duration::from_secs(50)), @@ -360,14 +366,13 @@ mod tests { core::time::Duration::from_millis(50), ), ) - .build(monitor_tag, &allocator); - DeadlineMonitor::new(inner) + .build(monitor_tag, &allocator) } fn create_monitor_with_multiple_running_deadlines() -> DeadlineMonitor { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("deadline_monitor"); - let inner = DeadlineMonitorBuilder::new() + DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("slow"), TimeRange::new(core::time::Duration::from_secs(0), core::time::Duration::from_secs(50)), @@ -393,8 +398,7 @@ mod tests { core::time::Duration::from_millis(10), ), ) - .build(monitor_tag, &allocator); - DeadlineMonitor::new(inner) + .build(monitor_tag, &allocator) } #[test] @@ -433,7 +437,7 @@ mod tests { monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - DeadlineEvalError::TooEarly, + DeadlineEvaluationError::TooEarly.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -451,7 +455,7 @@ mod tests { monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - DeadlineEvalError::TooEarly, + DeadlineEvaluationError::TooEarly.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -477,7 +481,7 @@ mod tests { monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - DeadlineEvalError::TooEarly, + DeadlineEvaluationError::TooEarly.into(), "Deadline {:?} should not have failed ({:?})", monitor_tag, deadline_failure @@ -496,7 +500,7 @@ mod tests { monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - DeadlineEvalError::TooLate, + DeadlineEvaluationError::TooLate.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -525,7 +529,7 @@ mod tests { cnt += 1; assert_eq!( deadline_failure, - DeadlineEvalError::TooLate, + DeadlineEvaluationError::TooLate.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure diff --git a/src/health_monitoring_lib/rust/deadline/mod.rs b/src/health_monitoring_lib/rust/deadline/mod.rs index 922b5c9f..7444ce3b 100644 --- a/src/health_monitoring_lib/rust/deadline/mod.rs +++ b/src/health_monitoring_lib/rust/deadline/mod.rs @@ -15,7 +15,7 @@ mod common; mod deadline_monitor; mod deadline_state; -pub(crate) use deadline_monitor::DeadlineMonitorInner; +pub(crate) use deadline_monitor::DeadlineEvaluationError; pub use deadline_monitor::{ DeadlineError, DeadlineHandle, DeadlineMonitor, DeadlineMonitorBuilder, DeadlineMonitorError, }; diff --git a/src/health_monitoring_lib/rust/ffi.rs b/src/health_monitoring_lib/rust/ffi.rs index 0bdf650c..f08b4c08 100644 --- a/src/health_monitoring_lib/rust/ffi.rs +++ b/src/health_monitoring_lib/rust/ffi.rs @@ -211,12 +211,12 @@ pub extern "C" fn health_monitor_start(health_monitor_handle: FFIHandle) -> FFIC return FFICode::WrongState; } - let deadline_monitors = match health_monitor.collect_deadline_monitors_internal() { + let monitors = match health_monitor.collect_monitors_internal() { Ok(m) => m, Err(_) => return FFICode::WrongState, }; - health_monitor.start_internal(deadline_monitors); + health_monitor.start_internal(monitors); FFICode::Success } diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 0b1f0fab..610ae31b 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -21,12 +21,12 @@ mod worker; pub mod deadline; -use crate::common::MonitorEvalHandle; +use crate::common::{HasEvalHandle, MonitorEvalHandle}; +use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; use std::collections::HashMap; -use std::sync::Arc; pub use tag::{DeadlineTag, MonitorTag}; /// Builder for the [`HealthMonitor`]. @@ -117,7 +117,7 @@ impl HealthMonitorBuilder { // Create deadline monitors. let mut deadline_monitors = HashMap::new(); for (tag, builder) in self.deadline_monitor_builders { - deadline_monitors.insert(tag, (MonitorState::Available, builder.build(tag, &allocator))); + deadline_monitors.insert(tag, Some(MonitorState::Available(builder.build(tag, &allocator)))); } HealthMonitor { @@ -129,21 +129,46 @@ impl HealthMonitorBuilder { } /// Monitor ownership state in the [`HealthMonitor`]. -enum MonitorState { +enum MonitorState { /// Monitor is available. - Available, + Available(Monitor), /// Monitor is already taken. - Taken, + Taken(MonitorEvalHandle), } +/// Monitor container. +/// - Must be an option to ensure monitor can be taken out (not referenced). +/// - Must be an enum to ensure evaluation handle is still available for HMON after monitor is taken. +type MonitorContainer = Option>; + /// Health monitor. pub struct HealthMonitor { - deadline_monitors: HashMap)>, + deadline_monitors: HashMap>, worker: worker::UniqueThreadRunner, supervisor_api_cycle: Duration, } impl HealthMonitor { + fn get_monitor( + monitors: &mut HashMap>, + monitor_tag: MonitorTag, + ) -> Option { + let monitor_state = monitors.get_mut(&monitor_tag)?; + + match monitor_state.take() { + Some(MonitorState::Available(monitor)) => { + monitor_state.replace(MonitorState::Taken(monitor.get_eval_handle())); + Some(monitor) + }, + Some(MonitorState::Taken(handle)) => { + // Taken handle is inserted back. + monitor_state.replace(MonitorState::Taken(handle)); + None + }, + None => None, + } + } + /// Get and pass ownership of a [`DeadlineMonitor`] for the given [`MonitorTag`]. /// /// - `monitor_tag` - unique tag for the [`DeadlineMonitor`]. @@ -151,14 +176,7 @@ impl HealthMonitor { /// Returns [`Some`] containing [`DeadlineMonitor`] if found and not taken. /// Otherwise returns [`None`]. pub fn get_deadline_monitor(&mut self, monitor_tag: MonitorTag) -> Option { - let (state, inner) = self.deadline_monitors.get_mut(&monitor_tag)?; - match state { - MonitorState::Available => { - *state = MonitorState::Taken; - Some(DeadlineMonitor::new(inner.clone())) - }, - MonitorState::Taken => None, - } + Self::get_monitor(&mut self.deadline_monitors, monitor_tag) } /// Start the health monitoring logic in a separate thread. @@ -176,44 +194,55 @@ impl HealthMonitor { /// /// Method panics if no monitors have been added. pub fn start(&mut self) { - // Check number of monitors. assert!( self.check_monitors_exist_internal(), "No monitors have been added. HealthMonitor cannot start without any monitors." ); - let deadline_monitors = match self.collect_deadline_monitors_internal() { + let monitors = match self.collect_monitors_internal() { Ok(m) => m, Err(e) => panic!("{}", e), }; - self.start_internal(deadline_monitors); + self.start_internal(monitors); } pub(crate) fn check_monitors_exist_internal(&self) -> bool { !self.deadline_monitors.is_empty() } - pub(crate) fn collect_deadline_monitors_internal( - &self, - ) -> Result>, String> { - let mut collected_monitors = FixedCapacityVec::new(self.deadline_monitors.len()); - for (tag, (state, inner)) in self.deadline_monitors.iter() { - match state { - MonitorState::Taken => { - if collected_monitors.push(inner.clone()).is_err() { + fn collect_given_monitors( + monitors_to_collect: &mut HashMap>, + collected_monitors: &mut FixedCapacityVec, + ) -> Result<(), String> { + for (tag, monitor) in monitors_to_collect.iter_mut() { + match monitor.take() { + Some(MonitorState::Taken(handle)) => { + if collected_monitors.push(handle).is_err() { // Should not fail - capacity was preallocated. return Err("Failed to push monitor handle".to_string()); } }, - MonitorState::Available => { + Some(MonitorState::Available(_)) => { return Err(format!( "All monitors must be taken before starting HealthMonitor but {:?} is not taken.", tag )); }, - }; + None => { + return Err(format!( + "Invalid monitor ({:?}) state encountered while starting HealthMonitor.", + tag + )); + }, + } } + Ok(()) + } + + pub(crate) fn collect_monitors_internal(&mut self) -> Result, String> { + let mut collected_monitors = FixedCapacityVec::new(self.deadline_monitors.len()); + Self::collect_given_monitors(&mut self.deadline_monitors, &mut collected_monitors)?; Ok(collected_monitors) } diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index 03318080..916d589e 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -10,7 +10,7 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use crate::deadline::DeadlineMonitorInner; +use crate::common::{MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; use crate::log::{info, warn}; use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; @@ -20,7 +20,7 @@ use std::sync::Arc; use std::time::Instant; pub(super) struct MonitoringLogic { - deadline_monitors: FixedCapacityVec>, + monitors: FixedCapacityVec, client: T, last_notification: Instant, supervisor_api_cycle: Duration, @@ -29,16 +29,16 @@ pub(super) struct MonitoringLogic { impl MonitoringLogic { /// Creates a new MonitoringLogic instance. /// # Arguments - /// * `deadline_monitors` - A vector of monitor evaluation handles. + /// * `monitors` - A vector of monitor evaluation handles. /// * `supervisor_api_cycle` - Duration between alive notifications to the supervisor. /// * `client` - An implementation of the SupervisorAPIClient trait. pub(super) fn new( - deadline_monitors: FixedCapacityVec>, + monitors: FixedCapacityVec, supervisor_api_cycle: Duration, client: T, ) -> Self { Self { - deadline_monitors, + monitors, client, supervisor_api_cycle, last_notification: Instant::now(), @@ -48,14 +48,20 @@ impl MonitoringLogic { fn run(&mut self) -> bool { let mut has_any_error = false; - // Evaluate deadline monitors. - for monitor in self.deadline_monitors.iter() { + for monitor in self.monitors.iter() { monitor.evaluate(&mut |monitor_tag, error| { has_any_error = true; - warn!( - "Deadline monitor with tag {:?} reported error: {:?}.", - monitor_tag, error - ); + + match error { + MonitorEvaluationError::Deadline(deadline_evaluation_error) => { + warn!( + "Deadline monitor with tag {:?} reported error: {:?}.", + monitor_tag, deadline_evaluation_error + ) + }, + MonitorEvaluationError::Heartbeat => unimplemented!(), + MonitorEvaluationError::Logic => unimplemented!(), + } }); } @@ -151,7 +157,8 @@ impl From for u32 { #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { - use crate::deadline::{DeadlineMonitorBuilder, DeadlineMonitorInner}; + use crate::common::HasEvalHandle; + use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; use crate::protected_memory::ProtectedMemoryAllocator; use crate::supervisor_api_client::SupervisorAPIClient; use crate::tag::{DeadlineTag, MonitorTag}; @@ -185,7 +192,7 @@ mod tests { } } - fn create_monitor_with_deadlines() -> Arc { + fn create_monitor_with_deadlines() -> DeadlineMonitor { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("deadline_monitor"); DeadlineMonitorBuilder::new() @@ -208,7 +215,7 @@ mod tests { let mut logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.clone()).unwrap(); + vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, Duration::from_secs(1), @@ -234,7 +241,7 @@ mod tests { let mut logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.clone()).unwrap(); + vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, Duration::from_nanos(0), // Make sure each call notifies alive @@ -263,7 +270,7 @@ mod tests { let mut logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.clone()).unwrap(); + vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, Duration::from_millis(30), @@ -302,7 +309,7 @@ mod tests { let logic = MonitoringLogic::new( { let mut vec = FixedCapacityVec::new(2); - vec.push(deadline_monitor.clone()).unwrap(); + vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, Duration::from_nanos(0), // Make sure each call notifies alive From aa819a8ede58264dc2e9e1af5b0822223a48ffe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Thu, 26 Feb 2026 13:22:52 +0100 Subject: [PATCH 5/5] hmon: rework behavior on panics - Add `HealthMonitorError`. - Not fully utilized in this change, required for future monitors. - Simplify implementation of `start` and `build`. - No longer must be reimplemented for FFI. - New unit tests for `lib.rs`. --- examples/rust_supervised_app/src/main.rs | 5 +- src/health_monitoring_lib/rust/ffi.rs | 92 +++---- src/health_monitoring_lib/rust/lib.rs | 311 +++++++++++++++-------- 3 files changed, 252 insertions(+), 156 deletions(-) diff --git a/examples/rust_supervised_app/src/main.rs b/examples/rust_supervised_app/src/main.rs index f418a30c..fbb16e69 100644 --- a/examples/rust_supervised_app/src/main.rs +++ b/examples/rust_supervised_app/src/main.rs @@ -68,13 +68,14 @@ fn main_logic(args: &Args, stop: Arc) -> Result<(), Box for FFICode { + fn from(value: HealthMonitorError) -> Self { + match value { + HealthMonitorError::NotFound => FFICode::NotFound, + HealthMonitorError::InvalidArgument => FFICode::InvalidArgument, + HealthMonitorError::WrongState => FFICode::WrongState, + } + } +} + /// A wrapper to represent borrowed data over FFI boundary without taking ownership. pub struct FFIBorrowed { data: ManuallyDrop, @@ -115,18 +125,16 @@ pub extern "C" fn health_monitor_builder_build( health_monitor_builder.with_internal_processing_cycle_internal(Duration::from_millis(internal_cycle_ms as u64)); health_monitor_builder.with_supervisor_api_cycle_internal(Duration::from_millis(supervisor_cycle_ms as u64)); - // Check cycle interval args. - if !health_monitor_builder.check_cycle_args_internal() { - return FFICode::InvalidArgument; - } - // Build instance. - let health_monitor = health_monitor_builder.build_internal(); - unsafe { - *health_monitor_handle_out = Box::into_raw(Box::new(health_monitor)).cast(); + match health_monitor_builder.build() { + Ok(health_monitor) => { + unsafe { + *health_monitor_handle_out = Box::into_raw(Box::new(health_monitor)).cast(); + } + FFICode::Success + }, + Err(e) => e.into(), } - - FFICode::Success } #[no_mangle] @@ -206,19 +214,11 @@ pub extern "C" fn health_monitor_start(health_monitor_handle: FFIHandle) -> FFIC // It is assumed that the pointer was not consumed by a call to `health_monitor_destroy`. let mut health_monitor = FFIBorrowed::new(unsafe { Box::from_raw(health_monitor_handle as *mut HealthMonitor) }); - // Check state, collect monitors and start. - if !health_monitor.check_monitors_exist_internal() { - return FFICode::WrongState; + // Start monitoring logic. + match health_monitor.start() { + Ok(_) => FFICode::Success, + Err(error) => error.into(), } - - let monitors = match health_monitor.collect_monitors_internal() { - Ok(m) => m, - Err(_) => return FFICode::WrongState, - }; - - health_monitor.start_internal(monitors); - - FFICode::Success } #[no_mangle] @@ -282,8 +282,16 @@ mod tests { fn health_monitor_builder_build_succeeds() { let mut health_monitor_builder_handle: FFIHandle = null_mut(); let mut health_monitor_handle: FFIHandle = null_mut(); + let mut deadline_monitor_builder_handle = null_mut(); let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let _ = deadline_monitor_builder_create(&mut deadline_monitor_builder_handle as *mut FFIHandle); + let _ = health_monitor_builder_add_deadline_monitor( + health_monitor_builder_handle, + &deadline_monitor_tag as *const MonitorTag, + deadline_monitor_builder_handle, + ); let health_monitor_builder_build_result = health_monitor_builder_build( health_monitor_builder_handle, @@ -319,6 +327,24 @@ mod tests { // Clean-up not needed - health monitor builder was already consumed by the `build`. } + #[test] + fn health_monitor_builder_build_no_monitors() { + let mut health_monitor_builder_handle: FFIHandle = null_mut(); + let mut health_monitor_handle: FFIHandle = null_mut(); + + let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); + + let health_monitor_builder_build_result = health_monitor_builder_build( + health_monitor_builder_handle, + 200, + 100, + &mut health_monitor_handle as *mut FFIHandle, + ); + assert_eq!(health_monitor_builder_build_result, FFICode::WrongState); + + // Clean-up not needed - health monitor builder was already consumed by the `build`. + } + #[test] fn health_monitor_builder_build_null_builder_handle() { let mut health_monitor_handle: FFIHandle = null_mut(); @@ -674,26 +700,6 @@ mod tests { health_monitor_destroy(health_monitor_handle); } - #[test] - fn health_monitor_start_no_monitors() { - let mut health_monitor_builder_handle: FFIHandle = null_mut(); - let mut health_monitor_handle: FFIHandle = null_mut(); - - let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); - let _ = health_monitor_builder_build( - health_monitor_builder_handle, - 200, - 100, - &mut health_monitor_handle as *mut FFIHandle, - ); - - let health_monitor_start_result = health_monitor_start(health_monitor_handle); - assert_eq!(health_monitor_start_result, FFICode::WrongState); - - // Clean-up. - health_monitor_destroy(health_monitor_handle); - } - #[test] fn health_monitor_start_null_hmon() { let health_monitor_start_result = health_monitor_start(null_mut()); diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 610ae31b..72c04098 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -23,12 +23,24 @@ pub mod deadline; use crate::common::{HasEvalHandle, MonitorEvalHandle}; use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; +use crate::log::{error, ScoreDebug}; pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; use std::collections::HashMap; pub use tag::{DeadlineTag, MonitorTag}; +/// Health monitor errors. +#[derive(PartialEq, Eq, Debug, ScoreDebug)] +pub enum HealthMonitorError { + /// Requested entry not found. + NotFound, + /// Provided argument is invalid. + InvalidArgument, + /// Current state is invalid. + WrongState, +} + /// Builder for the [`HealthMonitor`]. #[derive(Default)] pub struct HealthMonitorBuilder { @@ -78,13 +90,41 @@ impl HealthMonitorBuilder { } /// Build a new [`HealthMonitor`] instance based on provided parameters. - pub fn build(self) -> HealthMonitor { - assert!( - self.check_cycle_args_internal(), - "supervisor API cycle must be multiple of internal processing cycle" - ); + pub fn build(self) -> Result { + // Check cycle values. + // `supervisor_api_cycle` must be a multiple of `internal_processing_cycle`. + let supervisor_api_cycle_ms = self.supervisor_api_cycle.as_millis() as u64; + let internal_processing_cycle_ms = self.internal_processing_cycle.as_millis() as u64; + if !supervisor_api_cycle_ms.is_multiple_of(internal_processing_cycle_ms) { + error!( + "Supervisor API cycle duration ({} ms) must be a multiple of internal processing cycle interval ({} ms).", + supervisor_api_cycle_ms, internal_processing_cycle_ms + ); + return Err(HealthMonitorError::InvalidArgument); + } + + // Check number of monitors. + let num_monitors = self.deadline_monitor_builders.len(); + if num_monitors == 0 { + error!("No monitors have been added. HealthMonitor cannot be created."); + return Err(HealthMonitorError::WrongState); + } + + // Create allocator. + let allocator = protected_memory::ProtectedMemoryAllocator {}; + + // Create deadline monitors. + let mut deadline_monitors = HashMap::new(); + for (tag, builder) in self.deadline_monitor_builders { + let monitor = builder.build(tag, &allocator); + deadline_monitors.insert(tag, Some(MonitorState::Available(monitor))); + } - self.build_internal() + Ok(HealthMonitor { + deadline_monitors, + worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle), + supervisor_api_cycle: self.supervisor_api_cycle, + }) } // Used by FFI and config parsing code which prefer not to move builder instance @@ -104,28 +144,6 @@ impl HealthMonitorBuilder { pub(crate) fn with_internal_processing_cycle_internal(&mut self, cycle_duration: Duration) { self.internal_processing_cycle = cycle_duration; } - - pub(crate) fn check_cycle_args_internal(&self) -> bool { - self.supervisor_api_cycle - .as_millis() - .is_multiple_of(self.internal_processing_cycle.as_millis()) - } - - pub(crate) fn build_internal(self) -> HealthMonitor { - let allocator = protected_memory::ProtectedMemoryAllocator {}; - - // Create deadline monitors. - let mut deadline_monitors = HashMap::new(); - for (tag, builder) in self.deadline_monitor_builders { - deadline_monitors.insert(tag, Some(MonitorState::Available(builder.build(tag, &allocator)))); - } - - HealthMonitor { - deadline_monitors, - worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle), - supervisor_api_cycle: self.supervisor_api_cycle, - } - } } /// Monitor ownership state in the [`HealthMonitor`]. @@ -179,76 +197,64 @@ impl HealthMonitor { Self::get_monitor(&mut self.deadline_monitors, monitor_tag) } - /// Start the health monitoring logic in a separate thread. - /// - /// From this point, the health monitor will periodically check monitors and notify the supervisor about system liveness. - /// - /// # Notes - /// - /// This method shall be called before `Lifecycle.running()`. - /// Otherwise the supervisor might consider the process not alive. - /// - /// Health monitoring logic stop when the [`HealthMonitor`] is dropped. - /// - /// # Panics - /// - /// Method panics if no monitors have been added. - pub fn start(&mut self) { - assert!( - self.check_monitors_exist_internal(), - "No monitors have been added. HealthMonitor cannot start without any monitors." - ); - - let monitors = match self.collect_monitors_internal() { - Ok(m) => m, - Err(e) => panic!("{}", e), - }; - - self.start_internal(monitors); - } - - pub(crate) fn check_monitors_exist_internal(&self) -> bool { - !self.deadline_monitors.is_empty() - } - fn collect_given_monitors( monitors_to_collect: &mut HashMap>, collected_monitors: &mut FixedCapacityVec, - ) -> Result<(), String> { + ) -> Result<(), HealthMonitorError> { for (tag, monitor) in monitors_to_collect.iter_mut() { match monitor.take() { Some(MonitorState::Taken(handle)) => { if collected_monitors.push(handle).is_err() { // Should not fail - capacity was preallocated. - return Err("Failed to push monitor handle".to_string()); + error!("Failed to push monitor handle."); + return Err(HealthMonitorError::WrongState); } }, Some(MonitorState::Available(_)) => { - return Err(format!( + error!( "All monitors must be taken before starting HealthMonitor but {:?} is not taken.", tag - )); + ); + return Err(HealthMonitorError::WrongState); }, None => { - return Err(format!( + error!( "Invalid monitor ({:?}) state encountered while starting HealthMonitor.", tag - )); + ); + return Err(HealthMonitorError::WrongState); }, } } Ok(()) } - pub(crate) fn collect_monitors_internal(&mut self) -> Result, String> { - let mut collected_monitors = FixedCapacityVec::new(self.deadline_monitors.len()); + /// Start the health monitoring logic in a separate thread. + /// + /// From this point, the health monitor will periodically check monitors and notify the supervisor about system liveness. + /// + /// # Notes + /// + /// This method shall be called before `Lifecycle.running()`. + /// Otherwise the supervisor might consider the process not alive. + /// + /// Health monitoring logic stop when the [`HealthMonitor`] is dropped. + pub fn start(&mut self) -> Result<(), HealthMonitorError> { + // Check number of monitors. + // Should never occur if created by `HealthMonitorBuilder`! + let num_monitors = self.deadline_monitors.len(); + if num_monitors == 0 { + error!("No monitors have been added. HealthMonitor cannot be created."); + return Err(HealthMonitorError::WrongState); + } + + // Collect all monitors. + let mut collected_monitors = FixedCapacityVec::new(num_monitors); Self::collect_given_monitors(&mut self.deadline_monitors, &mut collected_monitors)?; - Ok(collected_monitors) - } - pub(crate) fn start_internal(&mut self, monitors: FixedCapacityVec) { + // Start monitoring logic. let monitoring_logic = worker::MonitoringLogic::new( - monitors, + collected_monitors, self.supervisor_api_cycle, #[cfg(all(not(test), feature = "score_supervisor_api_client"))] supervisor_api_client::score_supervisor_api_client::ScoreSupervisorAPIClient::new(), @@ -259,7 +265,8 @@ impl HealthMonitor { supervisor_api_client::stub_supervisor_api_client::StubSupervisorAPIClient::new(), ); - self.worker.start(monitoring_logic) + self.worker.start(monitoring_logic); + Ok(()) } //TODO: Add possibility to run HM in the current thread - ie in main @@ -268,65 +275,147 @@ impl HealthMonitor { #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { - use super::*; + use crate::deadline::DeadlineMonitorBuilder; + use crate::tag::MonitorTag; + use crate::{HealthMonitorBuilder, HealthMonitorError}; + use core::time::Duration; + use std::collections::HashMap; #[test] - #[should_panic(expected = "No monitors have been added. HealthMonitor cannot start without any monitors.")] - fn hm_with_no_monitors_shall_panic_on_start() { - let health_monitor_builder = super::HealthMonitorBuilder::new(); - health_monitor_builder.build().start(); + fn health_monitor_builder_new_succeeds() { + let health_monitor_builder = HealthMonitorBuilder::new(); + assert!(health_monitor_builder.deadline_monitor_builders.is_empty()); + assert_eq!(health_monitor_builder.supervisor_api_cycle, Duration::from_millis(500)); + assert_eq!( + health_monitor_builder.internal_processing_cycle, + Duration::from_millis(100) + ); } #[test] - #[should_panic(expected = "supervisor API cycle must be multiple of internal processing cycle")] - fn hm_with_wrong_cycle_fails_to_build() { - super::HealthMonitorBuilder::new() - .with_supervisor_api_cycle(Duration::from_millis(50)) + fn health_monitor_builder_build_succeeds() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let result = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) .build(); + assert!(result.is_ok()); } #[test] - fn hm_with_taken_monitors_starts() { - let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor(MonitorTag::from("test_monitor"), DeadlineMonitorBuilder::new()) + fn health_monitor_builder_build_invalid_cycles() { + let result = HealthMonitorBuilder::new() + .with_supervisor_api_cycle(Duration::from_millis(123)) + .with_internal_processing_cycle(Duration::from_millis(100)) .build(); + assert!(result.is_err_and(|e| e == HealthMonitorError::InvalidArgument)); + } - let _monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); - health_monitor.start(); + #[test] + fn health_monitor_builder_build_no_monitors() { + let result = HealthMonitorBuilder::new().build(); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); } #[test] - #[should_panic( - expected = "All monitors must be taken before starting HealthMonitor but MonitorTag(test_monitor) is not taken." - )] - fn hm_with_monitors_shall_not_start_with_not_taken_monitors() { + fn health_monitor_get_deadline_monitor_available() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor(MonitorTag::from("test_monitor"), DeadlineMonitorBuilder::new()) - .build(); + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); - health_monitor.start(); + let result = health_monitor.get_deadline_monitor(deadline_monitor_tag); + assert!(result.is_some()); } #[test] - fn hm_get_deadline_monitor_works() { + fn health_monitor_get_deadline_monitor_taken() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor(MonitorTag::from("test_monitor"), DeadlineMonitorBuilder::new()) - .build(); + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); - { - let monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); - assert!( - monitor.is_some(), - "Expected to retrieve the deadline monitor, but got None" - ); - } + let _ = health_monitor.get_deadline_monitor(deadline_monitor_tag); + let result = health_monitor.get_deadline_monitor(deadline_monitor_tag); + assert!(result.is_none()); + } - { - let monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); - assert!( - monitor.is_none(), - "Expected None when retrieving the monitor a second time, but got Some" - ); - } + #[test] + fn health_monitor_get_deadline_monitor_unknown() { + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(MonitorTag::from("deadline_monitor"), deadline_monitor_builder) + .build() + .unwrap(); + + let result = health_monitor.get_deadline_monitor(MonitorTag::from("undefined_monitor")); + assert!(result.is_none()); + } + + #[test] + fn health_monitor_get_deadline_monitor_invalid_state() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); + + // Inject broken state - unreachable otherwise. + health_monitor.deadline_monitors.insert(deadline_monitor_tag, None); + + let result = health_monitor.get_deadline_monitor(deadline_monitor_tag); + assert!(result.is_none()); + } + + #[test] + fn health_monitor_start_succeeds() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); + + let _deadline_monitor = health_monitor.get_deadline_monitor(deadline_monitor_tag).unwrap(); + + let result = health_monitor.start(); + assert!(result.is_ok()); + } + + #[test] + fn health_monitor_start_monitors_not_taken() { + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(MonitorTag::from("deadline_monitor"), deadline_monitor_builder) + .build() + .unwrap(); + + let result = health_monitor.start(); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); + } + + #[test] + fn health_monitor_start_no_monitors() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); + + // Inject broken state - unreachable otherwise. + health_monitor.deadline_monitors = HashMap::new(); + + let result = health_monitor.start(); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); } }