-
Notifications
You must be signed in to change notification settings - Fork 184
fix(cron): make cron scheduler reliable under load and in containers #186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,6 +76,16 @@ pub struct CronContext { | |
| } | ||
|
|
||
| const MAX_CONSECUTIVE_FAILURES: u32 = 3; | ||
|
|
||
| /// RAII guard that clears an `AtomicBool` on drop, ensuring the flag is | ||
| /// released even if the holding task panics. | ||
| struct ExecutionGuard(Arc<std::sync::atomic::AtomicBool>); | ||
|
|
||
| impl Drop for ExecutionGuard { | ||
| fn drop(&mut self) { | ||
| self.0.store(false, std::sync::atomic::Ordering::Release); | ||
| } | ||
| } | ||
| const SYSTEM_TIMEZONE_LABEL: &str = "system"; | ||
|
|
||
| /// Scheduler that manages cron job timers and execution. | ||
|
|
@@ -93,6 +103,17 @@ impl std::fmt::Debug for Scheduler { | |
|
|
||
| impl Scheduler { | ||
| pub fn new(context: CronContext) -> Self { | ||
| let tz_label = cron_timezone_label(&context); | ||
| if tz_label == SYSTEM_TIMEZONE_LABEL { | ||
| tracing::warn!( | ||
| agent_id = %context.deps.agent_id, | ||
| "no cron_timezone configured; active_hours will use the host system's local time, \ | ||
| which is often UTC in Docker/containerized environments — set [defaults] \ | ||
| cron_timezone to an IANA timezone like \"America/New_York\" if jobs are \ | ||
| skipping their active window" | ||
| ); | ||
| } | ||
|
|
||
| Self { | ||
| jobs: Arc::new(RwLock::new(HashMap::new())), | ||
| timers: Arc::new(RwLock::new(HashMap::new())), | ||
|
|
@@ -118,7 +139,7 @@ impl Scheduler { | |
| prompt: config.prompt, | ||
| interval_secs: config.interval_secs, | ||
| delivery_target, | ||
| active_hours: config.active_hours, | ||
| active_hours: normalize_active_hours(config.active_hours), | ||
| enabled: config.enabled, | ||
| run_once: config.run_once, | ||
| consecutive_failures: 0, | ||
|
|
@@ -197,6 +218,8 @@ impl Scheduler { | |
| // Skip catch-up ticks if processing falls behind — maintain original cadence. | ||
| ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
|
||
| let execution_lock = Arc::new(std::sync::atomic::AtomicBool::new(false)); | ||
|
|
||
| loop { | ||
| ticker.tick().await; | ||
|
|
||
|
|
@@ -232,72 +255,81 @@ impl Scheduler { | |
| } | ||
| } | ||
|
|
||
| tracing::info!(cron_id = %job_id, "cron job firing"); | ||
|
|
||
| match run_cron_job(&job, &context).await { | ||
| Ok(()) => { | ||
| // Reset failure count on success | ||
| let mut j = jobs.write().await; | ||
| if let Some(j) = j.get_mut(&job_id) { | ||
| j.consecutive_failures = 0; | ||
| } | ||
| } | ||
| Err(error) => { | ||
| tracing::error!( | ||
| cron_id = %job_id, | ||
| %error, | ||
| "cron job execution failed" | ||
| ); | ||
| if execution_lock.load(std::sync::atomic::Ordering::Acquire) { | ||
| tracing::debug!(cron_id = %job_id, "previous execution still running, skipping tick"); | ||
| continue; | ||
| } | ||
|
|
||
| let should_disable = { | ||
| let mut j = jobs.write().await; | ||
| if let Some(j) = j.get_mut(&job_id) { | ||
| j.consecutive_failures += 1; | ||
| j.consecutive_failures >= MAX_CONSECUTIVE_FAILURES | ||
| } else { | ||
| false | ||
| tracing::info!(cron_id = %job_id, "cron job firing"); | ||
| execution_lock.store(true, std::sync::atomic::Ordering::Release); | ||
|
Comment on lines
+258
to
+264
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TOCTOU gap between the lock check and the lock set. The If overlapping a manual trigger with a scheduled run is acceptable, this is fine as-is. If not, consider using 🤖 Prompt for AI Agents |
||
|
|
||
| let exec_jobs = jobs.clone(); | ||
| let exec_context = context.clone(); | ||
| let exec_job_id = job_id.clone(); | ||
| let guard = ExecutionGuard(execution_lock.clone()); | ||
|
|
||
| tokio::spawn(async move { | ||
| let _guard = guard; | ||
| match run_cron_job(&job, &exec_context).await { | ||
| Ok(()) => { | ||
| let mut j = exec_jobs.write().await; | ||
| if let Some(j) = j.get_mut(&exec_job_id) { | ||
| j.consecutive_failures = 0; | ||
| } | ||
| }; | ||
|
|
||
| if should_disable { | ||
| tracing::warn!( | ||
| cron_id = %job_id, | ||
| "circuit breaker tripped after {MAX_CONSECUTIVE_FAILURES} consecutive failures, disabling" | ||
| } | ||
| Err(error) => { | ||
| tracing::error!( | ||
| cron_id = %exec_job_id, | ||
| %error, | ||
| "cron job execution failed" | ||
| ); | ||
|
|
||
| { | ||
| let mut j = jobs.write().await; | ||
| if let Some(j) = j.get_mut(&job_id) { | ||
| j.enabled = false; | ||
| let should_disable = { | ||
| let mut j = exec_jobs.write().await; | ||
| if let Some(j) = j.get_mut(&exec_job_id) { | ||
| j.consecutive_failures += 1; | ||
| j.consecutive_failures >= MAX_CONSECUTIVE_FAILURES | ||
| } else { | ||
| false | ||
| } | ||
| }; | ||
|
|
||
| if should_disable { | ||
| tracing::warn!( | ||
| cron_id = %exec_job_id, | ||
| "circuit breaker tripped after {MAX_CONSECUTIVE_FAILURES} consecutive failures, disabling" | ||
| ); | ||
|
|
||
| { | ||
| let mut j = exec_jobs.write().await; | ||
| if let Some(j) = j.get_mut(&exec_job_id) { | ||
| j.enabled = false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Persist the disabled state | ||
| if let Err(error) = context.store.update_enabled(&job_id, false).await { | ||
| tracing::error!(%error, "failed to persist cron job disabled state"); | ||
| if let Err(error) = exec_context.store.update_enabled(&exec_job_id, false).await { | ||
| tracing::error!(%error, "failed to persist cron job disabled state"); | ||
| } | ||
| } | ||
|
|
||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if job.run_once { | ||
| tracing::info!(cron_id = %job_id, "run-once cron completed, disabling"); | ||
| if job.run_once { | ||
| tracing::info!(cron_id = %exec_job_id, "run-once cron completed, disabling"); | ||
|
|
||
| { | ||
| let mut j = jobs.write().await; | ||
| if let Some(j) = j.get_mut(&job_id) { | ||
| j.enabled = false; | ||
| { | ||
| let mut j = exec_jobs.write().await; | ||
| if let Some(j) = j.get_mut(&exec_job_id) { | ||
| j.enabled = false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if let Err(error) = context.store.update_enabled(&job_id, false).await { | ||
| tracing::error!(%error, "failed to persist run-once cron disabled state"); | ||
| if let Err(error) = exec_context.store.update_enabled(&exec_job_id, false).await { | ||
| tracing::error!(%error, "failed to persist run-once cron disabled state"); | ||
| } | ||
| } | ||
|
Comment on lines
+317
to
330
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The 🤖 Prompt for AI Agents |
||
|
|
||
| break; | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
|
|
@@ -424,7 +456,7 @@ impl Scheduler { | |
| prompt: config.prompt, | ||
| interval_secs: config.interval_secs, | ||
| delivery_target, | ||
| active_hours: config.active_hours, | ||
| active_hours: normalize_active_hours(config.active_hours), | ||
| enabled: true, | ||
| run_once: config.run_once, | ||
| consecutive_failures: 0, | ||
|
|
@@ -512,13 +544,21 @@ fn current_hour_and_timezone(context: &CronContext, cron_id: &str) -> (u8, Strin | |
| } | ||
|
|
||
| fn hour_in_active_window(current_hour: u8, start_hour: u8, end_hour: u8) -> bool { | ||
| if start_hour <= end_hour { | ||
| if start_hour == end_hour { | ||
| return true; | ||
| } | ||
| if start_hour < end_hour { | ||
| current_hour >= start_hour && current_hour < end_hour | ||
| } else { | ||
| current_hour >= start_hour || current_hour < end_hour | ||
| } | ||
| } | ||
|
|
||
| /// Normalize degenerate active_hours where start == end to None (always active). | ||
| fn normalize_active_hours(active_hours: Option<(u8, u8)>) -> Option<(u8, u8)> { | ||
| active_hours.filter(|(start, end)| start != end) | ||
| } | ||
|
|
||
| fn ensure_cron_dispatch_readiness(context: &CronContext, cron_id: &str) { | ||
| let readiness = context.deps.runtime_config.work_readiness(); | ||
| if readiness.ready { | ||
|
|
@@ -613,19 +653,23 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> { | |
| // when the sender is dropped (message_rx returns None). | ||
| drop(channel_tx); | ||
|
|
||
| let deadline = tokio::time::Instant::now() + timeout; | ||
| loop { | ||
| match tokio::time::timeout(timeout, response_rx.recv()).await { | ||
| let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); | ||
| if remaining.is_zero() { | ||
| tracing::warn!(cron_id = %job.id, "cron job timed out after {timeout:?}"); | ||
| channel_handle.abort(); | ||
| break; | ||
| } | ||
| match tokio::time::timeout(remaining, response_rx.recv()).await { | ||
| Ok(Some(OutboundResponse::Text(text))) => { | ||
| collected_text.push(text); | ||
| } | ||
| Ok(Some(OutboundResponse::RichMessage { text, .. })) => { | ||
| collected_text.push(text); | ||
| } | ||
| Ok(Some(_)) => { | ||
| // Status updates, stream chunks, etc. — ignore for cron jobs | ||
| } | ||
| Ok(Some(_)) => {} | ||
| Ok(None) => { | ||
| // Channel finished (response_tx dropped) | ||
| break; | ||
| } | ||
| Err(_) => { | ||
|
|
@@ -692,7 +736,7 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> { | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::hour_in_active_window; | ||
| use super::{hour_in_active_window, normalize_active_hours}; | ||
|
|
||
| #[test] | ||
| fn test_hour_in_active_window_non_wrapping() { | ||
|
|
@@ -708,4 +752,21 @@ mod tests { | |
| assert!(hour_in_active_window(3, 22, 6)); | ||
| assert!(!hour_in_active_window(12, 22, 6)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_hour_in_active_window_equal_start_end_is_always_active() { | ||
| assert!(hour_in_active_window(0, 0, 0)); | ||
| assert!(hour_in_active_window(12, 0, 0)); | ||
| assert!(hour_in_active_window(23, 0, 0)); | ||
| assert!(hour_in_active_window(5, 5, 5)); | ||
| assert!(hour_in_active_window(14, 14, 14)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_normalize_active_hours() { | ||
| assert_eq!(normalize_active_hours(Some((0, 0))), None); | ||
| assert_eq!(normalize_active_hours(Some((12, 12))), None); | ||
| assert_eq!(normalize_active_hours(Some((9, 17))), Some((9, 17))); | ||
| assert_eq!(normalize_active_hours(None), None); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Supplying only one of
active_start_hour/active_end_houris silently ignored.Validation checks each hour independently (Lines 224–239), but
create_or_update_cronmaps the pair toNonewhen only one is provided (Lines 271–274). A user who accidentally omits one field gets no feedback that their active-hours window was discarded. Consider validating that both or neither are provided.Suggested addition inside `validate_cron_request`
Also applies to: 271-274
🤖 Prompt for AI Agents