Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 85 additions & 7 deletions src/api/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,96 @@ pub(super) async fn cron_executions(
Ok(Json(CronExecutionsResponse { executions }))
}

/// Smallest `interval_secs` (in seconds) that `validate_cron_request` accepts.
const MIN_CRON_INTERVAL_SECS: u64 = 60;
/// Upper bound that `validate_cron_request` places on `prompt.len()`.
const MAX_CRON_PROMPT_LENGTH: usize = 10_000;

fn validate_cron_request(request: &CreateCronRequest) -> Result<(), (StatusCode, String)> {
if request.id.is_empty()
|| request.id.len() > 50
|| !request
.id
.chars()
.all(|c| c.is_alphanumeric() || c == '-' || c == '_')
{
return Err((
StatusCode::BAD_REQUEST,
"id must be 1-50 alphanumeric/hyphen/underscore characters".into(),
));
}

if request.interval_secs < MIN_CRON_INTERVAL_SECS {
return Err((
StatusCode::BAD_REQUEST,
format!(
"interval_secs must be at least {MIN_CRON_INTERVAL_SECS} (got {})",
request.interval_secs
),
));
}

if request.prompt.len() > MAX_CRON_PROMPT_LENGTH {
return Err((
StatusCode::BAD_REQUEST,
format!(
"prompt exceeds maximum length of {MAX_CRON_PROMPT_LENGTH} characters (got {})",
request.prompt.len()
),
));
}

if !request.delivery_target.contains(':') {
return Err((
StatusCode::BAD_REQUEST,
"delivery_target must be in 'adapter:target' format".into(),
));
}

if let Some(start) = request.active_start_hour {
if start > 23 {
return Err((
StatusCode::BAD_REQUEST,
"active_start_hour must be 0-23".into(),
));
}
}
if let Some(end) = request.active_end_hour {
if end > 23 {
return Err((
StatusCode::BAD_REQUEST,
"active_end_hour must be 0-23".into(),
));
}
}
Comment on lines +224 to +239
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Supplying only one of active_start_hour / active_end_hour is silently ignored.

Validation checks each hour independently (Lines 224–239), but create_or_update_cron maps the pair to None when only one is provided (Lines 271–274). A user who accidentally omits one field gets no feedback that their active-hours window was discarded. Consider validating that both or neither are provided.

Suggested addition inside `validate_cron_request`
+    if request.active_start_hour.is_some() != request.active_end_hour.is_some() {
+        return Err((
+            StatusCode::BAD_REQUEST,
+            "active_start_hour and active_end_hour must both be provided or both omitted".into(),
+        ));
+    }
+
     Ok(())
 }

