From 1328e65c32267c6fe0e9167a85459d8b34acda2b Mon Sep 17 00:00:00 2001 From: Matthew Frey Date: Mon, 23 Feb 2026 18:36:34 -0700 Subject: [PATCH] fix: make cron scheduler reliable under load and in containers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cron job system had several compounding issues that caused jobs to silently miss their scheduled firings: 1. Timer loop blocked on job execution — run_cron_job was awaited inline, so a slow or stuck job prevented subsequent ticks from firing. With MissedTickBehavior::Skip, every blocked tick was permanently lost. Fix: spawn each execution as an independent tokio task with an AtomicBool lock to prevent overlapping runs of the same job. 2. Timeout was not true wall-clock — the timeout wrapped each individual recv() call rather than the total collection phase, so periodic non-terminal output (status updates, stream chunks) could extend runtime indefinitely. Fix: compute a single deadline up front and use remaining budget for each recv. 3. Active-hours timezone silent fallback — when cron_timezone is unset, active_hours silently uses chrono::Local, which is often UTC in Docker/containerized environments. Jobs with active-hours constraints would be skipped with no obvious indicator. Fix: log a warning at scheduler startup when falling back to system local time. 4. API validation gap — the CronTool (LLM-facing) validated ID format, minimum interval, prompt length, delivery target format, and active hour ranges, but the HTTP API create_or_update_cron had none of these checks, allowing malformed entries to enter the scheduler. Fix: add equivalent validation to the API path. 5. Degenerate active-hours window — when the start and end hours are equal, hour_in_active_window evaluated `current >= start && current < end`, which is never true, so the window never matched. Fix: treat start == end as always active, and normalize such windows to None when registering jobs in the scheduler and when loading them from the store. 
Co-authored-by: Cursor --- src/api/cron.rs | 92 +++++++++++++++++++-- src/cron/scheduler.rs | 181 ++++++++++++++++++++++++++++-------------- src/cron/store.rs | 4 +- 3 files changed, 208 insertions(+), 69 deletions(-) diff --git a/src/api/cron.rs b/src/api/cron.rs index 8a10890ec..a4e52926e 100644 --- a/src/api/cron.rs +++ b/src/api/cron.rs @@ -177,18 +177,96 @@ pub(super) async fn cron_executions( Ok(Json(CronExecutionsResponse { executions })) } +const MIN_CRON_INTERVAL_SECS: u64 = 60; +const MAX_CRON_PROMPT_LENGTH: usize = 10_000; + +fn validate_cron_request(request: &CreateCronRequest) -> Result<(), (StatusCode, String)> { + if request.id.is_empty() + || request.id.len() > 50 + || !request + .id + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { + return Err(( + StatusCode::BAD_REQUEST, + "id must be 1-50 alphanumeric/hyphen/underscore characters".into(), + )); + } + + if request.interval_secs < MIN_CRON_INTERVAL_SECS { + return Err(( + StatusCode::BAD_REQUEST, + format!( + "interval_secs must be at least {MIN_CRON_INTERVAL_SECS} (got {})", + request.interval_secs + ), + )); + } + + if request.prompt.len() > MAX_CRON_PROMPT_LENGTH { + return Err(( + StatusCode::BAD_REQUEST, + format!( + "prompt exceeds maximum length of {MAX_CRON_PROMPT_LENGTH} characters (got {})", + request.prompt.len() + ), + )); + } + + if !request.delivery_target.contains(':') { + return Err(( + StatusCode::BAD_REQUEST, + "delivery_target must be in 'adapter:target' format".into(), + )); + } + + if let Some(start) = request.active_start_hour { + if start > 23 { + return Err(( + StatusCode::BAD_REQUEST, + "active_start_hour must be 0-23".into(), + )); + } + } + if let Some(end) = request.active_end_hour { + if end > 23 { + return Err(( + StatusCode::BAD_REQUEST, + "active_end_hour must be 0-23".into(), + )); + } + } + + Ok(()) +} + /// Create or update a cron job. 
pub(super) async fn create_or_update_cron( State(state): State>, Json(request): Json, -) -> Result, StatusCode> { +) -> Result, (StatusCode, Json)> { + if let Err((status, message)) = validate_cron_request(&request) { + tracing::warn!(agent_id = %request.agent_id, cron_id = %request.id, %message, "cron validation failed"); + return Err((status, Json(CronActionResponse { + success: false, + message, + }))); + } + let stores = state.cron_stores.load(); let schedulers = state.cron_schedulers.load(); - let store = stores.get(&request.agent_id).ok_or(StatusCode::NOT_FOUND)?; - let scheduler = schedulers - .get(&request.agent_id) - .ok_or(StatusCode::NOT_FOUND)?; + let cron_err = |status: StatusCode, message: String| { + (status, Json(CronActionResponse { success: false, message })) + }; + + let store = stores.get(&request.agent_id).ok_or_else(|| { + cron_err(StatusCode::NOT_FOUND, format!("agent '{}' not found", request.agent_id)) + })?; + let scheduler = schedulers.get(&request.agent_id).ok_or_else(|| { + cron_err(StatusCode::NOT_FOUND, format!("agent '{}' not found", request.agent_id)) + })?; let active_hours = match (request.active_start_hour, request.active_end_hour) { (Some(start), Some(end)) => Some((start, end)), @@ -208,12 +286,12 @@ pub(super) async fn create_or_update_cron( store.save(&config).await.map_err(|error| { tracing::warn!(%error, agent_id = %request.agent_id, cron_id = %request.id, "failed to save cron job"); - StatusCode::INTERNAL_SERVER_ERROR + cron_err(StatusCode::INTERNAL_SERVER_ERROR, format!("failed to save: {error}")) })?; scheduler.register(config).await.map_err(|error| { tracing::warn!(%error, agent_id = %request.agent_id, cron_id = %request.id, "failed to register cron job"); - StatusCode::INTERNAL_SERVER_ERROR + cron_err(StatusCode::INTERNAL_SERVER_ERROR, format!("failed to register: {error}")) })?; Ok(Json(CronActionResponse { diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index bf8b841d8..b28c6a3fe 100644 --- 
a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -76,6 +76,16 @@ pub struct CronContext { } const MAX_CONSECUTIVE_FAILURES: u32 = 3; + +/// RAII guard that clears an `AtomicBool` on drop, ensuring the flag is +/// released even if the holding task panics. +struct ExecutionGuard(Arc); + +impl Drop for ExecutionGuard { + fn drop(&mut self) { + self.0.store(false, std::sync::atomic::Ordering::Release); + } +} const SYSTEM_TIMEZONE_LABEL: &str = "system"; /// Scheduler that manages cron job timers and execution. @@ -93,6 +103,17 @@ impl std::fmt::Debug for Scheduler { impl Scheduler { pub fn new(context: CronContext) -> Self { + let tz_label = cron_timezone_label(&context); + if tz_label == SYSTEM_TIMEZONE_LABEL { + tracing::warn!( + agent_id = %context.deps.agent_id, + "no cron_timezone configured; active_hours will use the host system's local time, \ + which is often UTC in Docker/containerized environments — set [defaults] \ + cron_timezone to an IANA timezone like \"America/New_York\" if jobs are \ + skipping their active window" + ); + } + Self { jobs: Arc::new(RwLock::new(HashMap::new())), timers: Arc::new(RwLock::new(HashMap::new())), @@ -118,7 +139,7 @@ impl Scheduler { prompt: config.prompt, interval_secs: config.interval_secs, delivery_target, - active_hours: config.active_hours, + active_hours: normalize_active_hours(config.active_hours), enabled: config.enabled, run_once: config.run_once, consecutive_failures: 0, @@ -197,6 +218,8 @@ impl Scheduler { // Skip catch-up ticks if processing falls behind — maintain original cadence. 
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let execution_lock = Arc::new(std::sync::atomic::AtomicBool::new(false)); + loop { ticker.tick().await; @@ -232,72 +255,81 @@ impl Scheduler { } } - tracing::info!(cron_id = %job_id, "cron job firing"); - - match run_cron_job(&job, &context).await { - Ok(()) => { - // Reset failure count on success - let mut j = jobs.write().await; - if let Some(j) = j.get_mut(&job_id) { - j.consecutive_failures = 0; - } - } - Err(error) => { - tracing::error!( - cron_id = %job_id, - %error, - "cron job execution failed" - ); + if execution_lock.load(std::sync::atomic::Ordering::Acquire) { + tracing::debug!(cron_id = %job_id, "previous execution still running, skipping tick"); + continue; + } - let should_disable = { - let mut j = jobs.write().await; - if let Some(j) = j.get_mut(&job_id) { - j.consecutive_failures += 1; - j.consecutive_failures >= MAX_CONSECUTIVE_FAILURES - } else { - false + tracing::info!(cron_id = %job_id, "cron job firing"); + execution_lock.store(true, std::sync::atomic::Ordering::Release); + + let exec_jobs = jobs.clone(); + let exec_context = context.clone(); + let exec_job_id = job_id.clone(); + let guard = ExecutionGuard(execution_lock.clone()); + + tokio::spawn(async move { + let _guard = guard; + match run_cron_job(&job, &exec_context).await { + Ok(()) => { + let mut j = exec_jobs.write().await; + if let Some(j) = j.get_mut(&exec_job_id) { + j.consecutive_failures = 0; } - }; - - if should_disable { - tracing::warn!( - cron_id = %job_id, - "circuit breaker tripped after {MAX_CONSECUTIVE_FAILURES} consecutive failures, disabling" + } + Err(error) => { + tracing::error!( + cron_id = %exec_job_id, + %error, + "cron job execution failed" ); - { - let mut j = jobs.write().await; - if let Some(j) = j.get_mut(&job_id) { - j.enabled = false; + let should_disable = { + let mut j = exec_jobs.write().await; + if let Some(j) = j.get_mut(&exec_job_id) { + j.consecutive_failures += 1; + 
j.consecutive_failures >= MAX_CONSECUTIVE_FAILURES + } else { + false + } + }; + + if should_disable { + tracing::warn!( + cron_id = %exec_job_id, + "circuit breaker tripped after {MAX_CONSECUTIVE_FAILURES} consecutive failures, disabling" + ); + + { + let mut j = exec_jobs.write().await; + if let Some(j) = j.get_mut(&exec_job_id) { + j.enabled = false; + } } - } - // Persist the disabled state - if let Err(error) = context.store.update_enabled(&job_id, false).await { - tracing::error!(%error, "failed to persist cron job disabled state"); + if let Err(error) = exec_context.store.update_enabled(&exec_job_id, false).await { + tracing::error!(%error, "failed to persist cron job disabled state"); + } } - - break; } } - } - if job.run_once { - tracing::info!(cron_id = %job_id, "run-once cron completed, disabling"); + if job.run_once { + tracing::info!(cron_id = %exec_job_id, "run-once cron completed, disabling"); - { - let mut j = jobs.write().await; - if let Some(j) = j.get_mut(&job_id) { - j.enabled = false; + { + let mut j = exec_jobs.write().await; + if let Some(j) = j.get_mut(&exec_job_id) { + j.enabled = false; + } } - } - if let Err(error) = context.store.update_enabled(&job_id, false).await { - tracing::error!(%error, "failed to persist run-once cron disabled state"); + if let Err(error) = exec_context.store.update_enabled(&exec_job_id, false).await { + tracing::error!(%error, "failed to persist run-once cron disabled state"); + } } - break; - } + }); } }); @@ -424,7 +456,7 @@ impl Scheduler { prompt: config.prompt, interval_secs: config.interval_secs, delivery_target, - active_hours: config.active_hours, + active_hours: normalize_active_hours(config.active_hours), enabled: true, run_once: config.run_once, consecutive_failures: 0, @@ -512,13 +544,21 @@ fn current_hour_and_timezone(context: &CronContext, cron_id: &str) -> (u8, Strin } fn hour_in_active_window(current_hour: u8, start_hour: u8, end_hour: u8) -> bool { - if start_hour <= end_hour { + if start_hour 
== end_hour { + return true; + } + if start_hour < end_hour { current_hour >= start_hour && current_hour < end_hour } else { current_hour >= start_hour || current_hour < end_hour } } +/// Normalize degenerate active_hours where start == end to None (always active). +fn normalize_active_hours(active_hours: Option<(u8, u8)>) -> Option<(u8, u8)> { + active_hours.filter(|(start, end)| start != end) +} + fn ensure_cron_dispatch_readiness(context: &CronContext, cron_id: &str) { let readiness = context.deps.runtime_config.work_readiness(); if readiness.ready { @@ -613,19 +653,23 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> { // when the sender is dropped (message_rx returns None). drop(channel_tx); + let deadline = tokio::time::Instant::now() + timeout; loop { - match tokio::time::timeout(timeout, response_rx.recv()).await { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + tracing::warn!(cron_id = %job.id, "cron job timed out after {timeout:?}"); + channel_handle.abort(); + break; + } + match tokio::time::timeout(remaining, response_rx.recv()).await { Ok(Some(OutboundResponse::Text(text))) => { collected_text.push(text); } Ok(Some(OutboundResponse::RichMessage { text, .. })) => { collected_text.push(text); } - Ok(Some(_)) => { - // Status updates, stream chunks, etc. 
— ignore for cron jobs - } + Ok(Some(_)) => {} Ok(None) => { - // Channel finished (response_tx dropped) break; } Err(_) => { @@ -692,7 +736,7 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> { #[cfg(test)] mod tests { - use super::hour_in_active_window; + use super::{hour_in_active_window, normalize_active_hours}; #[test] fn test_hour_in_active_window_non_wrapping() { @@ -708,4 +752,21 @@ mod tests { assert!(hour_in_active_window(3, 22, 6)); assert!(!hour_in_active_window(12, 22, 6)); } + + #[test] + fn test_hour_in_active_window_equal_start_end_is_always_active() { + assert!(hour_in_active_window(0, 0, 0)); + assert!(hour_in_active_window(12, 0, 0)); + assert!(hour_in_active_window(23, 0, 0)); + assert!(hour_in_active_window(5, 5, 5)); + assert!(hour_in_active_window(14, 14, 14)); + } + + #[test] + fn test_normalize_active_hours() { + assert_eq!(normalize_active_hours(Some((0, 0))), None); + assert_eq!(normalize_active_hours(Some((12, 12))), None); + assert_eq!(normalize_active_hours(Some((9, 17))), Some((9, 17))); + assert_eq!(normalize_active_hours(None), None); + } } diff --git a/src/cron/store.rs b/src/cron/store.rs index 75e76c109..2b9a3c38c 100644 --- a/src/cron/store.rs +++ b/src/cron/store.rs @@ -78,7 +78,7 @@ impl CronStore { let start: Option = row.try_get("active_start_hour").ok(); let end: Option = row.try_get("active_end_hour").ok(); match (start, end) { - (Some(s), Some(e)) => Some((s as u8, e as u8)), + (Some(s), Some(e)) if s != e => Some((s as u8, e as u8)), _ => None, } }, @@ -168,7 +168,7 @@ impl CronStore { let start: Option = row.try_get("active_start_hour").ok(); let end: Option = row.try_get("active_end_hour").ok(); match (start, end) { - (Some(s), Some(e)) => Some((s as u8, e as u8)), + (Some(s), Some(e)) if s != e => Some((s as u8, e as u8)), _ => None, } },