diff --git a/Cargo.toml b/Cargo.toml index 4c19b6ec..a4f364e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "src/health_monitoring_lib", "examples/rust_supervised_app", ] +default-members = ["src/health_monitoring_lib"] [workspace.package] edition = "2021" diff --git a/examples/rust_supervised_app/src/main.rs b/examples/rust_supervised_app/src/main.rs index f418a30c..fbb16e69 100644 --- a/examples/rust_supervised_app/src/main.rs +++ b/examples/rust_supervised_app/src/main.rs @@ -68,13 +68,14 @@ fn main_logic(args: &Args, stop: Arc) -> Result<(), Box MonitorEvalHandle; +} + /// Errors that can occur during monitor evaluation. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, crate::log::ScoreDebug)] +/// Contains failing monitor type. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, ScoreDebug)] +#[allow(dead_code)] pub(crate) enum MonitorEvaluationError { - TooEarly, - TooLate, + Deadline(DeadlineEvaluationError), + Heartbeat, + Logic, +} + +impl From for MonitorEvaluationError { + fn from(value: DeadlineEvaluationError) -> Self { + MonitorEvaluationError::Deadline(value) + } } /// Trait for evaluating monitors and reporting errors to be used by HealthMonitor. pub(crate) trait MonitorEvaluator { + /// Run monitor evaluation. + /// + /// - `on_error` - error handling, containing tag of failing object and error code. 
fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)); } diff --git a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs index 995b851d..24413079 100644 --- a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs +++ b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs @@ -10,20 +10,26 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use super::common::DeadlineTemplate; -use crate::common::{MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator, TimeRange}; +use crate::common::{HasEvalHandle, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; +use crate::deadline::common::{DeadlineTemplate, StateIndex}; +use crate::deadline::deadline_state::{DeadlineState, DeadlineStateSnapshot}; +use crate::log::{error, warn, ScoreDebug}; +use crate::protected_memory::ProtectedMemoryAllocator; use crate::tag::{DeadlineTag, MonitorTag}; -use crate::{ - deadline::{ - common::StateIndex, - deadline_state::{DeadlineState, DeadlineStateSnapshot}, - }, - protected_memory::ProtectedMemoryAllocator, -}; +use crate::TimeRange; use core::hash::Hash; -use std::{collections::HashMap, sync::Arc, time::Instant}; - -use crate::log::*; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +/// Deadline evaluation errors. +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, ScoreDebug)] +pub(crate) enum DeadlineEvaluationError { + /// Finished too early. + TooEarly, + /// Finished too late. + TooLate, +} /// /// Errors that can occur when working with DeadlineMonitor @@ -65,7 +71,8 @@ impl DeadlineMonitorBuilder { /// Builds the DeadlineMonitor with the configured deadlines. 
pub(crate) fn build(self, monitor_tag: MonitorTag, _allocator: &ProtectedMemoryAllocator) -> DeadlineMonitor { - DeadlineMonitor::new(monitor_tag, self.deadlines) + let inner = Arc::new(DeadlineMonitorInner::new(monitor_tag, self.deadlines)); + DeadlineMonitor::new(inner) } // Used by FFI and config parsing code which prefer not to move builder instance @@ -80,27 +87,9 @@ pub struct DeadlineMonitor { } impl DeadlineMonitor { - fn new(monitor_tag: MonitorTag, deadlines: HashMap) -> Self { - let mut active_deadlines = vec![]; - - let deadlines = deadlines - .into_iter() - .enumerate() - .map(|(index, (deadline_tag, range))| { - active_deadlines.push((deadline_tag, DeadlineState::new())); - (deadline_tag, DeadlineTemplate::new(range, StateIndex::new(index))) - }) - .collect(); - - Self { - #[allow(clippy::arc_with_non_send_sync)] // This will be fixed once we add background thread - inner: Arc::new(DeadlineMonitorInner { - monitor_tag, - deadlines, - active_deadlines: active_deadlines.into(), - monitor_starting_point: Instant::now(), - }), - } + /// Create a new [`DeadlineMonitor`] instance. + fn new(inner: Arc) -> Self { + Self { inner } } /// Acquires a deadline instance for the given tag. 
@@ -109,26 +98,12 @@ impl DeadlineMonitor { /// - Err(DeadlineMonitorError::DeadlineInUse) - if the deadline is already in use /// - Err(DeadlineMonitorError::DeadlineNotFound) - if the deadline tag is not registered pub fn get_deadline(&self, deadline_tag: DeadlineTag) -> Result { - if let Some(template) = self.inner.deadlines.get(&deadline_tag) { - match template.acquire_deadline() { - Some(range) => Ok(Deadline { - range, - deadline_tag, - monitor: Arc::clone(&self.inner), - state_index: template.assigned_state_index, - }), - None => Err(DeadlineMonitorError::DeadlineInUse), - } - } else { - Err(DeadlineMonitorError::DeadlineNotFound) - } + self.inner.get_deadline(deadline_tag) } +} - /// Handle for evaluation of all active deadlines and reporting any missed deadlines or underruns. - /// - /// # NOTE - /// This function is intended to be called from a background thread periodically. - pub(crate) fn get_eval_handle(&self) -> MonitorEvalHandle { +impl HasEvalHandle for DeadlineMonitor { + fn get_eval_handle(&self) -> MonitorEvalHandle { MonitorEvalHandle::new(Arc::clone(&self.inner)) } } @@ -220,7 +195,7 @@ impl Deadline { let expected = current.timestamp_ms(); if expected < now { - possible_err = (Some(MonitorEvaluationError::TooLate), now - expected); + possible_err = (Some(DeadlineEvaluationError::TooLate), now - expected); return None; // Deadline missed, let state as is for BG thread to report } @@ -231,7 +206,7 @@ impl Deadline { // Finished too early, leave it for reporting by BG thread current.set_underrun(); - possible_err = (Some(MonitorEvaluationError::TooEarly), earliest_time - now); + possible_err = (Some(DeadlineEvaluationError::TooEarly), earliest_time - now); return Some(current); } @@ -239,10 +214,10 @@ impl Deadline { }); match possible_err { - (Some(MonitorEvaluationError::TooEarly), val) => { + (Some(DeadlineEvaluationError::TooEarly), val) => { error!("Deadline {:?} stopped too early by {} ms", self.deadline_tag, val); }, - 
(Some(MonitorEvaluationError::TooLate), val) => { + (Some(DeadlineEvaluationError::TooLate), val) => { error!("Deadline {:?} stopped too late by {} ms", self.deadline_tag, val); }, (None, _) => {}, @@ -286,27 +261,6 @@ struct DeadlineMonitorInner { impl MonitorEvaluator for DeadlineMonitorInner { fn evaluate(&self, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) { - self.evaluate(on_error); - } -} - -impl DeadlineMonitorInner { - fn release_deadline(&self, deadline_tag: DeadlineTag) { - if let Some(template) = self.deadlines.get(&deadline_tag) { - template.release_deadline(); - } else { - unreachable!("Releasing unknown deadline tag: {:?}", deadline_tag); - } - } - - fn now(&self) -> u32 { - let duration = self.monitor_starting_point.elapsed(); - // As u32 can hold up to ~49 days in milliseconds, this should be sufficient for our use case - // We still have a room up to 60bits timestamp if needed in future - u32::try_from(duration.as_millis()).expect("Monitor running for too long") - } - - fn evaluate(&self, mut on_failed: impl FnMut(&MonitorTag, MonitorEvaluationError)) { for (deadline_tag, deadline) in self.active_deadlines.iter() { let snapshot = deadline.snapshot(); if snapshot.is_underrun() { @@ -314,7 +268,7 @@ impl DeadlineMonitorInner { warn!("Deadline ({:?}) finished too early!", deadline_tag); // Here we would normally report the underrun to the monitoring system - on_failed(&self.monitor_tag, MonitorEvaluationError::TooEarly); + on_error(&self.monitor_tag, DeadlineEvaluationError::TooEarly.into()); } else if snapshot.is_running() { debug_assert!( snapshot.is_stopped(), @@ -331,13 +285,67 @@ impl DeadlineMonitorInner { ); // Here we would normally report the missed deadline to the monitoring system - on_failed(&self.monitor_tag, MonitorEvaluationError::TooLate); + on_error(&self.monitor_tag, DeadlineEvaluationError::TooLate.into()); } } } } } +impl DeadlineMonitorInner { + fn new(monitor_tag: MonitorTag, deadlines: HashMap) -> Self { + let 
mut active_deadlines = vec![]; + + let deadlines = deadlines + .into_iter() + .enumerate() + .map(|(index, (deadline_tag, range))| { + active_deadlines.push((deadline_tag, DeadlineState::new())); + (deadline_tag, DeadlineTemplate::new(range, StateIndex::new(index))) + }) + .collect(); + + #[allow(clippy::arc_with_non_send_sync)] // This will be fixed once we add background thread + Self { + monitor_tag, + deadlines, + active_deadlines: active_deadlines.into(), + monitor_starting_point: Instant::now(), + } + } + + fn release_deadline(&self, deadline_tag: DeadlineTag) { + if let Some(template) = self.deadlines.get(&deadline_tag) { + template.release_deadline(); + } else { + unreachable!("Releasing unknown deadline tag: {:?}", deadline_tag); + } + } + + pub(crate) fn get_deadline(self: &Arc, deadline_tag: DeadlineTag) -> Result { + if let Some(template) = self.deadlines.get(&deadline_tag) { + match template.acquire_deadline() { + Some(range) => Ok(Deadline { + range, + deadline_tag, + monitor: self.clone(), + state_index: template.assigned_state_index, + }), + None => Err(DeadlineMonitorError::DeadlineInUse), + } + } else { + Err(DeadlineMonitorError::DeadlineNotFound) + } + } + + fn now(&self) -> u32 { + let duration = self.monitor_starting_point.elapsed(); + // As u32 can hold up to ~49 days in milliseconds, this should be sufficient for our use case + // We still have a room up to 60bits timestamp if needed in future + u32::try_from(duration.as_millis()).expect("Monitor running for too long") + } +} + #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { @@ -410,7 +418,7 @@ mod tests { drop(handle); // stop the deadline - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { panic!( "Deadline {:?} should not have failed or underrun({:?})", monitor_tag, deadline_failure @@ -426,10 +434,10 @@ mod tests { drop(handle); // stop the deadline - monitor.inner.evaluate(|monitor_tag, 
deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooEarly, + DeadlineEvaluationError::TooEarly.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -444,10 +452,10 @@ mod tests { // So deadline stop happens after evaluate, still it should be reported as failed - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooEarly, + DeadlineEvaluationError::TooEarly.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -470,10 +478,10 @@ mod tests { let handle = deadline.start(); assert_eq!(handle.err(), Some(DeadlineError::DeadlineAlreadyFailed)); - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooEarly, + DeadlineEvaluationError::TooEarly.into(), "Deadline {:?} should not have failed ({:?})", monitor_tag, deadline_failure @@ -489,10 +497,10 @@ mod tests { drop(handle); // stop the deadline - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { assert_eq!( deadline_failure, - MonitorEvaluationError::TooLate, + DeadlineEvaluationError::TooLate.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure @@ -517,11 +525,11 @@ mod tests { let mut cnt = 0; - monitor.inner.evaluate(|monitor_tag, deadline_failure| { + monitor.inner.evaluate(&mut |monitor_tag, deadline_failure| { cnt += 1; assert_eq!( deadline_failure, - MonitorEvaluationError::TooLate, + DeadlineEvaluationError::TooLate.into(), "Deadline {:?} should not have failed({:?})", monitor_tag, deadline_failure diff --git a/src/health_monitoring_lib/rust/deadline/mod.rs b/src/health_monitoring_lib/rust/deadline/mod.rs index 
d903c412..7444ce3b 100644 --- a/src/health_monitoring_lib/rust/deadline/mod.rs +++ b/src/health_monitoring_lib/rust/deadline/mod.rs @@ -15,6 +15,7 @@ mod common; mod deadline_monitor; mod deadline_state; +pub(crate) use deadline_monitor::DeadlineEvaluationError; pub use deadline_monitor::{ DeadlineError, DeadlineHandle, DeadlineMonitor, DeadlineMonitorBuilder, DeadlineMonitorError, }; diff --git a/src/health_monitoring_lib/rust/ffi.rs b/src/health_monitoring_lib/rust/ffi.rs index f08b4c08..7e264478 100644 --- a/src/health_monitoring_lib/rust/ffi.rs +++ b/src/health_monitoring_lib/rust/ffi.rs @@ -13,7 +13,7 @@ use crate::deadline::ffi::DeadlineMonitorCpp; use crate::deadline::DeadlineMonitorBuilder; use crate::tag::MonitorTag; -use crate::{HealthMonitor, HealthMonitorBuilder}; +use crate::{HealthMonitor, HealthMonitorBuilder, HealthMonitorError}; use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::time::Duration; @@ -36,6 +36,16 @@ pub enum FFICode { Failed, } +impl From for FFICode { + fn from(value: HealthMonitorError) -> Self { + match value { + HealthMonitorError::NotFound => FFICode::NotFound, + HealthMonitorError::InvalidArgument => FFICode::InvalidArgument, + HealthMonitorError::WrongState => FFICode::WrongState, + } + } +} + /// A wrapper to represent borrowed data over FFI boundary without taking ownership. pub struct FFIBorrowed { data: ManuallyDrop, @@ -115,18 +125,16 @@ pub extern "C" fn health_monitor_builder_build( health_monitor_builder.with_internal_processing_cycle_internal(Duration::from_millis(internal_cycle_ms as u64)); health_monitor_builder.with_supervisor_api_cycle_internal(Duration::from_millis(supervisor_cycle_ms as u64)); - // Check cycle interval args. - if !health_monitor_builder.check_cycle_args_internal() { - return FFICode::InvalidArgument; - } - // Build instance. 
- let health_monitor = health_monitor_builder.build_internal(); - unsafe { - *health_monitor_handle_out = Box::into_raw(Box::new(health_monitor)).cast(); + match health_monitor_builder.build() { + Ok(health_monitor) => { + unsafe { + *health_monitor_handle_out = Box::into_raw(Box::new(health_monitor)).cast(); + } + FFICode::Success + }, + Err(e) => e.into(), } - - FFICode::Success } #[no_mangle] @@ -206,19 +214,11 @@ pub extern "C" fn health_monitor_start(health_monitor_handle: FFIHandle) -> FFIC // It is assumed that the pointer was not consumed by a call to `health_monitor_destroy`. let mut health_monitor = FFIBorrowed::new(unsafe { Box::from_raw(health_monitor_handle as *mut HealthMonitor) }); - // Check state, collect monitors and start. - if !health_monitor.check_monitors_exist_internal() { - return FFICode::WrongState; + // Start monitoring logic. + match health_monitor.start() { + Ok(_) => FFICode::Success, + Err(error) => error.into(), } - - let monitors = match health_monitor.collect_monitors_internal() { - Ok(m) => m, - Err(_) => return FFICode::WrongState, - }; - - health_monitor.start_internal(monitors); - - FFICode::Success } #[no_mangle] @@ -282,8 +282,16 @@ mod tests { fn health_monitor_builder_build_succeeds() { let mut health_monitor_builder_handle: FFIHandle = null_mut(); let mut health_monitor_handle: FFIHandle = null_mut(); + let mut deadline_monitor_builder_handle = null_mut(); let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let _ = deadline_monitor_builder_create(&mut deadline_monitor_builder_handle as *mut FFIHandle); + let _ = health_monitor_builder_add_deadline_monitor( + health_monitor_builder_handle, + &deadline_monitor_tag as *const MonitorTag, + deadline_monitor_builder_handle, + ); let health_monitor_builder_build_result = health_monitor_builder_build( health_monitor_builder_handle, @@ -319,6 +327,24 @@ mod tests { // 
Clean-up not needed - health monitor builder was already consumed by the `build`. } + #[test] + fn health_monitor_builder_build_no_monitors() { + let mut health_monitor_builder_handle: FFIHandle = null_mut(); + let mut health_monitor_handle: FFIHandle = null_mut(); + + let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); + + let health_monitor_builder_build_result = health_monitor_builder_build( + health_monitor_builder_handle, + 200, + 100, + &mut health_monitor_handle as *mut FFIHandle, + ); + assert_eq!(health_monitor_builder_build_result, FFICode::WrongState); + + // Clean-up not needed - health monitor builder was already consumed by the `build`. + } + #[test] fn health_monitor_builder_build_null_builder_handle() { let mut health_monitor_handle: FFIHandle = null_mut(); @@ -674,26 +700,6 @@ mod tests { health_monitor_destroy(health_monitor_handle); } - #[test] - fn health_monitor_start_no_monitors() { - let mut health_monitor_builder_handle: FFIHandle = null_mut(); - let mut health_monitor_handle: FFIHandle = null_mut(); - - let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); - let _ = health_monitor_builder_build( - health_monitor_builder_handle, - 200, - 100, - &mut health_monitor_handle as *mut FFIHandle, - ); - - let health_monitor_start_result = health_monitor_start(health_monitor_handle); - assert_eq!(health_monitor_start_result, FFICode::WrongState); - - // Clean-up. 
- health_monitor_destroy(health_monitor_handle); - } - #[test] fn health_monitor_start_null_hmon() { let health_monitor_start_result = health_monitor_start(null_mut()); diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 492e6378..72c04098 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -15,21 +15,36 @@ mod common; mod ffi; mod log; mod protected_memory; +mod supervisor_api_client; mod tag; mod worker; pub mod deadline; -use crate::common::MonitorEvalHandle; +use crate::common::{HasEvalHandle, MonitorEvalHandle}; +use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; +use crate::log::{error, ScoreDebug}; pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; use std::collections::HashMap; pub use tag::{DeadlineTag, MonitorTag}; +/// Health monitor errors. +#[derive(PartialEq, Eq, Debug, ScoreDebug)] +pub enum HealthMonitorError { + /// Requested entry not found. + NotFound, + /// Provided argument is invalid. + InvalidArgument, + /// Current state is invalid. + WrongState, +} + +/// Builder for the [`HealthMonitor`]. #[derive(Default)] pub struct HealthMonitorBuilder { - deadline_monitor_builders: HashMap, + deadline_monitor_builders: HashMap, supervisor_api_cycle: Duration, internal_processing_cycle: Duration, } @@ -44,39 +59,72 @@ impl HealthMonitorBuilder { } } - /// Adds a deadline monitor for a specific identifier tag. - /// # Arguments - /// * `monitor_tag` - The unique identifier for the deadline monitor. - /// * `monitor` - The builder for the deadline monitor. + /// Add a [`DeadlineMonitor`] for the given [`MonitorTag`]. + /// + /// - `monitor_tag` - unique tag for the [`DeadlineMonitor`]. + /// - `monitor_builder` - monitor builder to finalize. + /// /// # Note - /// If a monitor with the same tag already exists, it will be overwritten. 
- pub fn add_deadline_monitor(mut self, monitor_tag: MonitorTag, monitor: deadline::DeadlineMonitorBuilder) -> Self { - self.add_deadline_monitor_internal(monitor_tag, monitor); + /// + /// If a deadline monitor with the same tag already exists, it will be overwritten. + pub fn add_deadline_monitor(mut self, monitor_tag: MonitorTag, monitor_builder: DeadlineMonitorBuilder) -> Self { + self.add_deadline_monitor_internal(monitor_tag, monitor_builder); self } - /// Sets the cycle duration for supervisor API notifications. - /// This duration determines how often the health monitor notifies the supervisor that the system is alive. + /// Set the interval between supervisor API notifications. + /// This duration determines how often the health monitor notifies the supervisor about system liveness. + /// + /// - `cycle_duration` - interval between notifications. pub fn with_supervisor_api_cycle(mut self, cycle_duration: Duration) -> Self { self.with_supervisor_api_cycle_internal(cycle_duration); self } - /// Sets the internal processing cycle duration. - /// This duration determines how often the health monitor checks deadlines. + /// Set the internal interval between health monitor evaluations. + /// + /// - `cycle_duration` - interval between evaluations. pub fn with_internal_processing_cycle(mut self, cycle_duration: Duration) -> Self { self.with_internal_processing_cycle_internal(cycle_duration); self } /// Build a new [`HealthMonitor`] instance based on provided parameters. - pub fn build(self) -> HealthMonitor { - assert!( - self.check_cycle_args_internal(), - "supervisor API cycle must be multiple of internal processing cycle" - ); + pub fn build(self) -> Result { + // Check cycle values. + // `supervisor_api_cycle` must be a multiple of `internal_processing_cycle`. 
+ let supervisor_api_cycle_ms = self.supervisor_api_cycle.as_millis() as u64; + let internal_processing_cycle_ms = self.internal_processing_cycle.as_millis() as u64; + if !supervisor_api_cycle_ms.is_multiple_of(internal_processing_cycle_ms) { + error!( + "Supervisor API cycle duration ({} ms) must be a multiple of internal processing cycle interval ({} ms).", + supervisor_api_cycle_ms, internal_processing_cycle_ms + ); + return Err(HealthMonitorError::InvalidArgument); + } - self.build_internal() + // Check number of monitors. + let num_monitors = self.deadline_monitor_builders.len(); + if num_monitors == 0 { + error!("No monitors have been added. HealthMonitor cannot be created."); + return Err(HealthMonitorError::WrongState); + } + + // Create allocator. + let allocator = protected_memory::ProtectedMemoryAllocator {}; + + // Create deadline monitors. + let mut deadline_monitors = HashMap::new(); + for (tag, builder) in self.deadline_monitor_builders { + let monitor = builder.build(tag, &allocator); + deadline_monitors.insert(tag, Some(MonitorState::Available(monitor))); + } + + Ok(HealthMonitor { + deadline_monitors, + worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle), + supervisor_api_cycle: self.supervisor_api_cycle, + }) } // Used by FFI and config parsing code which prefer not to move builder instance @@ -84,9 +132,9 @@ impl HealthMonitorBuilder { pub(crate) fn add_deadline_monitor_internal( &mut self, monitor_tag: MonitorTag, - monitor: deadline::DeadlineMonitorBuilder, + monitor_builder: DeadlineMonitorBuilder, ) { - self.deadline_monitor_builders.insert(monitor_tag, monitor); + self.deadline_monitor_builders.insert(monitor_tag, monitor_builder); } pub(crate) fn with_supervisor_api_cycle_internal(&mut self, cycle_duration: Duration) { @@ -96,136 +144,129 @@ impl HealthMonitorBuilder { pub(crate) fn with_internal_processing_cycle_internal(&mut self, cycle_duration: Duration) { self.internal_processing_cycle = cycle_duration; } - - 
pub(crate) fn check_cycle_args_internal(&self) -> bool { - self.supervisor_api_cycle - .as_millis() - .is_multiple_of(self.internal_processing_cycle.as_millis()) - } - - pub(crate) fn build_internal(self) -> HealthMonitor { - let allocator = protected_memory::ProtectedMemoryAllocator {}; - - // Create deadline monitors. - let mut deadline_monitors = HashMap::new(); - for (tag, builder) in self.deadline_monitor_builders { - deadline_monitors.insert( - tag, - Some(DeadlineMonitorState::Available(builder.build(tag, &allocator))), - ); - } - - HealthMonitor { - deadline_monitors, - worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle), - supervisor_api_cycle: self.supervisor_api_cycle, - } - } } -enum DeadlineMonitorState { - Available(deadline::DeadlineMonitor), - Taken(common::MonitorEvalHandle), +/// Monitor ownership state in the [`HealthMonitor`]. +enum MonitorState { + /// Monitor is available. + Available(Monitor), + /// Monitor is already taken. + Taken(MonitorEvalHandle), } +/// Monitor container. +/// - Must be an option to ensure monitor can be taken out (not referenced). +/// - Must be an enum to ensure evaluation handle is still available for HMON after monitor is taken. +type MonitorContainer = Option>; + +/// Health monitor. pub struct HealthMonitor { - deadline_monitors: HashMap>, + deadline_monitors: HashMap>, worker: worker::UniqueThreadRunner, supervisor_api_cycle: Duration, } impl HealthMonitor { - /// Retrieves and removes (hand over to user) a deadline monitor associated with the given identifier tag. - /// # Arguments - /// * `monitor_tag` - The unique identifier for the deadline monitor. 
- /// # Returns - /// An Option containing the DeadlineMonitor if found, or None if - /// - no monitor exists for the given tag or was already obtained - /// - pub fn get_deadline_monitor(&mut self, monitor_tag: MonitorTag) -> Option { - let monitor = self.deadline_monitors.get_mut(&monitor_tag)?; - - match monitor.take() { - Some(DeadlineMonitorState::Available(deadline_monitor)) => { - monitor.replace(DeadlineMonitorState::Taken(deadline_monitor.get_eval_handle())); + fn get_monitor( + monitors: &mut HashMap>, + monitor_tag: MonitorTag, + ) -> Option { + let monitor_state = monitors.get_mut(&monitor_tag)?; - Some(deadline_monitor) + match monitor_state.take() { + Some(MonitorState::Available(monitor)) => { + monitor_state.replace(MonitorState::Taken(monitor.get_eval_handle())); + Some(monitor) }, - Some(DeadlineMonitorState::Taken(v)) => { - monitor.replace(DeadlineMonitorState::Taken(v)); // Insert back + Some(MonitorState::Taken(handle)) => { + // Taken handle is inserted back. + monitor_state.replace(MonitorState::Taken(handle)); None }, None => None, } } - /// Starts the health monitoring logic in a separate thread. - /// - /// From this point, the health monitor will periodically check monitors and notify the supervisor about system liveness. + /// Get and pass ownership of a [`DeadlineMonitor`] for the given [`MonitorTag`]. /// - /// # Note - /// - This function shall be called before Lifecycle.running() otherwise the supervisor might consider the process not alive. - /// - Stops when the HealthMonitor instance is dropped. + /// - `monitor_tag` - unique tag for the [`DeadlineMonitor`]. /// - /// Panics if no monitors have been added. - pub fn start(&mut self) { - assert!( - self.check_monitors_exist_internal(), - "No deadline monitors have been added. HealthMonitor cannot start without any monitors." 
- ); - - let monitors = match self.collect_monitors_internal() { - Ok(m) => m, - Err(e) => panic!("{}", e), - }; - - self.start_internal(monitors); - } - - pub(crate) fn check_monitors_exist_internal(&self) -> bool { - !self.deadline_monitors.is_empty() + /// Returns [`Some`] containing [`DeadlineMonitor`] if found and not taken. + /// Otherwise returns [`None`]. + pub fn get_deadline_monitor(&mut self, monitor_tag: MonitorTag) -> Option { + Self::get_monitor(&mut self.deadline_monitors, monitor_tag) } - pub(crate) fn collect_monitors_internal(&mut self) -> Result, String> { - let mut monitors = FixedCapacityVec::new(self.deadline_monitors.len()); - for (tag, monitor) in self.deadline_monitors.iter_mut() { + fn collect_given_monitors( + monitors_to_collect: &mut HashMap>, + collected_monitors: &mut FixedCapacityVec, + ) -> Result<(), HealthMonitorError> { + for (tag, monitor) in monitors_to_collect.iter_mut() { match monitor.take() { - Some(DeadlineMonitorState::Taken(handle)) => { - if monitors.push(handle).is_err() { - // Should not fail since we preallocated enough capacity - return Err("Failed to push monitor handle".to_string()); + Some(MonitorState::Taken(handle)) => { + if collected_monitors.push(handle).is_err() { + // Should not fail - capacity was preallocated. + error!("Failed to push monitor handle."); + return Err(HealthMonitorError::WrongState); } }, - Some(DeadlineMonitorState::Available(_)) => { - return Err(format!( + Some(MonitorState::Available(_)) => { + error!( "All monitors must be taken before starting HealthMonitor but {:?} is not taken.", tag - )); + ); + return Err(HealthMonitorError::WrongState); }, None => { - return Err(format!( + error!( "Invalid monitor ({:?}) state encountered while starting HealthMonitor.", tag - )); + ); + return Err(HealthMonitorError::WrongState); }, } } - Ok(monitors) + Ok(()) } - pub(crate) fn start_internal(&mut self, monitors: FixedCapacityVec) { + /// Start the health monitoring logic in a separate thread. 
+ /// + /// From this point, the health monitor will periodically check monitors and notify the supervisor about system liveness. + /// + /// # Notes + /// + /// This method shall be called before `Lifecycle.running()`. + /// Otherwise the supervisor might consider the process not alive. + /// + /// Health monitoring logic stop when the [`HealthMonitor`] is dropped. + pub fn start(&mut self) -> Result<(), HealthMonitorError> { + // Check number of monitors. + // Should never occur if created by `HealthMonitorBuilder`! + let num_monitors = self.deadline_monitors.len(); + if num_monitors == 0 { + error!("No monitors have been added. HealthMonitor cannot be created."); + return Err(HealthMonitorError::WrongState); + } + + // Collect all monitors. + let mut collected_monitors = FixedCapacityVec::new(num_monitors); + Self::collect_given_monitors(&mut self.deadline_monitors, &mut collected_monitors)?; + + // Start monitoring logic. let monitoring_logic = worker::MonitoringLogic::new( - monitors, + collected_monitors, self.supervisor_api_cycle, - // Currently only `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. - // The later is meant to be used for testing purposes. 
- #[cfg(not(any(test, feature = "stub_supervisor_api_client")))] - worker::ScoreSupervisorAPIClient::new(), - #[cfg(any(test, feature = "stub_supervisor_api_client"))] - worker::StubSupervisorAPIClient {}, + #[cfg(all(not(test), feature = "score_supervisor_api_client"))] + supervisor_api_client::score_supervisor_api_client::ScoreSupervisorAPIClient::new(), + #[cfg(any( + test, + all(feature = "stub_supervisor_api_client", not(feature = "score_supervisor_api_client")) + ))] + supervisor_api_client::stub_supervisor_api_client::StubSupervisorAPIClient::new(), ); - self.worker.start(monitoring_logic) + self.worker.start(monitoring_logic); + Ok(()) } //TODO: Add possibility to run HM in the current thread - ie in main @@ -234,74 +275,147 @@ impl HealthMonitor { #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { - use super::*; + use crate::deadline::DeadlineMonitorBuilder; + use crate::tag::MonitorTag; + use crate::{HealthMonitorBuilder, HealthMonitorError}; + use core::time::Duration; + use std::collections::HashMap; #[test] - #[should_panic(expected = "No deadline monitors have been added. 
HealthMonitor cannot start without any monitors.")] - fn hm_with_no_monitors_shall_panic_on_start() { - let health_monitor_builder = super::HealthMonitorBuilder::new(); - health_monitor_builder.build().start(); + fn health_monitor_builder_new_succeeds() { + let health_monitor_builder = HealthMonitorBuilder::new(); + assert!(health_monitor_builder.deadline_monitor_builders.is_empty()); + assert_eq!(health_monitor_builder.supervisor_api_cycle, Duration::from_millis(500)); + assert_eq!( + health_monitor_builder.internal_processing_cycle, + Duration::from_millis(100) + ); } #[test] - #[should_panic(expected = "supervisor API cycle must be multiple of internal processing cycle")] - fn hm_with_wrong_cycle_fails_to_build() { - super::HealthMonitorBuilder::new() - .with_supervisor_api_cycle(Duration::from_millis(50)) + fn health_monitor_builder_build_succeeds() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let result = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) .build(); + assert!(result.is_ok()); } #[test] - fn hm_with_taken_monitors_starts() { - let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor( - MonitorTag::from("test_monitor"), - deadline::DeadlineMonitorBuilder::new(), - ) + fn health_monitor_builder_build_invalid_cycles() { + let result = HealthMonitorBuilder::new() + .with_supervisor_api_cycle(Duration::from_millis(123)) + .with_internal_processing_cycle(Duration::from_millis(100)) .build(); + assert!(result.is_err_and(|e| e == HealthMonitorError::InvalidArgument)); + } - let _monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); - health_monitor.start(); + #[test] + fn health_monitor_builder_build_no_monitors() { + let result = HealthMonitorBuilder::new().build(); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); } #[test] - #[should_panic( - 
expected = "All monitors must be taken before starting HealthMonitor but MonitorTag(test_monitor) is not taken." - )] - fn hm_with_monitors_shall_not_start_with_not_taken_monitors() { + fn health_monitor_get_deadline_monitor_available() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor( - MonitorTag::from("test_monitor"), - deadline::DeadlineMonitorBuilder::new(), - ) - .build(); + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); - health_monitor.start(); + let result = health_monitor.get_deadline_monitor(deadline_monitor_tag); + assert!(result.is_some()); } #[test] - fn hm_get_deadline_monitor_works() { + fn health_monitor_get_deadline_monitor_taken() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let mut health_monitor = HealthMonitorBuilder::new() - .add_deadline_monitor( - MonitorTag::from("test_monitor"), - deadline::DeadlineMonitorBuilder::new(), - ) - .build(); + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); - { - let monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); - assert!( - monitor.is_some(), - "Expected to retrieve the deadline monitor, but got None" - ); - } + let _ = health_monitor.get_deadline_monitor(deadline_monitor_tag); + let result = health_monitor.get_deadline_monitor(deadline_monitor_tag); + assert!(result.is_none()); + } - { - let monitor = health_monitor.get_deadline_monitor(MonitorTag::from("test_monitor")); - assert!( - monitor.is_none(), - "Expected None when retrieving the monitor a second time, but got Some" - ); - } + #[test] + fn health_monitor_get_deadline_monitor_unknown() { + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + let mut health_monitor = 
HealthMonitorBuilder::new() + .add_deadline_monitor(MonitorTag::from("deadline_monitor"), deadline_monitor_builder) + .build() + .unwrap(); + + let result = health_monitor.get_deadline_monitor(MonitorTag::from("undefined_monitor")); + assert!(result.is_none()); + } + + #[test] + fn health_monitor_get_deadline_monitor_invalid_state() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); + + // Inject broken state - unreachable otherwise. + health_monitor.deadline_monitors.insert(deadline_monitor_tag, None); + + let result = health_monitor.get_deadline_monitor(deadline_monitor_tag); + assert!(result.is_none()); + } + + #[test] + fn health_monitor_start_succeeds() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); + + let _deadline_monitor = health_monitor.get_deadline_monitor(deadline_monitor_tag).unwrap(); + + let result = health_monitor.start(); + assert!(result.is_ok()); + } + + #[test] + fn health_monitor_start_monitors_not_taken() { + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let mut health_monitor = HealthMonitorBuilder::new() + .add_deadline_monitor(MonitorTag::from("deadline_monitor"), deadline_monitor_builder) + .build() + .unwrap(); + + let result = health_monitor.start(); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); + } + + #[test] + fn health_monitor_start_no_monitors() { + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let deadline_monitor_builder = DeadlineMonitorBuilder::new(); + + let mut health_monitor = 
HealthMonitorBuilder::new() + .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) + .build() + .unwrap(); + + // Inject broken state - unreachable otherwise. + health_monitor.deadline_monitors = HashMap::new(); + + let result = health_monitor.start(); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); } } diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs new file mode 100644 index 00000000..195d167a --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs @@ -0,0 +1,28 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Module providing [`SupervisorAPIClient`] implementations. +//! Currently `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. +//! The latter is meant for testing purposes. + +/// An abstraction over the API used to notify the supervisor about process liveness. +pub trait SupervisorAPIClient { + fn notify_alive(&self); +} + +// NOTE: various implementations are not mutually exclusive. 
+ +#[cfg(feature = "score_supervisor_api_client")] +pub mod score_supervisor_api_client; +#[cfg(feature = "stub_supervisor_api_client")] +pub mod stub_supervisor_api_client; diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs new file mode 100644 index 00000000..a198f9ad --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs @@ -0,0 +1,40 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at + + +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +#![allow(dead_code)] + +use crate::log::debug; +use crate::supervisor_api_client::SupervisorAPIClient; +use crate::worker::Checks; + +pub struct ScoreSupervisorAPIClient { + supervisor_link: monitor_rs::Monitor<Checks>, +} + +unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution + +impl ScoreSupervisorAPIClient { + pub fn new() -> Self { + let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); + debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); + // This is only temporary usage so unwrap is fine here. 
+ let supervisor_link = monitor_rs::Monitor::<Checks>::new(&value).expect("Failed to create supervisor_link"); + Self { supervisor_link } + } +} + +impl SupervisorAPIClient for ScoreSupervisorAPIClient { + fn notify_alive(&self) { + self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); + } +} diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs new file mode 100644 index 00000000..e98f4909 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs @@ -0,0 +1,32 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at + + +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +#![allow(dead_code)] + +use crate::log::warn; +use crate::supervisor_api_client::SupervisorAPIClient; + +/// A stub implementation of the SupervisorAPIClient that logs alive notifications. 
+pub struct StubSupervisorAPIClient; + +impl StubSupervisorAPIClient { + pub fn new() -> Self { + Self + } +} + +impl SupervisorAPIClient for StubSupervisorAPIClient { + fn notify_alive(&self) { + warn!("StubSupervisorAPIClient: notify_alive called"); + } +} diff --git a/src/health_monitoring_lib/rust/tag.rs b/src/health_monitoring_lib/rust/tag.rs index eca868df..204902b7 100644 --- a/src/health_monitoring_lib/rust/tag.rs +++ b/src/health_monitoring_lib/rust/tag.rs @@ -247,13 +247,41 @@ mod tests { } #[test] - fn tag_hash() { - let example_str = "EXAMPLE"; - let tag = Tag::from(example_str.to_string()); - let mut hasher = DefaultHasher::new(); - tag.hash(&mut hasher); - let hash = hasher.finish(); - assert_eq!(hash, 14738755424381306335); + fn tag_hash_is_eq() { + let tag1 = Tag::from("same"); + let hash1 = { + let mut hasher = DefaultHasher::new(); + tag1.hash(&mut hasher); + hasher.finish() + }; + + let tag2 = Tag::from("same"); + let hash2 = { + let mut hasher = DefaultHasher::new(); + tag2.hash(&mut hasher); + hasher.finish() + }; + + assert_eq!(hash1, hash2); + } + + #[test] + fn tag_hash_is_ne() { + let tag1 = Tag::from("first"); + let hash1 = { + let mut hasher = DefaultHasher::new(); + tag1.hash(&mut hasher); + hasher.finish() + }; + + let tag2 = Tag::from("second"); + let hash2 = { + let mut hasher = DefaultHasher::new(); + tag2.hash(&mut hasher); + hasher.finish() + }; + + assert_ne!(hash1, hash2); } #[test] diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index 8830e153..916d589e 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -10,20 +10,20 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use crate::common::{MonitorEvalHandle, MonitorEvaluator}; -use crate::log::{debug, info, warn}; +use crate::common::{MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; +use 
crate::log::{info, warn}; +use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; - -/// An abstraction over the API used to notify the supervisor about process liveness. -pub(super) trait SupervisorAPIClient { - fn notify_alive(&self); -} +use core::sync::atomic::{AtomicBool, Ordering}; +use core::time::Duration; +use std::sync::Arc; +use std::time::Instant; pub(super) struct MonitoringLogic { monitors: FixedCapacityVec, client: T, - last_notification: std::time::Instant, - supervisor_api_cycle: core::time::Duration, + last_notification: Instant, + supervisor_api_cycle: Duration, } impl MonitoringLogic { @@ -34,14 +34,14 @@ impl MonitoringLogic { /// * `client` - An implementation of the SupervisorAPIClient trait. pub(super) fn new( monitors: FixedCapacityVec, - supervisor_api_cycle: core::time::Duration, + supervisor_api_cycle: Duration, client: T, ) -> Self { Self { monitors, client, supervisor_api_cycle, - last_notification: std::time::Instant::now(), + last_notification: Instant::now(), } } @@ -51,14 +51,23 @@ impl MonitoringLogic { for monitor in self.monitors.iter() { monitor.evaluate(&mut |monitor_tag, error| { has_any_error = true; - // TODO: monitor type should be mentioned. 
- warn!("Monitor with tag {:?} reported error: {:?}.", monitor_tag, error); + + match error { + MonitorEvaluationError::Deadline(deadline_evaluation_error) => { + warn!( + "Deadline monitor with tag {:?} reported error: {:?}.", + monitor_tag, deadline_evaluation_error + ) + }, + MonitorEvaluationError::Heartbeat => unimplemented!(), + MonitorEvaluationError::Logic => unimplemented!(), + } }); } if !has_any_error { if self.last_notification.elapsed() > self.supervisor_api_cycle { - self.last_notification = std::time::Instant::now(); + self.last_notification = Instant::now(); self.client.notify_alive(); } } else { @@ -73,15 +82,15 @@ impl MonitoringLogic { /// A struct that manages a unique thread for running monitoring logic periodically. pub struct UniqueThreadRunner { handle: Option>, - should_stop: std::sync::Arc, - internal_duration_cycle: core::time::Duration, + should_stop: Arc, + internal_duration_cycle: Duration, } impl UniqueThreadRunner { - pub(super) fn new(internal_duration_cycle: core::time::Duration) -> Self { + pub(super) fn new(internal_duration_cycle: Duration) -> Self { Self { handle: None, - should_stop: std::sync::Arc::new(core::sync::atomic::AtomicBool::new(false)), + should_stop: Arc::new(AtomicBool::new(false)), internal_duration_cycle, } } @@ -99,10 +108,10 @@ impl UniqueThreadRunner { let mut next_sleep_time = interval; // TODO Add some checks and log if cyclicly here is not met. 
- while !should_stop.load(core::sync::atomic::Ordering::Relaxed) { + while !should_stop.load(Ordering::Relaxed) { std::thread::sleep(next_sleep_time); - let now = std::time::Instant::now(); + let now = Instant::now(); if !monitoring_logic.run() { info!("Monitoring logic failed, stopping thread."); @@ -118,7 +127,7 @@ impl UniqueThreadRunner { } pub fn join(&mut self) { - self.should_stop.store(true, core::sync::atomic::Ordering::Relaxed); + self.should_stop.store(true, Ordering::Relaxed); if let Some(handle) = self.handle.take() { let _ = handle.join(); } @@ -131,20 +140,9 @@ impl Drop for UniqueThreadRunner { } } -/// A stub implementation of the SupervisorAPIClient that logs alive notifications. -#[allow(dead_code)] -pub(super) struct StubSupervisorAPIClient; - -#[allow(dead_code)] -impl SupervisorAPIClient for StubSupervisorAPIClient { - fn notify_alive(&self) { - warn!("StubSupervisorAPIClient: notify_alive called"); - } -} - #[allow(dead_code)] #[derive(Copy, Clone)] -enum Checks { +pub(crate) enum Checks { WorkerCheckpoint, } @@ -156,59 +154,41 @@ impl From for u32 { } } -#[allow(dead_code)] -pub(super) struct ScoreSupervisorAPIClient { - supervisor_link: monitor_rs::Monitor, -} - -unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution - -#[allow(dead_code)] -impl ScoreSupervisorAPIClient { - pub fn new() -> Self { - let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); - debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); - // This is only temporary usage so unwrap is fine here. 
- let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); - Self { supervisor_link } - } -} -impl SupervisorAPIClient for ScoreSupervisorAPIClient { - fn notify_alive(&self) { - self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); - } -} - #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { + use crate::common::HasEvalHandle; use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; use crate::protected_memory::ProtectedMemoryAllocator; + use crate::supervisor_api_client::SupervisorAPIClient; use crate::tag::{DeadlineTag, MonitorTag}; + use crate::worker::{MonitoringLogic, UniqueThreadRunner}; use crate::TimeRange; - - use super::*; + use containers::fixed_capacity::FixedCapacityVec; + use core::sync::atomic::{AtomicUsize, Ordering}; + use core::time::Duration; + use std::sync::Arc; #[derive(Clone)] struct MockSupervisorAPIClient { - pub notify_called: std::sync::Arc, + pub notify_called: Arc, } impl MockSupervisorAPIClient { pub fn new() -> Self { Self { - notify_called: std::sync::Arc::new(core::sync::atomic::AtomicUsize::new(0)), + notify_called: Arc::new(AtomicUsize::new(0)), } } fn get_notify_count(&self) -> usize { - self.notify_called.load(core::sync::atomic::Ordering::Acquire) + self.notify_called.load(Ordering::Acquire) } } impl SupervisorAPIClient for MockSupervisorAPIClient { fn notify_alive(&self) { - self.notify_called.fetch_add(1, core::sync::atomic::Ordering::AcqRel); + self.notify_called.fetch_add(1, Ordering::AcqRel); } } @@ -218,14 +198,11 @@ mod tests { DeadlineMonitorBuilder::new() .add_deadline( DeadlineTag::from("deadline_long"), - TimeRange::new(core::time::Duration::from_secs(1), core::time::Duration::from_secs(50)), + TimeRange::new(Duration::from_secs(1), Duration::from_secs(50)), ) .add_deadline( DeadlineTag::from("deadline_fast"), - TimeRange::new( - core::time::Duration::from_millis(0), - core::time::Duration::from_millis(50), - ), + 
TimeRange::new(Duration::from_millis(0), Duration::from_millis(50)), ) .build(monitor_tag, &allocator) } @@ -241,7 +218,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_secs(1), + Duration::from_secs(1), alive_mock.clone(), ); @@ -267,7 +244,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_nanos(0), // Make sure each call notifies alive + Duration::from_nanos(0), // Make sure each call notifies alive alive_mock.clone(), ); @@ -296,7 +273,7 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_millis(30), + Duration::from_millis(30), alive_mock.clone(), ); @@ -305,19 +282,19 @@ mod tests { .unwrap(); let _handle = deadline.start().unwrap(); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); - std::thread::sleep(core::time::Duration::from_millis(30)); + std::thread::sleep(Duration::from_millis(30)); assert!(logic.run()); assert_eq!(alive_mock.get_notify_count(), 5); @@ -335,11 +312,11 @@ mod tests { vec.push(deadline_monitor.get_eval_handle()).unwrap(); vec }, - core::time::Duration::from_nanos(0), // Make sure each call notifies alive + Duration::from_nanos(0), // Make sure each call notifies alive alive_mock.clone(), ); - let mut worker = UniqueThreadRunner::new(core::time::Duration::from_millis(10)); + let mut worker = UniqueThreadRunner::new(Duration::from_millis(10)); worker.start(logic); let mut deadline = deadline_monitor @@ 
-348,7 +325,7 @@ mod tests { let handle = deadline.start().unwrap(); - std::thread::sleep(core::time::Duration::from_millis(70)); + std::thread::sleep(Duration::from_millis(70)); let current_count = alive_mock.get_notify_count(); assert!( @@ -358,7 +335,7 @@ mod tests { ); // We shall not get any new alive calls. - std::thread::sleep(core::time::Duration::from_millis(50)); + std::thread::sleep(Duration::from_millis(50)); assert_eq!(alive_mock.get_notify_count(), current_count); handle.stop(); }