Also applies to: 271-274

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/cron.rs` around lines 224 - 239, The validation currently accepts
each active_start_hour and active_end_hour independently but
create_or_update_cron discards the pair if only one is supplied; update
validate_cron_request to detect the case where exactly one of
request.active_start_hour or request.active_end_hour is Some and return an Err
with StatusCode::BAD_REQUEST (e.g. "both active_start_hour and active_end_hour
must be provided together or omitted") so callers get immediate feedback; keep
the existing per-value range checks for 0-23 and reference the
validate_cron_request and create_or_update_cron functions to locate where to add
the joint-presence check.


Ok(())
}

/// Create or update a cron job.
pub(super) async fn create_or_update_cron(
State(state): State<Arc<ApiState>>,
Json(request): Json<CreateCronRequest>,
) -> Result<Json<CronActionResponse>, StatusCode> {
) -> Result<Json<CronActionResponse>, (StatusCode, Json<CronActionResponse>)> {
if let Err((status, message)) = validate_cron_request(&request) {
tracing::warn!(agent_id = %request.agent_id, cron_id = %request.id, %message, "cron validation failed");
return Err((status, Json(CronActionResponse {
success: false,
message,
})));
}

let stores = state.cron_stores.load();
let schedulers = state.cron_schedulers.load();

let store = stores.get(&request.agent_id).ok_or(StatusCode::NOT_FOUND)?;
let scheduler = schedulers
.get(&request.agent_id)
.ok_or(StatusCode::NOT_FOUND)?;
let cron_err = |status: StatusCode, message: String| {
(status, Json(CronActionResponse { success: false, message }))
};

let store = stores.get(&request.agent_id).ok_or_else(|| {
cron_err(StatusCode::NOT_FOUND, format!("agent '{}' not found", request.agent_id))
})?;
let scheduler = schedulers.get(&request.agent_id).ok_or_else(|| {
cron_err(StatusCode::NOT_FOUND, format!("agent '{}' not found", request.agent_id))
})?;

let active_hours = match (request.active_start_hour, request.active_end_hour) {
(Some(start), Some(end)) => Some((start, end)),
Expand All @@ -208,12 +286,12 @@ pub(super) async fn create_or_update_cron(

store.save(&config).await.map_err(|error| {
tracing::warn!(%error, agent_id = %request.agent_id, cron_id = %request.id, "failed to save cron job");
StatusCode::INTERNAL_SERVER_ERROR
cron_err(StatusCode::INTERNAL_SERVER_ERROR, format!("failed to save: {error}"))
})?;

scheduler.register(config).await.map_err(|error| {
tracing::warn!(%error, agent_id = %request.agent_id, cron_id = %request.id, "failed to register cron job");
StatusCode::INTERNAL_SERVER_ERROR
cron_err(StatusCode::INTERNAL_SERVER_ERROR, format!("failed to register: {error}"))
})?;

Ok(Json(CronActionResponse {
Expand Down
181 changes: 121 additions & 60 deletions src/cron/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ pub struct CronContext {
}

/// Number of consecutive failed executions at which a job's circuit breaker
/// trips and the job is disabled.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

/// Drop guard for an "execution in progress" flag.
///
/// Whoever holds the guard owns the flag; when the guard is dropped —
/// including while unwinding from a panic in the holding task — the wrapped
/// `AtomicBool` is reset to `false`.
struct ExecutionGuard(Arc<std::sync::atomic::AtomicBool>);

impl Drop for ExecutionGuard {
    fn drop(&mut self) {
        let ExecutionGuard(flag) = self;
        flag.store(false, std::sync::atomic::Ordering::Release);
    }
}
/// Timezone label that `Scheduler::new` compares against; when the context's
/// label equals this value it warns that no `cron_timezone` is configured.
const SYSTEM_TIMEZONE_LABEL: &str = "system";

/// Scheduler that manages cron job timers and execution.
Expand All @@ -93,6 +103,17 @@ impl std::fmt::Debug for Scheduler {

impl Scheduler {
pub fn new(context: CronContext) -> Self {
let tz_label = cron_timezone_label(&context);
if tz_label == SYSTEM_TIMEZONE_LABEL {
tracing::warn!(
agent_id = %context.deps.agent_id,
"no cron_timezone configured; active_hours will use the host system's local time, \
which is often UTC in Docker/containerized environments — set [defaults] \
cron_timezone to an IANA timezone like \"America/New_York\" if jobs are \
skipping their active window"
);
}

Self {
jobs: Arc::new(RwLock::new(HashMap::new())),
timers: Arc::new(RwLock::new(HashMap::new())),
Expand All @@ -118,7 +139,7 @@ impl Scheduler {
prompt: config.prompt,
interval_secs: config.interval_secs,
delivery_target,
active_hours: config.active_hours,
active_hours: normalize_active_hours(config.active_hours),
enabled: config.enabled,
run_once: config.run_once,
consecutive_failures: 0,
Expand Down Expand Up @@ -197,6 +218,8 @@ impl Scheduler {
// Skip catch-up ticks if processing falls behind — maintain original cadence.
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let execution_lock = Arc::new(std::sync::atomic::AtomicBool::new(false));

loop {
ticker.tick().await;

Expand Down Expand Up @@ -232,72 +255,81 @@ impl Scheduler {
}
}

tracing::info!(cron_id = %job_id, "cron job firing");

match run_cron_job(&job, &context).await {
Ok(()) => {
// Reset failure count on success
let mut j = jobs.write().await;
if let Some(j) = j.get_mut(&job_id) {
j.consecutive_failures = 0;
}
}
Err(error) => {
tracing::error!(
cron_id = %job_id,
%error,
"cron job execution failed"
);
if execution_lock.load(std::sync::atomic::Ordering::Acquire) {
tracing::debug!(cron_id = %job_id, "previous execution still running, skipping tick");
continue;
}

let should_disable = {
let mut j = jobs.write().await;
if let Some(j) = j.get_mut(&job_id) {
j.consecutive_failures += 1;
j.consecutive_failures >= MAX_CONSECUTIVE_FAILURES
} else {
false
tracing::info!(cron_id = %job_id, "cron job firing");
execution_lock.store(true, std::sync::atomic::Ordering::Release);
Comment on lines +258 to +264
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

TOCTOU gap between the lock check and the lock set.

The load on Line 258 and store on Line 264 are not performed atomically. While the timer loop is single-threaded (so only one iteration runs at a time), trigger_now (Line 401) calls run_cron_job directly without checking or setting execution_lock. This means a manual trigger can race with a scheduled tick, resulting in two concurrent executions of the same job.

If overlapping a manual trigger with a scheduled run is acceptable, this is fine as-is. If not, consider using compare_exchange here and having trigger_now also respect the lock.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cron/scheduler.rs` around lines 258 - 264, the separate load and store on
execution_lock (in the scheduled-tick path of the timer loop) leave a TOCTOU gap
that lets trigger_now race with a scheduled run. Change the scheduled path to
claim the lock atomically, e.g.
execution_lock.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire),
and only call run_cron_job when compare_exchange succeeds. Update trigger_now to
perform the same compare_exchange before calling run_cron_job so that manual and
scheduled triggers respect the same lock; this requires the flag to be reachable
from trigger_now, since it is currently created as a local just before the timer
loop. Ensure the lock is released (store false with Ordering::Release) when
run_cron_job completes, fails, or panics.


