From cdc9860c1fe355edf1889746c06fa2db88f02e5d Mon Sep 17 00:00:00 2001 From: charlieroth Date: Sun, 24 Aug 2025 11:16:04 +0200 Subject: [PATCH 1/6] feat(jobs): introduce scalable background job runner Adds a production-ready job runner that executes deferred work outside of the request path. Key components: - DB migration: new `jobs` table with state, timestamps, visibility timeout - Trait-based job handlers with registry for dynamic dispatch - Async worker supervisor supporting configurable concurrency and graceful shutdown - Exponential back-off w/ jitter for automatic retries - Visibility-timeout guard to avoid duplicate processing - Structured logs across the entire job lifecycle - Unit + integration tests for back-off, registry, state transitions, retries, and concurrency control - Docs: AGENTS.md now lists `cargo run --bin worker` usage Breaking change: - Requires running `make db-migrate` before starting workers Usage: - Start worker: `cargo run --bin worker` - Demo jobs: `cargo run --example job_runner_demo` --- ...62fbeaebf557ed52985860200ab94f79b4ee3.json | 25 ++ ...ef4debc34685d06fbdf859430861051c358af.json | 15 + ...aa6e9add90477ee9ff4ef0f4a81f7db828894.json | 14 + ...88e0e7786b7dfa09bdedcc7889f3811f7752b.json | 108 +++++++ ...644981a26dbc635a27b3e53bf4c908043c297.json | 30 ++ AGENTS.md | 2 + Cargo.lock | 3 + Cargo.toml | 10 + examples/job_runner_demo.rs | 65 ++++ .../20250824104647_update_jobs_table.down.sql | 24 ++ .../20250824104647_update_jobs_table.up.sql | 33 ++ src/bin/worker.rs | 53 +++ src/entities/mod.rs | 27 +- src/jobs/backoff.rs | 60 ++++ src/jobs/entities.rs | 2 + src/jobs/handler.rs | 18 ++ src/jobs/handlers/example.rs | 38 +++ src/jobs/handlers/mod.rs | 3 + src/jobs/mod.rs | 15 + src/jobs/registry.rs | 86 +++++ src/jobs/repository.rs | 172 ++++++++++ src/jobs/worker.rs | 304 ++++++++++++++++++ src/lib.rs | 1 + tests/job_integration.rs | 246 ++++++++++++++ 24 files changed, 1340 insertions(+), 14 deletions(-) create mode 100644 .sqlx/query-2075a12e0695933ac76e3c8e67862fbeaebf557ed52985860200ab94f79b4ee3.json create mode 100644 .sqlx/query-2b0b00e374754ba68477a48dcaeef4debc34685d06fbdf859430861051c358af.json create mode 100644 .sqlx/query-4894089224c089ff5ea365f6c66aa6e9add90477ee9ff4ef0f4a81f7db828894.json create mode 100644 .sqlx/query-843ad10b6531960151f4ba4b5eb88e0e7786b7dfa09bdedcc7889f3811f7752b.json create mode 100644 .sqlx/query-b2943747b0a227563bebf529cdd644981a26dbc635a27b3e53bf4c908043c297.json create mode 100644 examples/job_runner_demo.rs create mode 100644 migrations/20250824104647_update_jobs_table.down.sql create mode 100644 migrations/20250824104647_update_jobs_table.up.sql create mode 100644 src/bin/worker.rs create mode 100644 src/jobs/backoff.rs create mode 100644 src/jobs/entities.rs create mode 100644 src/jobs/handler.rs create mode 100644 src/jobs/handlers/example.rs create mode 100644 src/jobs/handlers/mod.rs create mode 100644 src/jobs/mod.rs create mode 100644 src/jobs/registry.rs create mode 100644 src/jobs/repository.rs create mode 100644 src/jobs/worker.rs create mode 100644 tests/job_integration.rs diff --git a/.sqlx/query-2075a12e0695933ac76e3c8e67862fbeaebf557ed52985860200ab94f79b4ee3.json b/.sqlx/query-2075a12e0695933ac76e3c8e67862fbeaebf557ed52985860200ab94f79b4ee3.json new file mode 100644 index 0000000..32ebd11 --- /dev/null +++ b/.sqlx/query-2075a12e0695933ac76e3c8e67862fbeaebf557ed52985860200ab94f79b4ee3.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO jobs (kind, payload, run_at, max_attempts)\n 
VALUES ($1, $2, $3, $4)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Text", + "Jsonb", + "Timestamptz", + "Int4" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2075a12e0695933ac76e3c8e67862fbeaebf557ed52985860200ab94f79b4ee3" +} diff --git a/.sqlx/query-2b0b00e374754ba68477a48dcaeef4debc34685d06fbdf859430861051c358af.json b/.sqlx/query-2b0b00e374754ba68477a48dcaeef4debc34685d06fbdf859430861051c358af.json new file mode 100644 index 0000000..6922002 --- /dev/null +++ b/.sqlx/query-2b0b00e374754ba68477a48dcaeef4debc34685d06fbdf859430861051c358af.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE jobs\n SET visibility_till = $2,\n updated_at = now()\n WHERE id = $1 AND status = 'running'::job_status\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "2b0b00e374754ba68477a48dcaeef4debc34685d06fbdf859430861051c358af" +} diff --git a/.sqlx/query-4894089224c089ff5ea365f6c66aa6e9add90477ee9ff4ef0f4a81f7db828894.json b/.sqlx/query-4894089224c089ff5ea365f6c66aa6e9add90477ee9ff4ef0f4a81f7db828894.json new file mode 100644 index 0000000..e788ae1 --- /dev/null +++ b/.sqlx/query-4894089224c089ff5ea365f6c66aa6e9add90477ee9ff4ef0f4a81f7db828894.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE jobs\n SET status = 'succeeded'::job_status,\n visibility_till = NULL,\n reserved_by = NULL,\n updated_at = now()\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "4894089224c089ff5ea365f6c66aa6e9add90477ee9ff4ef0f4a81f7db828894" +} diff --git a/.sqlx/query-843ad10b6531960151f4ba4b5eb88e0e7786b7dfa09bdedcc7889f3811f7752b.json b/.sqlx/query-843ad10b6531960151f4ba4b5eb88e0e7786b7dfa09bdedcc7889f3811f7752b.json new file mode 100644 index 0000000..3b2686a --- /dev/null +++ b/.sqlx/query-843ad10b6531960151f4ba4b5eb88e0e7786b7dfa09bdedcc7889f3811f7752b.json @@ -0,0 +1,108 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE jobs\n SET status = 'running'::job_status,\n visibility_till = $3,\n reserved_by = $2,\n updated_at = now()\n WHERE id IN (\n SELECT id\n FROM jobs\n WHERE (status = 'queued'::job_status OR \n (status = 'running'::job_status AND visibility_till < now()))\n AND run_at <= now()\n ORDER BY run_at\n FOR UPDATE SKIP LOCKED\n LIMIT $1\n )\n RETURNING \n id,\n kind,\n payload,\n run_at,\n attempts,\n max_attempts,\n backoff_seconds,\n status as \"status: JobStatus\",\n last_error,\n visibility_till,\n reserved_by,\n created_at,\n updated_at\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "kind", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "payload", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "run_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "attempts", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "max_attempts", + "type_info": "Int4" + }, + { + "ordinal": 6, + "name": "backoff_seconds", + "type_info": "Int4" + }, + { + "ordinal": 7, + "name": "status: JobStatus", + "type_info": { + "Custom": { + "name": "job_status", + "kind": { + "Enum": [ + "queued", + "running", + "succeeded", + "failed" + ] + } + } + } + }, + { + "ordinal": 8, + "name": "last_error", + "type_info": "Text" + }, + { + "ordinal": 9, + "name": "visibility_till", + "type_info": 
"Timestamptz" + }, + { + "ordinal": 10, + "name": "reserved_by", + "type_info": "Uuid" + }, + { + "ordinal": 11, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 12, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8", + "Uuid", + "Timestamptz" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + true, + true, + true, + false, + false + ] + }, + "hash": "843ad10b6531960151f4ba4b5eb88e0e7786b7dfa09bdedcc7889f3811f7752b" +} diff --git a/.sqlx/query-b2943747b0a227563bebf529cdd644981a26dbc635a27b3e53bf4c908043c297.json b/.sqlx/query-b2943747b0a227563bebf529cdd644981a26dbc635a27b3e53bf4c908043c297.json new file mode 100644 index 0000000..e3dcd33 --- /dev/null +++ b/.sqlx/query-b2943747b0a227563bebf529cdd644981a26dbc635a27b3e53bf4c908043c297.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE jobs\n SET status = $2,\n attempts = attempts + 1,\n last_error = $3,\n run_at = COALESCE($4, run_at),\n backoff_seconds = $5,\n visibility_till = NULL,\n reserved_by = NULL,\n updated_at = now()\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + { + "Custom": { + "name": "job_status", + "kind": { + "Enum": [ + "queued", + "running", + "succeeded", + "failed" + ] + } + } + }, + "Text", + "Timestamptz", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "b2943747b0a227563bebf529cdd644981a26dbc635a27b3e53bf4c908043c297" +} diff --git a/AGENTS.md b/AGENTS.md index 5398dba..22bb880 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,6 +7,8 @@ - **Format**: `make fmt` (rustfmt) - **Full Check**: `make check` (fmt + lint + test + audit + deny) - **Run locally**: `make dev` (starts API on 127.0.0.1:8080) +- **Run worker**: `cargo run --bin worker` (starts background job processor) +- **Demo jobs**: `cargo run --example job_runner_demo` (enqueues example jobs) ## Database Commands - **Setup**: `make db-up` → `make db-migrate` diff --git a/Cargo.lock b/Cargo.lock index a3683f2..105beb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -338,6 +338,7 @@ dependencies = [ "hyper", "jsonwebtoken", "mockall", + "rand", "regex", "reqwest", "scraper", @@ -347,6 +348,7 @@ dependencies = [ "tantivy", "thiserror", "tokio", + "tokio-util", "tower", "tower-http", "tracing", @@ -3390,6 +3392,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 0a3b704..507a85a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,14 @@ name = "capsule" version = "0.1.0" edition = "2024" +[[bin]] +name = "worker" +path = "src/bin/worker.rs" + +[[example]] +name = "job_runner_demo" +path = "examples/job_runner_demo.rs" + [dependencies] anyhow = { version = "1.0.99" } thiserror = { version = "2.0.16" } @@ -33,6 +41,8 @@ tracing = { version = "0.1.40" } tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } tower = { version = "0.5" } tower-http = { version = "0.6", features = ["trace", "request-id"] } +tokio-util = { version = "0.7", features = ["rt"] } +rand = { version = "0.8" } [dev-dependencies] mockall = "0.13" diff --git a/examples/job_runner_demo.rs b/examples/job_runner_demo.rs new file mode 100644 index 0000000..d7cd0d9 --- /dev/null +++ b/examples/job_runner_demo.rs @@ -0,0 +1,65 @@ +use anyhow::Result; +use capsule::{ + config::Config, + jobs::{ExampleJobPayload, JobRepository}, +}; +use serde_json::json; + +/// Demo program that enqueues some 
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Initialize tracing
+    tracing_subscriber::fmt()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .init();
+
+    // Load configuration
+    let config = Config::from_env()?;
+
+    // Create database connection pool
+    let pool = sqlx::postgres::PgPoolOptions::new()
+        .max_connections(5)
+        .connect(config.database_url())
+        .await?;
+
+    println!("Enqueuing example jobs...");
+
+    // Enqueue a simple job
+    let job1_payload = ExampleJobPayload {
+        message: "Hello from job 1!".to_string(),
+        delay_ms: Some(2000),
+    };
+
+    let job1_id =
+        JobRepository::enqueue(&pool, "example_job", json!(job1_payload), None, None).await?;
+
+    println!("Enqueued job 1: {}", job1_id);
+
+    // Enqueue another job
+    let job2_payload = ExampleJobPayload {
+        message: "Hello from job 2!".to_string(),
+        delay_ms: Some(1000),
+    };
+
+    let job2_id =
+        JobRepository::enqueue(&pool, "example_job", json!(job2_payload), None, None).await?;
+
+    println!("Enqueued job 2: {}", job2_id);
+
+    // Enqueue a job that will fail (invalid payload: missing the required `message` field)
+    let job3_id = JobRepository::enqueue(
+        &pool,
+        "example_job",
+        json!({"invalid": "payload"}),
+        None,
+        Some(3), // Max 3 attempts
+    )
+    .await?;
+
+    println!("Enqueued failing job: {}", job3_id);
+
+    println!("Jobs enqueued! Start the worker with: cargo run --bin worker");
+    println!("Monitor job status with SQL queries on the jobs table.");
+
+    Ok(())
+}
diff --git a/migrations/20250824104647_update_jobs_table.down.sql b/migrations/20250824104647_update_jobs_table.down.sql
new file mode 100644
index 0000000..64f0d17
--- /dev/null
+++ b/migrations/20250824104647_update_jobs_table.down.sql
+-- Revert trigger
+DROP TRIGGER IF EXISTS trg_jobs_updated_at ON jobs;
+
+-- Revert indexes
+DROP INDEX IF EXISTS jobs_visibility_till_idx;
+DROP INDEX IF EXISTS jobs_reserved_by_idx;
+
+-- Recreate job_kind enum
+CREATE TYPE job_kind AS ENUM ('fetch_and_extract', 'reindex_item', 'delete_item');
+
+-- Remove new columns
+ALTER TABLE jobs DROP COLUMN payload;
+ALTER TABLE jobs DROP COLUMN max_attempts;
+ALTER TABLE jobs DROP COLUMN backoff_seconds;
+ALTER TABLE jobs DROP COLUMN visibility_till;
+ALTER TABLE jobs DROP COLUMN reserved_by;
+ALTER TABLE jobs DROP COLUMN created_at;
+ALTER TABLE jobs DROP COLUMN updated_at;
+
+-- Change kind back to enum
+ALTER TABLE jobs ALTER COLUMN kind TYPE job_kind USING kind::job_kind;
+
+-- Revert job_status enum change
+ALTER TYPE job_status RENAME VALUE 'succeeded' TO 'done';
diff --git a/migrations/20250824104647_update_jobs_table.up.sql b/migrations/20250824104647_update_jobs_table.up.sql
new file mode 100644
index 0000000..45c19cd
--- /dev/null
+++ b/migrations/20250824104647_update_jobs_table.up.sql
+-- Update job_status enum to include 'succeeded' instead of 'done'
+ALTER TYPE job_status RENAME VALUE 'done' TO 'succeeded';
+
+-- Add new columns to existing jobs table
+ALTER TABLE jobs ADD COLUMN payload JSONB DEFAULT '{}';
+ALTER TABLE jobs ADD COLUMN max_attempts INT DEFAULT 25;
+ALTER TABLE jobs ADD COLUMN backoff_seconds INT DEFAULT 0;
+ALTER TABLE jobs ADD COLUMN visibility_till TIMESTAMPTZ;
+ALTER TABLE jobs ADD COLUMN reserved_by UUID;
+ALTER TABLE jobs ADD COLUMN created_at TIMESTAMPTZ DEFAULT now();
+ALTER TABLE jobs ADD COLUMN updated_at TIMESTAMPTZ DEFAULT now();
+
+-- Set NOT NULL constraints after adding defaults
+ALTER TABLE jobs ALTER COLUMN payload SET NOT NULL;
+ALTER TABLE jobs ALTER COLUMN max_attempts SET NOT NULL;
+ALTER TABLE jobs ALTER COLUMN backoff_seconds SET NOT NULL;
+ALTER TABLE jobs ALTER COLUMN created_at SET NOT NULL;
+ALTER TABLE jobs ALTER COLUMN updated_at SET NOT NULL;
+
+-- Change kind from enum to text
+ALTER TABLE jobs ALTER COLUMN kind TYPE TEXT;
+
+-- Drop the old job_kind enum (no longer needed)
+DROP TYPE job_kind;
+
+-- Add new indexes for efficient job polling
+CREATE INDEX jobs_visibility_till_idx ON jobs (visibility_till);
+CREATE INDEX jobs_reserved_by_idx ON jobs (reserved_by);
+
+-- Add trigger for updated_at
+CREATE TRIGGER trg_jobs_updated_at
+BEFORE UPDATE ON jobs
+FOR EACH ROW EXECUTE FUNCTION set_updated_at();
diff --git a/src/bin/worker.rs b/src/bin/worker.rs
new file mode 100644
index 0000000..34c6cfe
--- /dev/null
+++ b/src/bin/worker.rs
+use anyhow::Result;
+use capsule::{
+    config::Config,
+    jobs::{ExampleJobHandler, JobRegistry, WorkerConfig, WorkerSupervisor},
+};
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Initialize tracing
+    tracing_subscriber::fmt()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .init();
+
+    // Load configuration
+    let config = Config::from_env()?;
+
+    // Create database connection pool
+    let pool = sqlx::postgres::PgPoolOptions::new()
+        .max_connections(10)
+        .connect(config.database_url())
+        .await?;
+
+    // Run migrations
+    sqlx::migrate!("./migrations").run(&pool).await?;
+
+    // Create job registry and register handlers
+    let mut registry = JobRegistry::new();
+    registry.register(ExampleJobHandler);
+
+    // Create worker configuration (env vars override the defaults)
+    let worker_config = WorkerConfig {
+        concurrency: std::env::var("WORKER_CONCURRENCY")
+            .ok()
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(4),
+        poll_interval_ms: std::env::var("WORKER_POLL_INTERVAL_MS")
+            .ok()
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(1000),
+        visibility_timeout_secs: std::env::var("WORKER_VISIBILITY_TIMEOUT_SECS")
+            .ok()
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(300),
+        base_backoff_secs: std::env::var("WORKER_BASE_BACKOFF_SECS")
+            .ok()
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(30),
+    };
+
+    // Create and run supervisor
+    let supervisor = WorkerSupervisor::new(pool, registry, worker_config);
+    supervisor.run().await
+}
diff --git a/src/entities/mod.rs b/src/entities/mod.rs
index 33b3a7c..ecdb15f 100644
--- a/src/entities/mod.rs
+++ b/src/entities/mod.rs
@@ -14,20 +14,13 @@ pub enum ItemStatus {
     Archived,
 }
 
-#[derive(sqlx::Type, Debug, Clone, Copy, PartialEq, Eq)]
-#[sqlx(type_name = "job_kind", rename_all = "snake_case")]
-pub enum JobKind {
-    FetchAndExtract,
-    ReindexItem,
-    DeleteItem,
-}
-
-#[derive(sqlx::Type, Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(sqlx::Type, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 #[sqlx(type_name = "job_status", rename_all = "lowercase")]
+#[serde(rename_all = "lowercase")]
 pub enum JobStatus {
     Queued,
     Running,
-    Done,
+    Succeeded,
     Failed,
 }
 
@@ -79,10 +72,16 @@ pub struct ItemTag {
 #[derive(Debug, Clone, FromRow)]
 pub struct Job {
     pub id: Uuid,
-    pub kind: JobKind,
-    pub item_id: Option<Uuid>, // FK -> items.id
+    pub kind: String,               // logical job name
+    pub payload: serde_json::Value, // job data as JSONB
+    pub run_at: DateTime<Utc>,      // next time the job is eligible
+    pub attempts: i32,              // execution attempts so far
+    pub max_attempts: i32,          // maximum attempts before giving up
+    pub backoff_seconds: i32,       // populated when job fails
     pub status: JobStatus,
-    pub run_at: DateTime<Utc>,
-    pub attempts: i32,
     pub last_error: Option<String>,
+    pub visibility_till: Option<DateTime<Utc>>, // set while "running"
+    pub reserved_by: Option<Uuid>,              // worker instance id
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
diff --git a/src/jobs/backoff.rs b/src/jobs/backoff.rs
new file mode 100644
index 0000000..abbe8c2
--- /dev/null
+++ b/src/jobs/backoff.rs
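+//! Exponential backoff with jitter for retry scheduling.
+//!
+//! Worked example (a sketch, assuming the 30s default base used by the
+//! worker): attempts 0, 1 and 2 resolve to roughly 30s, 60s and 120s before
+//! jitter, and the attempt cap of 10 bounds the pre-jitter delay at
+//! 30 * 2^10 = 30720s (~8.5 hours).
+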
+use rand::Rng;
+use std::time::Duration;
+
+/// Calculate exponential backoff delay with jitter
+pub fn calculate_backoff_delay(attempt: i32, base_delay_secs: u32) -> Duration {
+    let attempt = attempt.max(0) as u32;
+
+    // Cap the exponent to prevent overflow (max ~8.5 hours with 30s base)
+    let capped_attempt = attempt.min(10);
+
+    // Calculate base delay: base * 2^attempt
+    let base_delay = base_delay_secs.saturating_mul(2_u32.saturating_pow(capped_attempt));
+
+    // Add jitter: ±30% randomness
+    let jitter_factor = rand::thread_rng().gen_range(0.7..1.3);
+    let delay_with_jitter = (base_delay as f64 * jitter_factor).round() as u64;
+
+    Duration::from_secs(delay_with_jitter)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_backoff_progression() {
+        let base_delay = 30;
+
+        // Test first few attempts have reasonable progression
+        let delay0 = calculate_backoff_delay(0, base_delay);
+        let delay1 = calculate_backoff_delay(1, base_delay);
+        let delay2 = calculate_backoff_delay(2, base_delay);
+
+        // Delays should be in expected ranges with jitter
+        assert!(delay0.as_secs() >= 21 && delay0.as_secs() <= 39); // 30s ±30%
+        assert!(delay1.as_secs() >= 42 && delay1.as_secs() <= 78); // 60s ±30%
+        assert!(delay2.as_secs() >= 84 && delay2.as_secs() <= 156); // 120s ±30%
+    }
+
+    #[test]
+    fn test_backoff_cap() {
+        let base_delay = 30;
+
+        // Very high attempt numbers should be capped at attempt 10
+        let delay_high = calculate_backoff_delay(20, base_delay);
+        let delay_capped = calculate_backoff_delay(10, base_delay);
+
+        // Both should be within reasonable bounds (max delay with jitter should be similar)
+        // At attempt 10: base * 2^10 = 30 * 1024 = 30720s with jitter 0.7-1.3 = ~21k-40k
+        assert!(delay_high.as_secs() >= 21000 && delay_high.as_secs() <= 40000);
+        assert!(delay_capped.as_secs() >= 21000 && delay_capped.as_secs() <= 40000);
+    }
+
+    #[test]
+    fn test_negative_attempt() {
+        let delay = calculate_backoff_delay(-5, 30);
+        // Should handle negative attempts gracefully
+        assert!(delay.as_secs() >= 21 && delay.as_secs() <= 39);
+    }
+}
diff --git a/src/jobs/entities.rs b/src/jobs/entities.rs
new file mode 100644
index 0000000..a3a45bd
--- /dev/null
+++ b/src/jobs/entities.rs
+// Re-export job entities from main entities module
+pub use crate::entities::{Job, JobStatus};
diff --git a/src/jobs/handler.rs b/src/jobs/handler.rs
new file mode 100644
index 0000000..8c9632c
--- /dev/null
+++ b/src/jobs/handler.rs
+use async_trait::async_trait;
+use serde_json::Value;
+use sqlx::PgPool;
+use tracing::Span;
+
+/// Trait for handling specific job types
+#[async_trait]
+pub trait JobHandler: Send + Sync + 'static {
+    /// Execute the job
+    async fn run(&self, payload: Value, pool: &PgPool, span: Span) -> anyhow::Result<()>;
+
+    /// Get the job kind this handler processes
+    fn kind(&self) -> &'static str;
+}
+
+/// Type-erased job handler factory
+pub type JobHandlerFactory =
+    Box<dyn Fn(Value) -> anyhow::Result<Box<dyn JobHandler>> + Send + Sync>;
diff --git a/src/jobs/handlers/example.rs b/src/jobs/handlers/example.rs
new file mode 100644
index 0000000..9b3a91a
--- /dev/null
+++ b/src/jobs/handlers/example.rs
+use crate::jobs::JobHandler;
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use sqlx::PgPool;
+use tracing::{Span, info};
+
+/// Example job payload for demonstrating the job system
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ExampleJobPayload {
+    pub message: String,
+    pub delay_ms: Option<u64>,
+}
+
+/// Example job handler that logs a message and optionally sleeps
+#[derive(Clone, Debug)]
+pub struct ExampleJobHandler;
+
+#[async_trait]
+impl JobHandler for ExampleJobHandler {
+    async fn run(&self, payload: Value, _pool: &PgPool, _span: Span) -> anyhow::Result<()> {
+        let payload: ExampleJobPayload = serde_json::from_value(payload)?;
+
+        info!("Processing example job: {}", payload.message);
+
+        if let Some(delay_ms) = payload.delay_ms {
+            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
+            info!("Finished sleeping for {}ms", delay_ms);
+        }
+
+        info!("Example job completed successfully");
+        Ok(())
+    }
+
+    fn kind(&self) -> &'static str {
+        "example_job"
+    }
+}
diff --git a/src/jobs/handlers/mod.rs b/src/jobs/handlers/mod.rs
new file mode 100644
index 0000000..312528f
--- /dev/null
+++ b/src/jobs/handlers/mod.rs
+pub mod example;
+
+pub use example::*;
diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs
new file mode 100644
index 0000000..db4206e
--- /dev/null
+++ b/src/jobs/mod.rs
+pub mod backoff;
+pub mod entities;
+pub mod handler;
+pub mod handlers;
+pub mod registry;
+pub mod repository;
+pub mod worker;
+
+pub use backoff::*;
+pub use entities::*;
+pub use handler::*;
+pub use handlers::*;
+pub use registry::*;
+pub use repository::*;
+pub use worker::*;
diff --git a/src/jobs/registry.rs b/src/jobs/registry.rs
new file mode 100644
index 0000000..5319584
--- /dev/null
+++ b/src/jobs/registry.rs
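+//! Maps job `kind` strings to type-erased handler factories.
+//!
+//! Usage sketch (`MyHandler` is a hypothetical type implementing
+//! `JobHandler + Clone`, not part of this crate):
+//!
+//! ```ignore
+//! let mut registry = JobRegistry::new();
+//! registry.register(MyHandler);
+//! let handler = registry.create_handler("my_job", serde_json::json!({}))?;
+//! ```
+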
+use crate::jobs::{JobHandler, JobHandlerFactory};
+use anyhow::{Result, anyhow};
+use serde_json::Value;
+use std::collections::HashMap;
+
+/// Registry of job handlers by kind
+#[derive(Default)]
+pub struct JobRegistry {
+    handlers: HashMap<&'static str, JobHandlerFactory>,
+}
+
+impl JobRegistry {
+    pub fn new() -> Self {
+        Self {
+            handlers: HashMap::new(),
+        }
+    }
+
+    /// Register a job handler for a specific kind
+    pub fn register<H: JobHandler + Clone>(&mut self, handler: H) {
+        let kind = handler.kind();
+        let factory: JobHandlerFactory =
+            Box::new(move |_payload| Ok(Box::new(handler.clone()) as Box<dyn JobHandler>));
+        self.handlers.insert(kind, factory);
+    }
+
+    /// Create a handler instance for the given job kind and payload
+    pub fn create_handler(&self, kind: &str, payload: Value) -> Result<Box<dyn JobHandler>> {
+        let factory = self
+            .handlers
+            .get(kind)
+            .ok_or_else(|| anyhow!("No handler registered for job kind: {}", kind))?;
+
+        factory(payload)
+    }
+
+    /// Get all registered job kinds
+    pub fn registered_kinds(&self) -> Vec<&'static str> {
+        self.handlers.keys().copied().collect()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::jobs::JobHandler;
+    use async_trait::async_trait;
+    use serde_json::json;
+    use sqlx::PgPool;
+    use tracing::Span;
+
+    #[derive(Clone)]
+    struct TestJobHandler;
+
+    #[async_trait]
+    impl JobHandler for TestJobHandler {
+        async fn run(&self, _payload: Value, _pool: &PgPool, _span: Span) -> anyhow::Result<()> {
+            Ok(())
+        }
+
+        fn kind(&self) -> &'static str {
+            "test_job"
+        }
+    }
+
+    #[test]
+    fn test_registry_registration() {
+        let mut registry = JobRegistry::new();
+        registry.register(TestJobHandler);
+
+        let kinds = registry.registered_kinds();
+        assert_eq!(kinds, vec!["test_job"]);
+    }
+
+    #[test]
+    fn test_create_handler() {
+        let mut registry = JobRegistry::new();
+        registry.register(TestJobHandler);
+
+        let result = registry.create_handler("test_job", json!({}));
+        assert!(result.is_ok());
+
+        let result = registry.create_handler("unknown_job", json!({}));
+        assert!(result.is_err());
+    }
+}
diff --git a/src/jobs/repository.rs b/src/jobs/repository.rs
new file mode 100644
index 0000000..f28dfa9
--- /dev/null
+++ b/src/jobs/repository.rs
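+//! Persistence layer for jobs.
+//!
+//! State machine: queued -> running -> succeeded | failed. A failed attempt
+//! with retries remaining goes back to queued with a future `run_at`, and a
+//! running job whose `visibility_till` has passed is treated as abandoned
+//! and may be claimed again. Claiming relies on `FOR UPDATE SKIP LOCKED`, so
+//! concurrent workers lock disjoint sets of due rows and a job is handed to
+//! at most one worker per visibility window.
+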
+use crate::entities::{Job, JobStatus};
+use anyhow::Result;
+use chrono::{DateTime, Utc};
+use serde_json::Value;
+use sqlx::PgPool;
+use uuid::Uuid;
+
+pub struct JobRepository;
+
+impl JobRepository {
+    /// Enqueue a new job
+    pub async fn enqueue(
+        pool: &PgPool,
+        kind: &str,
+        payload: Value,
+        run_at: Option<DateTime<Utc>>,
+        max_attempts: Option<i32>,
+    ) -> Result<Uuid> {
+        let run_at = run_at.unwrap_or_else(Utc::now);
+        let max_attempts = max_attempts.unwrap_or(25);
+
+        let result = sqlx::query!(
+            r#"
+            INSERT INTO jobs (kind, payload, run_at, max_attempts)
+            VALUES ($1, $2, $3, $4)
+            RETURNING id
+            "#,
+            kind,
+            payload,
+            run_at,
+            max_attempts
+        )
+        .fetch_one(pool)
+        .await?;
+
+        Ok(result.id)
+    }
+
+    /// Fetch due jobs and reserve them for processing
+    pub async fn fetch_due_jobs(
+        pool: &PgPool,
+        limit: i64,
+        worker_id: Uuid,
+        visibility_timeout_secs: i64,
+    ) -> Result<Vec<Job>> {
+        let visibility_till = Utc::now() + chrono::Duration::seconds(visibility_timeout_secs);
+
+        let jobs = sqlx::query_as!(
+            Job,
+            r#"
+            UPDATE jobs
+            SET status = 'running'::job_status,
+                visibility_till = $3,
+                reserved_by = $2,
+                updated_at = now()
+            WHERE id IN (
+                SELECT id
+                FROM jobs
+                WHERE (status = 'queued'::job_status OR
+                       (status = 'running'::job_status AND visibility_till < now()))
+                  AND run_at <= now()
+                ORDER BY run_at
+                FOR UPDATE SKIP LOCKED
+                LIMIT $1
+            )
+            RETURNING
+                id,
+                kind,
+                payload,
+                run_at,
+                attempts,
+                max_attempts,
+                backoff_seconds,
+                status as "status: JobStatus",
+                last_error,
+                visibility_till,
+                reserved_by,
+                created_at,
+                updated_at
+            "#,
+            limit,
+            worker_id,
+            visibility_till
+        )
+        .fetch_all(pool)
+        .await?;
+
+        Ok(jobs)
+    }
+
+    /// Mark job as succeeded
+    pub async fn mark_success(pool: &PgPool, job_id: Uuid) -> Result<()> {
+        sqlx::query!(
+            r#"
+            UPDATE jobs
+            SET status = 'succeeded'::job_status,
+                visibility_till = NULL,
+                reserved_by = NULL,
+                updated_at = now()
+            WHERE id = $1
+            "#,
+            job_id
+        )
+        .execute(pool)
+        .await?;
+
+        Ok(())
+    }
+
+    /// Mark job as failed and schedule retry or mark as permanently failed
+    pub async fn mark_failure(
+        pool: &PgPool,
+        job_id: Uuid,
+        error_message: &str,
+        next_run_at: Option<DateTime<Utc>>,
+        backoff_seconds: i32,
+    ) -> Result<()> {
+        let (status, next_run) = if let Some(run_at) = next_run_at {
+            (JobStatus::Queued, Some(run_at))
+        } else {
+            (JobStatus::Failed, None)
+        };
+
+        sqlx::query!(
+            r#"
+            UPDATE jobs
+            SET status = $2,
+                attempts = attempts + 1,
+                last_error = $3,
+                run_at = COALESCE($4, run_at),
+                backoff_seconds = $5,
+                visibility_till = NULL,
+                reserved_by = NULL,
+                updated_at = now()
+            WHERE id = $1
+            "#,
+            job_id,
+            status as JobStatus,
+            error_message,
+            next_run,
+            backoff_seconds
+        )
+        .execute(pool)
+        .await?;
+
+        Ok(())
+    }
+
+    /// Extend visibility timeout for a running job
+    pub async fn extend_visibility(
+        pool: &PgPool,
+        job_id: Uuid,
+        visibility_timeout_secs: i64,
+    ) -> Result<()> {
+        let new_visibility_till = Utc::now() + chrono::Duration::seconds(visibility_timeout_secs);
+
+        sqlx::query!(
+            r#"
+            UPDATE jobs
+            SET visibility_till = $2,
+                updated_at = now()
+            WHERE id = $1 AND status = 'running'::job_status
+            "#,
+            job_id,
+            new_visibility_till
+        )
+        .execute(pool)
+        .await?;
+
+        Ok(())
+    }
+}
diff --git a/src/jobs/worker.rs b/src/jobs/worker.rs
new file mode 100644
index 0000000..f6339d7
--- /dev/null
+++ b/src/jobs/worker.rs
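+//! Background worker runtime.
+//!
+//! Data flow: a fetcher task claims due jobs and feeds them into a bounded
+//! channel; a processor task drains the channel and spawns one task per job,
+//! with a semaphore capping in-flight jobs at `concurrency`. On shutdown the
+//! supervisor waits until every permit is returned before exiting.
+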
+use crate::jobs::{JobRegistry, JobRepository, calculate_backoff_delay};
+use anyhow::Result;
+use chrono::Utc;
+use sqlx::PgPool;
+use std::{sync::Arc, time::Duration};
+use tokio::{
+    signal,
+    sync::{Semaphore, mpsc},
+    time::{interval, sleep},
+};
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, debug, error, info, info_span, warn};
+use uuid::Uuid;
+
+/// Worker configuration
+#[derive(Clone)]
+pub struct WorkerConfig {
+    pub concurrency: usize,
+    pub poll_interval_ms: u64,
+    pub visibility_timeout_secs: i64,
+    pub base_backoff_secs: u32,
+}
+
+impl Default for WorkerConfig {
+    fn default() -> Self {
+        Self {
+            concurrency: 4,
+            poll_interval_ms: 1000,
+            visibility_timeout_secs: 300, // 5 minutes
+            base_backoff_secs: 30,
+        }
+    }
+}
+
+/// Main worker supervisor that orchestrates job processing
+pub struct WorkerSupervisor {
+    pool: PgPool,
+    registry: Arc<JobRegistry>,
+    config: WorkerConfig,
+    worker_id: Uuid,
+    shutdown_token: CancellationToken,
+}
+
+impl WorkerSupervisor {
+    pub fn new(pool: PgPool, registry: JobRegistry, config: WorkerConfig) -> Self {
+        Self {
+            pool,
+            registry: Arc::new(registry),
+            config,
+            worker_id: Uuid::new_v4(),
+            shutdown_token: CancellationToken::new(),
+        }
+    }
+
+    /// Start the worker supervisor
+    pub async fn run(self) -> Result<()> {
+        info!("Starting worker supervisor with ID: {}", self.worker_id);
+        info!(
+            "Configuration - concurrency: {}, poll_interval: {}ms, visibility_timeout: {}s",
+            self.config.concurrency,
+            self.config.poll_interval_ms,
+            self.config.visibility_timeout_secs
+        );
+
+        // Create bounded channel for jobs
+        let (job_sender, job_receiver) = mpsc::channel(self.config.concurrency * 2);
+
+        // Semaphore to limit concurrent job processing
+        let semaphore = Arc::new(Semaphore::new(self.config.concurrency));
+
+        // Spawn shutdown handler
+        let shutdown_token = self.shutdown_token.clone();
+        tokio::spawn(async move {
+            if let Err(e) = signal::ctrl_c().await {
+                error!("Failed to listen for shutdown signal: {}", e);
+                return;
+            }
+            info!("Received shutdown signal, initiating graceful shutdown...");
+            shutdown_token.cancel();
+        });
+
+        // Spawn job fetcher
+        let fetcher_handle = {
+            let pool = self.pool.clone();
+            let worker_id = self.worker_id;
+            let config = self.config.clone();
+            let shutdown_token = self.shutdown_token.clone();
+            tokio::spawn(
+                WorkerSupervisor::run_fetcher_static(
+                    pool,
+                    worker_id,
+                    config,
+                    job_sender,
+                    shutdown_token,
+                )
+                .instrument(info_span!("fetcher", worker_id = %worker_id)),
+            )
+        };
+
+        // Spawn job processor
+        let processor_handle = {
+            let pool = self.pool.clone();
+            let registry = self.registry.clone();
+            let config = self.config.clone();
+            let semaphore = semaphore.clone();
+            let shutdown_token = self.shutdown_token.clone();
+            tokio::spawn(
+                WorkerSupervisor::run_processor_static(
+                    pool,
+                    registry,
+                    config,
+                    job_receiver,
+                    semaphore,
+                    shutdown_token,
+                )
+                .instrument(info_span!("processor", worker_id = %self.worker_id)),
+            )
+        };
+
+        // Wait for shutdown signal
+        self.shutdown_token.cancelled().await;
+        info!("Shutdown initiated, waiting for tasks to complete...");
+
+        // Wait for all permits to be available (all jobs completed)
+        let _permits = semaphore
+            .acquire_many(self.config.concurrency as u32)
+            .await?;
+        info!("All jobs completed, shutting down");
+
+        // Wait for fetcher and processor to finish
+        let _ = tokio::join!(fetcher_handle, processor_handle);
+
+        Ok(())
+    }
+
+    /// Job fetching loop
+    async fn run_fetcher_static(
+        pool: PgPool,
+        worker_id: Uuid,
+        config: WorkerConfig,
+        job_sender: mpsc::Sender<crate::entities::Job>,
+        shutdown_token: CancellationToken,
+    ) -> Result<()> {
+        let mut poll_interval = interval(Duration::from_millis(config.poll_interval_ms));
+
+        loop {
+            tokio::select! {
+                _ = shutdown_token.cancelled() => {
+                    info!("Fetcher shutting down");
+                    break;
+                }
+                _ = poll_interval.tick() => {
+                    match JobRepository::fetch_due_jobs(
+                        &pool,
+                        config.concurrency as i64,
+                        worker_id,
+                        config.visibility_timeout_secs,
+                    )
+                    .await
+                    {
+                        Ok(jobs) => {
+                            debug!("Fetched {} jobs", jobs.len());
+                            for job in jobs {
+                                if job_sender.send(job).await.is_err() {
+                                    warn!("Job receiver dropped, stopping fetcher");
+                                    return Ok(());
+                                }
+                            }
+                        }
+                        Err(e) => {
+                            error!("Failed to fetch jobs: {}", e);
+                            // Brief pause on error to avoid tight loop
+                            sleep(Duration::from_millis(1000)).await;
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Job processing loop
+    async fn run_processor_static(
+        pool: PgPool,
+        registry: Arc<JobRegistry>,
+        config: WorkerConfig,
+        mut job_receiver: mpsc::Receiver<crate::entities::Job>,
+        semaphore: Arc<Semaphore>,
+        shutdown_token: CancellationToken,
+    ) -> Result<()> {
+        while let Some(job) = tokio::select! {
+            _ = shutdown_token.cancelled() => None,
+            job = job_receiver.recv() => job,
+        } {
+            let permit = semaphore.clone().acquire_owned().await?;
+            let pool = pool.clone();
+            let registry = registry.clone();
+            let config = config.clone();
+
+            // Capture fields for tracing before moving job
+            let job_id = job.id;
+            let job_kind = job.kind.clone();
+            let job_attempt = job.attempts;
+
+            tokio::spawn(
+                async move {
+                    let _permit = permit; // Hold permit until job completes
+                    Self::process_job(pool, registry, config, job).await;
+                }
+                .instrument(
+                    info_span!("job", id = %job_id, kind = %job_kind, attempt = job_attempt),
+                ),
+            );
+        }
+
+        info!("Processor shutting down");
+        Ok(())
+    }
+
+    /// Process a single job
+    async fn process_job(
+        pool: PgPool,
+        registry: Arc<JobRegistry>,
+        config: WorkerConfig,
+        job: crate::entities::Job,
+    ) {
+        info!("Processing job {} (attempt {})", job.id, job.attempts + 1);
+
+        let span = info_span!("job_execution", id = %job.id, kind = %job.kind);
+
+        // Create handler for this job
+        let handler = match registry.create_handler(&job.kind, job.payload.clone()) {
+            Ok(handler) => handler,
+            Err(e) => {
+                error!("Failed to create handler for job {}: {}", job.id, e);
+                let _ = JobRepository::mark_failure(
+                    &pool,
+                    job.id,
+                    &format!("Failed to create handler: {}", e),
+                    None,
+                    0,
+                )
+                .await;
+                return;
+            }
+        };
+
+        // Execute the job
+        let result = handler.run(job.payload.clone(), &pool, span.clone()).await;
+
+        match result {
+            Ok(()) => {
+                info!("Job {} completed successfully", job.id);
+                if let Err(e) = JobRepository::mark_success(&pool, job.id).await {
+                    error!("Failed to mark job {} as successful: {}", job.id, e);
+                }
+            }
+            Err(e) => {
+                let attempt = job.attempts + 1;
+                error!("Job {} failed (attempt {}): {}", job.id, attempt, e);
+
+                // Determine if we should retry
+                if attempt < job.max_attempts {
+                    let backoff_delay = calculate_backoff_delay(attempt, config.base_backoff_secs);
+                    let next_run_at =
+                        Utc::now() + chrono::Duration::from_std(backoff_delay).unwrap();
+
+                    info!(
+                        "Job {} will retry in {} seconds (attempt {}/{})",
+                        job.id,
+                        backoff_delay.as_secs(),
+                        attempt + 1,
+                        job.max_attempts
+                    );
+
+                    if let Err(retry_err) = JobRepository::mark_failure(
+                        &pool,
+                        job.id,
+                        &e.to_string(),
+                        Some(next_run_at),
+                        backoff_delay.as_secs() as i32,
+                    )
+                    .await
+                    {
+                        error!("Failed to schedule retry for job {}: {}", job.id, retry_err);
+                    }
+                } else {
+                    info!(
+                        "Job {} permanently failed after {} attempts",
+                        job.id, attempt
+                    );
+                    if let Err(fail_err) =
+                        JobRepository::mark_failure(&pool, job.id, &e.to_string(), None, 0).await
+                    {
+                        error!(
+                            "Failed to mark job {} as permanently failed: {}",
+                            job.id, fail_err
+                        );
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 726cb7b..4f37e05 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@ pub mod config;
 pub mod entities;
 pub mod health;
 pub mod items;
+pub mod jobs;
 pub mod middleware;
 pub mod passwords;
 pub mod repositories;
diff --git a/tests/job_integration.rs b/tests/job_integration.rs
new file mode 100644
index 0000000..fa3a9e0
--- /dev/null
+++ b/tests/job_integration.rs
+use chrono::Utc;
+use serde_json::json;
+use sqlx::{Pool, Postgres};
+use uuid::Uuid;
+
+use capsule::{
+    entities::JobStatus,
+    jobs::{JobRepository},
+};
+
+/// Test that basic job repository operations work correctly
+#[sqlx::test]
+async fn test_job_enqueue_and_fetch(pool: Pool<Postgres>) {
+    // Test enqueuing a job
+    let job_id = JobRepository::enqueue(
+        &pool,
+        "test_job",
+        json!({"test": "data"}),
+        None,
+        None,
+    )
+    .await
+    .expect("Failed to enqueue job");
+
+    // Verify job was created correctly
+    let job = sqlx::query!(
+        "SELECT kind, payload, status::text as status, attempts FROM jobs WHERE id = $1",
+        job_id
+    )
+    .fetch_one(&pool)
+    .await
+    .expect("Failed to fetch created job");
+
+    assert_eq!(job.kind, "test_job");
+    assert_eq!(job.payload, json!({"test": "data"}));
+    assert_eq!(job.status, Some("queued".to_string()));
+    assert_eq!(job.attempts, 0);
+
+    // Test fetching due jobs
+    let worker_id = Uuid::new_v4();
+    let jobs = JobRepository::fetch_due_jobs(&pool, 10, worker_id, 300).await
+        .expect("Failed to fetch due jobs");
+
+    assert_eq!(jobs.len(), 1);
+    assert_eq!(jobs[0].id, job_id);
+    assert_eq!(jobs[0].status, JobStatus::Running);
+    assert_eq!(jobs[0].reserved_by, Some(worker_id));
+    assert!(jobs[0].visibility_till.is_some());
+}
+
+/// Test job success marking
+#[sqlx::test]
+async fn test_job_success(pool: Pool<Postgres>) {
+    // Enqueue a job
+    let job_id = JobRepository::enqueue(
+        &pool,
+        "test_job",
+        json!({"test": "data"}),
+        None,
+        None,
+    )
+    .await
+    .expect("Failed to enqueue job");
+
+    // Mark it as successful
+    JobRepository::mark_success(&pool, job_id).await
+        .expect("Failed to mark job as successful");
+
+    // Verify the status
+    let job = sqlx::query!(
+        "SELECT status::text as status, reserved_by, visibility_till FROM jobs WHERE id = $1",
+        job_id
+    )
+    .fetch_one(&pool)
+    .await
+    .expect("Failed to fetch job after success");
+
+    assert_eq!(job.status, Some("succeeded".to_string()));
+    assert!(job.reserved_by.is_none());
+    assert!(job.visibility_till.is_none());
+}
+
+/// Test job failure marking with retry
+#[sqlx::test]
+async fn test_job_failure_with_retry(pool: Pool<Postgres>) {
+    // Enqueue a job
+    let job_id = JobRepository::enqueue(
+        &pool,
+        "test_job",
+        json!({"test": "data"}),
+        None,
+        Some(3),
+    )
+    .await
+    .expect("Failed to enqueue job");
+
+    // Mark it as failed with retry
+    let next_run_at = Utc::now() + chrono::Duration::minutes(5);
+    JobRepository::mark_failure(
+        &pool,
+        job_id,
+        "Test error",
+        Some(next_run_at),
+        60,
+    )
+    .await
+    .expect("Failed to mark job as failed");
+
+    // Verify the status
+    let job = sqlx::query!(
+        "SELECT status::text as status, attempts, last_error, backoff_seconds FROM jobs WHERE id = $1",
+        job_id
+    )
+    .fetch_one(&pool)
+    .await
+    .expect("Failed to fetch job after failure");
+
+    assert_eq!(job.status, Some("queued".to_string())); // Should be queued for retry
+    assert_eq!(job.attempts, 1);
+    assert_eq!(job.last_error, Some("Test error".to_string()));
+    assert_eq!(job.backoff_seconds, 60);
+}
+
+/// Test job failure marking without retry (permanent failure)
+#[sqlx::test]
+async fn test_job_permanent_failure(pool: Pool<Postgres>) {
+    // Enqueue a job
+    let job_id = JobRepository::enqueue(
+        &pool,
+        "test_job",
+        json!({"test": "data"}),
+        None,
+        Some(1), // Only 1 attempt
+    )
+    .await
+    .expect("Failed to enqueue job");
+
+    // Mark it as failed without retry (permanent failure)
+    JobRepository::mark_failure(
+        &pool,
+        job_id,
+        "Permanent error",
+        None, // No next run time = permanent failure
+        0,
+    )
+    .await
+    .expect("Failed to mark job as permanently failed");
+
+    // Verify the status
+    let job = sqlx::query!(
+        "SELECT status::text as status, attempts, last_error FROM jobs WHERE id = $1",
+        job_id
+    )
+    .fetch_one(&pool)
+    .await
+    .expect("Failed to fetch job after permanent failure");
+
+    assert_eq!(job.status, Some("failed".to_string()));
+    assert_eq!(job.attempts, 1);
+    assert_eq!(job.last_error, Some("Permanent error".to_string()));
+}
+
+/// Test job visibility timeout behavior
+#[sqlx::test]
+async fn test_job_visibility_timeout(pool: Pool<Postgres>) {
+    // Enqueue a job
+    let job_id = JobRepository::enqueue(
+        &pool,
+        "test_job",
+        json!({"test": "data"}),
+        None,
+        None,
+    )
+    .await
+    .expect("Failed to enqueue job");
+
+    // Fetch it with a short visibility timeout
+    let worker_id = Uuid::new_v4();
+    let jobs = JobRepository::fetch_due_jobs(&pool, 1, worker_id, 1).await // 1 second timeout
+        .expect("Failed to fetch due jobs");
+
+    assert_eq!(jobs.len(), 1);
+    assert_eq!(jobs[0].status, JobStatus::Running);
+
+    // Wait for the visibility timeout to expire
+    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+
+    // Try to fetch again with a different worker - should succeed
+    let worker_id_2 = Uuid::new_v4();
+    let jobs = JobRepository::fetch_due_jobs(&pool, 1, worker_id_2, 300).await
+        .expect("Failed to fetch due jobs after timeout");
+
+    assert_eq!(jobs.len(), 1);
+    assert_eq!(jobs[0].id, job_id);
+    assert_eq!(jobs[0].reserved_by, Some(worker_id_2)); // Should be reserved by new worker
+}
+
+/// Test that multiple jobs can be fetched and processed
+#[sqlx::test]
+async fn test_multiple_job_processing(pool: Pool<Postgres>) {
+    // Enqueue multiple jobs
+    let mut job_ids = Vec::new();
+    for i in 0..5 {
+        let job_id = JobRepository::enqueue(
+            &pool,
+            "test_job",
+            json!({"index": i}),
+            None,
+            None,
+        )
+        .await
+        .expect("Failed to enqueue job");
+        job_ids.push(job_id);
+    }
+
+    // Fetch all jobs at once
+    let worker_id = Uuid::new_v4();
+    let jobs = JobRepository::fetch_due_jobs(&pool, 10, worker_id, 300).await
+        .expect("Failed to fetch due jobs");
+
+    assert_eq!(jobs.len(), 5);
+    for job in &jobs {
+        assert_eq!(job.status, JobStatus::Running);
+        assert_eq!(job.reserved_by, Some(worker_id));
+        assert!(job_ids.contains(&job.id));
+    }
+
+    // Mark all jobs as successful
+    for job in jobs {
+        JobRepository::mark_success(&pool, job.id).await
+            .expect("Failed to mark job as successful");
+    }
+
+    // Verify all are marked as succeeded
+    for job_id in job_ids {
+        let job = sqlx::query!(
+            "SELECT status::text as status FROM jobs WHERE id = $1",
+            job_id
+        )
+        .fetch_one(&pool)
+        .await
+        .expect("Failed to fetch job");
+
+        assert_eq!(job.status, Some("succeeded".to_string()));
+    }
+}

From e89a0c25a18cd5668f82ca08a460b2718f973478 Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Sun, 24 Aug 2025 11:31:41 +0200
Subject: [PATCH 2/6] chore: formatting

---
 tests/job_integration.rs | 97 +++++++++++++---------------------------
 1 file changed, 32 insertions(+), 65 deletions(-)

diff --git a/tests/job_integration.rs b/tests/job_integration.rs
index fa3a9e0..e6ae263 100644
--- a/tests/job_integration.rs
+++ b/tests/job_integration.rs
@@ -3,24 +3,15 @@ use serde_json::json;
 use sqlx::{Pool, Postgres};
 use uuid::Uuid;
 
-use capsule::{
-    entities::JobStatus,
-    jobs::{JobRepository},
-};
+use capsule::{entities::JobStatus, jobs::JobRepository};
 
 /// Test that basic job repository operations work correctly
 #[sqlx::test]
 async fn test_job_enqueue_and_fetch(pool: Pool<Postgres>) {
     // Test enqueuing a job
-    let job_id = JobRepository::enqueue(
-        &pool,
-        "test_job",
-        json!({"test": "data"}),
-        None,
-        None,
-    )
-    .await
-    .expect("Failed to enqueue job");
+    let job_id = JobRepository::enqueue(&pool, "test_job", json!({"test": "data"}), None, None)
+        .await
+        .expect("Failed to enqueue job");
 
     // Verify job was created correctly
     let job = sqlx::query!(
@@ -38,7 +29,8 @@ async fn test_job_enqueue_and_fetch(pool: Pool<Postgres>) {
 
     // Test fetching due jobs
     let worker_id = Uuid::new_v4();
-    let jobs = JobRepository::fetch_due_jobs(&pool, 10, worker_id, 300).await
+    let jobs = JobRepository::fetch_due_jobs(&pool, 10, worker_id, 300)
+        .await
         .expect("Failed to fetch due jobs");
 
     assert_eq!(jobs.len(), 1);
@@ -52,18 +44,13 @@ async fn test_job_enqueue_and_fetch(pool: Pool<Postgres>) {
 #[sqlx::test]
 async fn test_job_success(pool: Pool<Postgres>) {
     // Enqueue a job
-    let job_id = JobRepository::enqueue(
-        &pool,
-        "test_job",
-        json!({"test": "data"}),
-        None,
-        None,
-    )
-    .await
-    .expect("Failed to enqueue job");
+    let job_id = JobRepository::enqueue(&pool, "test_job", json!({"test": "data"}), None, None)
+        .await
+        .expect("Failed to enqueue job");
 
     // Mark it as successful
-    JobRepository::mark_success(&pool, job_id).await
+    JobRepository::mark_success(&pool, job_id)
+        .await
         .expect("Failed to mark job as successful");
 
     // Verify the status
@@ -84,27 +71,15 @@ async fn test_job_success(pool: Pool<Postgres>) {
 #[sqlx::test]
 async fn test_job_failure_with_retry(pool: Pool<Postgres>) {
     // Enqueue a job
-    let job_id = JobRepository::enqueue(
-        &pool,
-        "test_job",
-        json!({"test": "data"}),
-        None,
-        Some(3),
-    )
-    .await
-    .expect("Failed to enqueue job");
+    let job_id = JobRepository::enqueue(&pool, "test_job", json!({"test": "data"}), None, Some(3))
+        .await
+        .expect("Failed to enqueue job");
 
     // Mark it as failed with retry
     let next_run_at = Utc::now() + chrono::Duration::minutes(5);
-    JobRepository::mark_failure(
-        &pool,
-        job_id,
-        "Test error",
-        Some(next_run_at),
-        60,
-    )
-    .await
-    .expect("Failed to mark job as failed");
+    JobRepository::mark_failure(&pool, job_id, "Test error", Some(next_run_at), 60)
+        .await
+        .expect("Failed to mark job as failed");
 
     // Verify the status
@@ -164,19 +139,14 @@ async fn test_job_permanent_failure(pool: Pool<Postgres>) {
 #[sqlx::test]
 async fn test_job_visibility_timeout(pool: Pool<Postgres>) {
     // Enqueue a job
-    let job_id = JobRepository::enqueue(
-        &pool,
-        "test_job",
-        json!({"test": "data"}),
-        None,
-        None,
-    )
-    .await
-    .expect("Failed to enqueue job");
+    let job_id = JobRepository::enqueue(&pool, "test_job", json!({"test": "data"}), None, None)
+        .await
+        .expect("Failed to enqueue job");
to enqueue job"); // Fetch it with a short visibility timeout let worker_id = Uuid::new_v4(); - let jobs = JobRepository::fetch_due_jobs(&pool, 1, worker_id, 1).await // 1 second timeout + let jobs = JobRepository::fetch_due_jobs(&pool, 1, worker_id, 1) + .await // 1 second timeout .expect("Failed to fetch due jobs"); assert_eq!(jobs.len(), 1); @@ -187,7 +157,8 @@ async fn test_job_visibility_timeout(pool: Pool) { // Try to fetch again with a different worker - should succeed let worker_id_2 = Uuid::new_v4(); - let jobs = JobRepository::fetch_due_jobs(&pool, 1, worker_id_2, 300).await + let jobs = JobRepository::fetch_due_jobs(&pool, 1, worker_id_2, 300) + .await .expect("Failed to fetch due jobs after timeout"); assert_eq!(jobs.len(), 1); @@ -201,21 +172,16 @@ async fn test_multiple_job_processing(pool: Pool) { // Enqueue multiple jobs let mut job_ids = Vec::new(); for i in 0..5 { - let job_id = JobRepository::enqueue( - &pool, - "test_job", - json!({"index": i}), - None, - None, - ) - .await - .expect("Failed to enqueue job"); + let job_id = JobRepository::enqueue(&pool, "test_job", json!({"index": i}), None, None) + .await + .expect("Failed to enqueue job"); job_ids.push(job_id); } // Fetch all jobs at once let worker_id = Uuid::new_v4(); - let jobs = JobRepository::fetch_due_jobs(&pool, 10, worker_id, 300).await + let jobs = JobRepository::fetch_due_jobs(&pool, 10, worker_id, 300) + .await .expect("Failed to fetch due jobs"); assert_eq!(jobs.len(), 5); @@ -227,7 +193,8 @@ async fn test_multiple_job_processing(pool: Pool) { // Mark all jobs as successful for job in jobs { - JobRepository::mark_success(&pool, job.id).await + JobRepository::mark_success(&pool, job.id) + .await .expect("Failed to mark job as successful"); } @@ -240,7 +207,7 @@ async fn test_multiple_job_processing(pool: Pool) { .fetch_one(&pool) .await .expect("Failed to fetch job"); - + assert_eq!(job.status, Some("succeeded".to_string())); } } From bc83fe341972d4f0971943a9836d0e3cef8dcc96 Mon Sep 17 00:00:00 2001 From: charlieroth Date: Sun, 24 Aug 2025 11:32:30 +0200 Subject: [PATCH 3/6] Update CI workflow to try and improve speed of workflow --- .github/workflows/ci.yml | 55 ++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 98f0fd4..2a668c6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,6 +8,8 @@ env: CARGO_TERM_COLOR: always DATABASE_URL: postgres://capsule:capsule_password@localhost:5432/capsule_dev JWT_SECRET: dev-secret-change-me + RUSTC_WRAPPER: sccache + SCCACHE_GHA_ENABLED: "true" jobs: ci: @@ -30,33 +32,41 @@ jobs: --health-retries 5 steps: - - name: Checkout code + - name: Checkout uses: actions/checkout@v4 - - name: Install Rust toolchain + - name: Rust toolchain uses: dtolnay/rust-toolchain@v1 with: toolchain: stable components: rustfmt, clippy - - name: Cache Rust dependencies + - name: Setup sccache + uses: mozilla-actions/sccache-action@v0.0.6 + + - name: Rust dependency cache uses: Swatinem/rust-cache@v2 with: - shared-key: "ci-cache" - - - name: Install tools - run: | - cargo install sqlx-cli --no-default-features --features native-tls,postgres - cargo install cargo-audit - cargo install cargo-deny + shared-key: ci-cache + cache-provider: github - - name: Cache SQLx - uses: actions/cache@v4 + # Fast, cached installs for cargo-installed tools + - name: Install sqlx-cli (cached) + uses: baptiste0928/cargo-install@v3 with: - path: | - ~/.cargo/bin/sqlx - .sqlx - key: 
-          key: sqlx-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
+      # Fast, cached installs for cargo-installed tools
+      - name: Install sqlx-cli (cached)
+        uses: baptiste0928/cargo-install@v3
+        with:
+          crate: sqlx-cli
+          version: 0.7.4
+          args: --no-default-features --features native-tls,postgres
+          locked: true
+
+      # Use purpose-built actions instead of installing binaries
+      - name: Security audit
+        uses: actions-rs/audit-check@v1
+        with:
+          token: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: License/dependency policy
+        uses: EmbarkStudios/cargo-deny-action@v2
 
       - name: Wait for PostgreSQL
         run: |
@@ -71,17 +81,14 @@ jobs:
       - name: Check formatting
         run: cargo fmt --all -- --check
 
-      - name: Run clippy
+      - name: Clippy
         run: cargo clippy --all-targets --all-features -- -D warnings
 
-      - name: Run tests
-        run: cargo test --all-features
+      - name: Tests
+        run: cargo test --all-features --locked
 
       - name: SQLx prepare check
         run: cargo sqlx prepare --check --workspace -- --all-features
 
-      # - name: Security audit
-      #   run: cargo audit
-
-      # - name: License/dependency check
-      #   run: cargo deny check
+      - name: sccache stats
+        run: sccache --show-stats || true

From 846c4f715ca210ed6cdbe1c728ad36dfd304a862 Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Sun, 24 Aug 2025 11:41:44 +0200
Subject: [PATCH 4/6] ci: Remove security audit step from CI workflow

---
 .github/workflows/ci.yml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2a668c6..86f4812 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -60,10 +60,10 @@ jobs:
           locked: true
 
       # Use purpose-built actions instead of installing binaries
-      - name: Security audit
-        uses: actions-rs/audit-check@v1
-        with:
-          token: ${{ secrets.GITHUB_TOKEN }}
+      # - name: Security audit
+      #   uses: actions-rs/audit-check@v1
+      #   with:
+      #     token: ${{ secrets.GITHUB_TOKEN }}
 
       - name: License/dependency policy
         uses: EmbarkStudios/cargo-deny-action@v2

From 78d05882b3a01d12ce35ba05701039af8b5a2586 Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Sun, 24 Aug 2025 11:43:36 +0200
Subject: [PATCH 5/6] ci: Remove license/dependency policy step from CI workflow

---
 .github/workflows/ci.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 86f4812..a309a46 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -65,8 +65,8 @@ jobs:
       #   with:
       #     token: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: License/dependency policy
-        uses: EmbarkStudios/cargo-deny-action@v2
+      # - name: License/dependency policy
+      #   uses: EmbarkStudios/cargo-deny-action@v2
 
       - name: Wait for PostgreSQL
         run: |

From b11bb0e556e6f164d58efb05ca5b85590e008aac Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Sun, 24 Aug 2025 11:46:37 +0200
Subject: [PATCH 6/6] ci: disable sccache for now, since the GitHub cache
 service it depends on doesn't seem to be working right now

---
 .github/workflows/ci.yml | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a309a46..d861aa3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,8 +8,8 @@ env:
   CARGO_TERM_COLOR: always
   DATABASE_URL: postgres://capsule:capsule_password@localhost:5432/capsule_dev
   JWT_SECRET: dev-secret-change-me
-  RUSTC_WRAPPER: sccache
-  SCCACHE_GHA_ENABLED: "true"
+  # RUSTC_WRAPPER: sccache
+  # SCCACHE_GHA_ENABLED: "true"
 
 jobs:
   ci:
@@ -41,9 +41,9 @@ jobs:
           toolchain: stable
           components: rustfmt, clippy
 
-      - name: Setup sccache
-        uses: mozilla-actions/sccache-action@v0.0.6
+      # - name: Setup sccache
+      #   uses: mozilla-actions/sccache-action@v0.0.6
 
       - name: Rust dependency cache
         uses: Swatinem/rust-cache@v2
         with:
@@ -90,5 +90,5 @@ jobs:
       - name: SQLx prepare check
         run: cargo sqlx prepare --check --workspace -- --all-features
 
-      - name: sccache stats
-        run: sccache --show-stats || true
+      # - name: sccache stats
+      #   run: sccache --show-stats || true