let exec_jobs = jobs.clone();
let exec_context = context.clone();
let exec_job_id = job_id.clone();
let guard = ExecutionGuard(execution_lock.clone());

tokio::spawn(async move {
let _guard = guard;
match run_cron_job(&job, &exec_context).await {
Ok(()) => {
let mut j = exec_jobs.write().await;
if let Some(j) = j.get_mut(&exec_job_id) {
j.consecutive_failures = 0;
}
};

if should_disable {
tracing::warn!(
cron_id = %job_id,
"circuit breaker tripped after {MAX_CONSECUTIVE_FAILURES} consecutive failures, disabling"
}
Err(error) => {
tracing::error!(
cron_id = %exec_job_id,
%error,
"cron job execution failed"
);

{
let mut j = jobs.write().await;
if let Some(j) = j.get_mut(&job_id) {
j.enabled = false;
let should_disable = {
let mut j = exec_jobs.write().await;
if let Some(j) = j.get_mut(&exec_job_id) {
j.consecutive_failures += 1;
j.consecutive_failures >= MAX_CONSECUTIVE_FAILURES
} else {
false
}
};

if should_disable {
tracing::warn!(
cron_id = %exec_job_id,
"circuit breaker tripped after {MAX_CONSECUTIVE_FAILURES} consecutive failures, disabling"
);

{
let mut j = exec_jobs.write().await;
if let Some(j) = j.get_mut(&exec_job_id) {
j.enabled = false;
}
}
}

// Persist the disabled state
if let Err(error) = context.store.update_enabled(&job_id, false).await {
tracing::error!(%error, "failed to persist cron job disabled state");
if let Err(error) = exec_context.store.update_enabled(&exec_job_id, false).await {
tracing::error!(%error, "failed to persist cron job disabled state");
}
}

break;
}
}
}

if job.run_once {
tracing::info!(cron_id = %job_id, "run-once cron completed, disabling");
if job.run_once {
tracing::info!(cron_id = %exec_job_id, "run-once cron completed, disabling");

{
let mut j = jobs.write().await;
if let Some(j) = j.get_mut(&job_id) {
j.enabled = false;
{
let mut j = exec_jobs.write().await;
if let Some(j) = j.get_mut(&exec_job_id) {
j.enabled = false;
}
}
}

if let Err(error) = context.store.update_enabled(&job_id, false).await {
tracing::error!(%error, "failed to persist run-once cron disabled state");
if let Err(error) = exec_context.store.update_enabled(&exec_job_id, false).await {
tracing::error!(%error, "failed to persist run-once cron disabled state");
}
}
Comment on lines +317 to 330
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

run_once disables the job even when execution failed.

The if job.run_once block runs unconditionally after both the Ok and Err arms. A failed run-once job will be permanently disabled without ever completing successfully. If this is intentional ("run once regardless of outcome"), a brief comment would clarify the design choice. If a failed run-once job should be retried on the next tick, this block should only run on success.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cron/scheduler.rs` around lines 317 - 330, The run-once disable logic
currently executed unconditionally after job execution (checking job.run_once,
mutating exec_jobs and calling exec_context.store.update_enabled) causes failed
run-once jobs to be permanently disabled; change the flow so the block that sets
j.enabled = false and calls exec_context.store.update_enabled(&exec_job_id,
false).await only runs when the execution succeeded (i.e., inside the Ok arm of
the execution result) or, if the intention is to disable regardless, add a clear
comment next to job.run_once documenting that behavior and why failures should
not be retried; reference job.run_once, exec_jobs, exec_job_id, and
exec_context.store.update_enabled when making the change.


break;
}
});
}
});

Expand Down Expand Up @@ -424,7 +456,7 @@ impl Scheduler {
prompt: config.prompt,
interval_secs: config.interval_secs,
delivery_target,
active_hours: config.active_hours,
active_hours: normalize_active_hours(config.active_hours),
enabled: true,
run_once: config.run_once,
consecutive_failures: 0,
Expand Down Expand Up @@ -512,13 +544,21 @@ fn current_hour_and_timezone(context: &CronContext, cron_id: &str) -> (u8, Strin
}

/// Report whether `current_hour` lies inside the active window.
///
/// The window is half-open: `start_hour` is included, `end_hour` is excluded.
///
/// * `start_hour == end_hour` — degenerate window, treated as always active.
/// * `start_hour < end_hour` — same-day window, e.g. `9..17`.
/// * `start_hour > end_hour` — window wraps past midnight, e.g. `22..6`.
fn hour_in_active_window(current_hour: u8, start_hour: u8, end_hour: u8) -> bool {
    if start_hour == end_hour {
        true
    } else if start_hour < end_hour {
        (start_hour..end_hour).contains(&current_hour)
    } else {
        // Wrapping window: active late in the day or early the next morning.
        current_hour >= start_hour || current_hour < end_hour
    }
}

/// Collapse a degenerate window whose start equals its end into `None`
/// (no restriction); every other value is returned unchanged.
fn normalize_active_hours(active_hours: Option<(u8, u8)>) -> Option<(u8, u8)> {
    match active_hours {
        Some((start, end)) if start == end => None,
        other => other,
    }
}

fn ensure_cron_dispatch_readiness(context: &CronContext, cron_id: &str) {
let readiness = context.deps.runtime_config.work_readiness();
if readiness.ready {
Expand Down Expand Up @@ -613,19 +653,23 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> {
// when the sender is dropped (message_rx returns None).
drop(channel_tx);

let deadline = tokio::time::Instant::now() + timeout;
loop {
match tokio::time::timeout(timeout, response_rx.recv()).await {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
tracing::warn!(cron_id = %job.id, "cron job timed out after {timeout:?}");
channel_handle.abort();
break;
}
match tokio::time::timeout(remaining, response_rx.recv()).await {
Ok(Some(OutboundResponse::Text(text))) => {
collected_text.push(text);
}
Ok(Some(OutboundResponse::RichMessage { text, .. })) => {
collected_text.push(text);
}
Ok(Some(_)) => {
// Status updates, stream chunks, etc. — ignore for cron jobs
}
Ok(Some(_)) => {}
Ok(None) => {
// Channel finished (response_tx dropped)
break;
}
Err(_) => {
Expand Down Expand Up @@ -692,7 +736,7 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> {

#[cfg(test)]
mod tests {
use super::hour_in_active_window;
use super::{hour_in_active_window, normalize_active_hours};

#[test]
fn test_hour_in_active_window_non_wrapping() {
Expand All @@ -708,4 +752,21 @@ mod tests {
assert!(hour_in_active_window(3, 22, 6));
assert!(!hour_in_active_window(12, 22, 6));
}

#[test]
fn test_hour_in_active_window_equal_start_end_is_always_active() {
    // Each pair is (current_hour, bound); the bound is used as both start and end.
    for (current_hour, bound) in [(0, 0), (12, 0), (23, 0), (5, 5), (14, 14)] {
        assert!(hour_in_active_window(current_hour, bound, bound));
    }
}

#[test]
fn test_normalize_active_hours() {
    // Windows with start == end collapse to None.
    for hour in [0u8, 12] {
        assert_eq!(normalize_active_hours(Some((hour, hour))), None);
    }

    // A real window and an absent window pass through untouched.
    let window = Some((9, 17));
    assert_eq!(normalize_active_hours(window), window);
    assert_eq!(normalize_active_hours(None), None);
}
}
Loading