diff --git a/.github/workflows/deploy-queue.yml b/.github/workflows/deploy-queue.yml index 9d055e9..f704a10 100644 --- a/.github/workflows/deploy-queue.yml +++ b/.github/workflows/deploy-queue.yml @@ -760,22 +760,22 @@ jobs: mode: finish deployment-id: ${{ steps.start-v0-5-2.outputs.deployment-id }} - # Test v0.6.0 with the updated schema - - name: Start deployment (v0.6.0) - id: start-v0-6-0 - uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.0 + # Test v0.7.0 with the updated schema + - name: Start deployment (v0.7.0) + id: start-v0-7-0 + uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.7.0 with: mode: start environment: dev cloud-provider: aws - region: compat-v0-6-0 + region: compat-v0-7-0 cell-index: 1 component: compat-test - version: v0.6.0 - note: Compatibility test - v0.6.0 + version: v0.7.0 + note: Compatibility test - v0.7.0 - - name: Finish deployment (v0.6.0) - uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.0 + - name: Finish deployment (v0.7.0) + uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.7.0 with: mode: finish - deployment-id: ${{ steps.start-v0-6-0.outputs.deployment-id }} + deployment-id: ${{ steps.start-v0-7-0.outputs.deployment-id }} diff --git a/deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json b/deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json new file mode 100644 index 0000000..2917aa6 --- /dev/null +++ b/deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id\n FROM deployments\n WHERE url = $1\n ORDER BY id DESC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1" +} diff --git a/deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json b/deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json new file mode 100644 index 0000000..d22fb50 --- /dev/null +++ b/deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id,\n component,\n version,\n heartbeat_timestamp,\n NOW() - heartbeat_timestamp AS time_since_heartbeat\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - $1::interval\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "component", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "version", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "heartbeat_timestamp", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "time_since_heartbeat", + "type_info": "Interval" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + false, + false, + true, + true, + null + ] + }, + "hash": "a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c" +} diff --git a/deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json b/deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json new file mode 100644 index 0000000..0b0f25e --- /dev/null +++ b/deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE deployments SET heartbeat_timestamp = NOW() WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c" +} diff --git a/deploy-queue/Cargo.lock b/deploy-queue/Cargo.lock index 123287c..a7c4e19 100644 --- a/deploy-queue/Cargo.lock +++ b/deploy-queue/Cargo.lock @@ -342,7 +342,7 @@ dependencies = [ [[package]] name = "deploy-queue" -version = "0.7.2" +version = "0.8.0" dependencies = [ "anyhow", "backon", diff --git a/deploy-queue/Cargo.toml b/deploy-queue/Cargo.toml index 1d36c2d..1112ec9 100644 --- a/deploy-queue/Cargo.toml +++ b/deploy-queue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deploy-queue" -version = "0.7.2" +version = "0.8.0" edition = "2024" [dependencies] diff --git a/deploy-queue/README.md b/deploy-queue/README.md index 0012838..9c766dc 100644 --- a/deploy-queue/README.md +++ b/deploy-queue/README.md @@ -117,6 +117,9 @@ deploy-queue start --environment prod --provider aws --region us-west-2 --cell-i - Writes `deployment-id=` to `$GITHUB_OUTPUT` if running in GitHub Actions - Blocks until all conflicting deployments complete - Starts the deployment automatically when ready +- Runs a background heartbeat loop that: + - Sends periodic heartbeats during the wait + - Cancels deployments with stale heartbeats (if they block current deployment) ### 2. Finish a Deployment @@ -256,6 +259,14 @@ Buffer times are configured in the database: These values are set in the initial migration and can be adjusted in the `environments` table. +### Heartbeats + +The system supports heartbeats to detect stuck deployments: + +- **Background heartbeats during `start`**: While waiting for blocking deployments, a background task updates the deployment's `heartbeat_timestamp`. If it fails to send heartbeats repeatedly, it cancels the deployment with a note. It also cancels other deployments with stale heartbeats that are blocking your deployment. +- **Manual heartbeats**: `deploy-queue heartbeat deployment --deployment-id ` (or `heartbeat url --url `) runs a foreground loop that sends heartbeats until stopped. +- **Stale heartbeat detection**: deployments with a heartbeat older than the configured timeout (currently set to 15 minutes) are cancelled automatically when the heartbeat loop runs. + ## Database Schema The system uses two main tables: diff --git a/deploy-queue/migrations/0008_add_heartbeat_timestamp.sql b/deploy-queue/migrations/0008_add_heartbeat_timestamp.sql new file mode 100644 index 0000000..47596aa --- /dev/null +++ b/deploy-queue/migrations/0008_add_heartbeat_timestamp.sql @@ -0,0 +1,9 @@ +-- Add heartbeat_timestamp column to deployments table +-- This field tracks the last heartbeat timestamp for deployment liveness monitoring +ALTER TABLE + deployments +ADD + COLUMN heartbeat_timestamp TIMESTAMPTZ; + +-- Add column comment +COMMENT ON COLUMN deployments.heartbeat_timestamp IS 'Last heartbeat timestamp for tracking deployment liveness'; diff --git a/deploy-queue/src/cli.rs b/deploy-queue/src/cli.rs index 2f43a02..8b46d70 100644 --- a/deploy-queue/src/cli.rs +++ b/deploy-queue/src/cli.rs @@ -62,6 +62,11 @@ pub enum Mode { #[command(subcommand)] entity: ListEntity, }, + /// Send periodic heartbeats for a deployment (runs until terminated) + Heartbeat { + #[command(subcommand)] + target: HeartbeatTarget, + }, } #[derive(Parser, Clone)] @@ -136,3 +141,17 @@ pub enum ListEntity { environment: Environment, }, } + +#[derive(Subcommand, Clone)] +pub enum HeartbeatTarget { + /// Send heartbeat for a deployment by ID + Deployment { + /// Deployment ID to send heartbeat for + deployment_id: i64, + }, + /// Send heartbeat for a deployment by URL + Url { + /// GitHub Actions URL to find deployment + url: String, + }, +} diff --git a/deploy-queue/src/constants.rs b/deploy-queue/src/constants.rs index 1a1e119..f082dd5 100644 --- a/deploy-queue/src/constants.rs +++ b/deploy-queue/src/constants.rs @@ -4,3 +4,6 @@ pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); pub const ACQUIRE_TIMEOUT: Duration = Duration::from_secs(10); pub const IDLE_TIMEOUT: Duration = Duration::from_secs(10); pub const BUSY_RETRY: Duration = Duration::from_secs(5); +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); +pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15 * 60); // 15 minutes +pub const HEARTBEAT_UPDATE_TIMEOUT: Duration = Duration::from_secs(20); diff --git a/deploy-queue/src/handler/fetch.rs b/deploy-queue/src/handler/fetch.rs index 866ff8d..234edbf 100644 --- a/deploy-queue/src/handler/fetch.rs +++ b/deploy-queue/src/handler/fetch.rs @@ -4,7 +4,7 @@ use time::Duration; use crate::{ cli::Environment, - model::{BlockingDeployment, Cell, Deployment, OutlierDeployment}, + model::{BlockingDeployment, Cell, Deployment, OutlierDeployment, StaleHeartbeatDeployment}, util::duration::DurationExt, }; @@ -52,6 +52,27 @@ pub async fn deployment(client: &Pool, deployment_id: i64) -> Result, url: &str) -> Result> { + let row = sqlx::query!( + r#" + SELECT id + FROM deployments + WHERE url = $1 + ORDER BY id DESC + LIMIT 1 + "#, + url + ) + .fetch_optional(client) + .await?; + + if let Some(row) = row { + Ok(Some(row.id)) + } else { + Ok(None) + } +} + pub async fn blocking_deployments( client: &Pool, deployment_id: i64, @@ -111,6 +132,55 @@ pub async fn blocking_deployments( Ok(blocking_deployments) } +pub async fn stale_heartbeat_deployments( + client: &Pool, + timeout: std::time::Duration, +) -> Result> { + let interval = timeout.to_pg_interval()?; + let rows = sqlx::query!( + r#" + SELECT + id, + component, + version, + heartbeat_timestamp, + NOW() - heartbeat_timestamp AS time_since_heartbeat + FROM deployments + WHERE heartbeat_timestamp IS NOT NULL + AND finish_timestamp IS NULL + AND cancellation_timestamp IS NULL + AND heartbeat_timestamp < NOW() - $1::interval + "#, + interval + ) + .fetch_all(client) + .await?; + + let deployments = rows + .into_iter() + .map(|row| -> Result { + let heartbeat_timestamp = row + .heartbeat_timestamp + .context("heartbeat_timestamp should not be NULL")?; + let time_since_heartbeat = row + .time_since_heartbeat + .context("time_since_heartbeat should not be NULL")? + .to_duration() + .context("Failed to convert time_since_heartbeat")?; + + Ok(StaleHeartbeatDeployment { + id: row.id, + component: row.component, + version: row.version, + heartbeat_timestamp, + time_since_heartbeat, + }) + }) + .collect::>>()?; + + Ok(deployments) +} + pub async fn outlier_deployments(client: &Pool) -> Result> { let rows = sqlx::query_file!("queries/active_outliers.sql") .fetch_all(client) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 1d0aef2..1b27546 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -3,12 +3,13 @@ pub mod fetch; pub mod list; use anyhow::Result; -use log::info; +use log::{info, warn}; use sqlx::{Pool, Postgres}; use time::Duration; +use tokio::task::JoinHandle; use crate::{ - constants::BUSY_RETRY, + constants::{BUSY_RETRY, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, HEARTBEAT_UPDATE_TIMEOUT}, model::Deployment, util::{duration::DurationExt, github}, }; @@ -35,11 +36,42 @@ pub async fn enqueue_deployment(client: &Pool, deployment: Deployment) Ok(deployment_id) } +/// Cancel deployments with stale heartbeats +async fn cancel_stale_heartbeat_deployments( + client: &Pool, + canceller_deployment_id: i64, +) -> Result<()> { + let stale_deployments = fetch::stale_heartbeat_deployments(client, HEARTBEAT_TIMEOUT).await?; + + let cancellation_note = format!( + "Cancelled by deployment {} due to stale heartbeat", + canceller_deployment_id + ); + + for deployment in stale_deployments { + log::warn!( + "Cancelling deployment {} ({}, version={}) due to stale heartbeat: last seen {} ago at {}", + deployment.id, + deployment.component, + deployment.version.as_deref().unwrap_or("unknown"), + deployment.time_since_heartbeat.format_human(), + deployment.heartbeat_timestamp, + ); + + cancel::deployment(client, deployment.id, Some(cancellation_note.as_str())).await?; + } + + Ok(()) +} + pub async fn wait_for_blocking_deployments( pg_pool: &Pool, deployment_id: i64, ) -> Result<()> { loop { + // Check for and cancel any deployments with stale heartbeats + cancel_stale_heartbeat_deployments(pg_pool, deployment_id).await?; + let blocking_deployments = fetch::blocking_deployments(pg_pool, deployment_id).await?; if blocking_deployments.is_empty() { @@ -142,3 +174,101 @@ pub async fn finish_deployment(client: &Pool, deployment_id: i64) -> R log::info!("Deployment {} has been finished", deployment_id); Ok(()) } + +/// Update the heartbeat timestamp for a deployment +/// This is the core function that can be called from anywhere (e.g., as a background task) +pub async fn update_heartbeat(client: &Pool, deployment_id: i64) -> Result<()> { + sqlx::query!( + "UPDATE deployments SET heartbeat_timestamp = NOW() WHERE id = $1", + deployment_id + ) + .execute(client) + .await?; + log::debug!("Heartbeat sent for deployment {}", deployment_id); + Ok(()) +} + +/// Run heartbeat in a loop with periodic intervals until terminated +pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> Result<()> { + info!( + "Starting heartbeat loop for deployment {} (interval: {}s)", + deployment_id, + HEARTBEAT_INTERVAL.as_secs() + ); + + const HEARTBEAT_MAX_CONSECUTIVE_FAILURES: u32 = 5; + + let mut consecutive_failures: u32 = 0; + let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + if consecutive_failures == 0 { + interval.tick().await; + } else { + tokio::time::sleep(std::time::Duration::from_secs( + 2 * consecutive_failures as u64, + )) + .await; + } + + let result = tokio::time::timeout( + HEARTBEAT_UPDATE_TIMEOUT, + update_heartbeat(client, deployment_id), + ) + .await; + + if let Ok(Ok(())) = result { + consecutive_failures = 0; + } else { + consecutive_failures += 1; + let reason = match result { + Ok(Err(err)) => err.to_string(), + Err(_) => format!("timed out after {:?}", HEARTBEAT_UPDATE_TIMEOUT), + _ => "unknown error".to_string(), + }; + warn!( + "Failed to send heartbeat for deployment {} (attempt {}/{}): {}", + deployment_id, consecutive_failures, HEARTBEAT_MAX_CONSECUTIVE_FAILURES, reason + ); + } + + if consecutive_failures >= HEARTBEAT_MAX_CONSECUTIVE_FAILURES { + anyhow::bail!( + "Heartbeat loop failed {} times consecutively for deployment {}", + consecutive_failures, + deployment_id + ); + } + } +} + +/// Start a background heartbeat loop; returns a JoinHandle so caller can abort it +pub fn start_heartbeat_background(client: &Pool, deployment_id: i64) -> JoinHandle<()> { + let heartbeat_client = client.clone(); + tokio::spawn(async move { + if let Err(err) = run_heartbeat_loop(&heartbeat_client, deployment_id).await { + warn!( + "Heartbeat loop exited for deployment {}: {}", + deployment_id, err + ); + + // If the heartbeat loop stops due to repeated failures, cancel the deployment + if let Err(cancel_err) = cancel::deployment( + &heartbeat_client, + deployment_id, + Some(format!( + "Deployment {} cancelled by heartbeat loop after repeated heartbeat failures", + deployment_id, + )), + ) + .await + { + warn!( + "Failed to cancel deployment {} after heartbeat loop exit: {}", + deployment_id, cancel_err + ); + } + } + }) +} diff --git a/deploy-queue/src/lib.rs b/deploy-queue/src/lib.rs index 18ddc9d..4ddfd05 100644 --- a/deploy-queue/src/lib.rs +++ b/deploy-queue/src/lib.rs @@ -27,6 +27,8 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< .await .context("Faild to enqueue deployment")?; + handler::start_heartbeat_background(&db_client, deployment_id); + // Wait for all blocking deployments to finish handler::wait_for_blocking_deployments(&db_client, deployment_id) .await @@ -98,6 +100,26 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< .await .context("Failed to list cells")?; } + cli::Mode::Heartbeat { target } => match target { + cli::HeartbeatTarget::Deployment { deployment_id } => { + handler::run_heartbeat_loop(&db_client, deployment_id) + .await + .with_context(|| { + format!("Failed to run heartbeat loop for deployment {deployment_id}") + })?; + } + cli::HeartbeatTarget::Url { url } => { + let deployment_id = handler::fetch::deployment_id_by_url(&db_client, &url) + .await? + .with_context(|| format!("No deployment found with URL: {}", url))?; + + handler::run_heartbeat_loop(&db_client, deployment_id) + .await + .with_context(|| { + format!("Failed to run heartbeat loop for deployment {deployment_id}") + })?; + } + }, } Ok(()) diff --git a/deploy-queue/src/model.rs b/deploy-queue/src/model.rs index 1d9769d..b3f7c7c 100644 --- a/deploy-queue/src/model.rs +++ b/deploy-queue/src/model.rs @@ -30,6 +30,15 @@ pub struct Deployment { pub buffer_time: Duration, } +/// Minimal view of a deployment for stale-heartbeat checks +pub struct StaleHeartbeatDeployment { + pub id: i64, + pub component: String, + pub version: Option, + pub heartbeat_timestamp: OffsetDateTime, + pub time_since_heartbeat: Duration, +} + impl Deployment { /// Generate a compact summary of this deployment's information pub fn summary(&self) -> String { diff --git a/deploy-queue/tests/heartbeat_tests.rs b/deploy-queue/tests/heartbeat_tests.rs new file mode 100644 index 0000000..839be04 --- /dev/null +++ b/deploy-queue/tests/heartbeat_tests.rs @@ -0,0 +1,126 @@ +use anyhow::Result; +use deploy_queue::{constants::HEARTBEAT_TIMEOUT, handler}; +use time::{Duration as TimeDuration, OffsetDateTime}; + +#[path = "common/test_db_setup.rs"] +mod database_helpers; + +#[path = "fixtures/deployment.rs"] +mod deployment_fixtures; + +extern crate deploy_queue; + +#[tokio::test] +async fn heartbeat_loop_sets_timestamp() -> Result<()> { + let pool = database_helpers::setup_test_db().await?; + let deployment_id = deployment_fixtures::create_test_deployment(&pool).await?; + + // Run the heartbeat and wait a few milliseconds (so it can write the timestamp) + let heartbeat_pool = pool.clone(); + let handle = tokio::spawn(async move { + handler::run_heartbeat_loop(&heartbeat_pool, deployment_id) + .await + .ok(); + }); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Check that the heartbeat timestamp was set + let (heartbeat_timestamp,): (Option,) = + sqlx::query_as("SELECT heartbeat_timestamp FROM deployments WHERE id = $1") + .bind(deployment_id) + .fetch_one(&pool) + .await?; + + assert!( + heartbeat_timestamp.is_some(), + "Heartbeat loop should set heartbeat_timestamp" + ); + + // Stop the heartbeat loop + handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn stale_heartbeat_detection_flags_old_running_deployments() -> Result<()> { + let pool = database_helpers::setup_test_db().await?; + let deployment_id = deployment_fixtures::create_running_deployment(&pool).await?; + + // Set heartbeat older than the timeout + let stale_at = + OffsetDateTime::now_utc() - TimeDuration::seconds(HEARTBEAT_TIMEOUT.as_secs() as i64 + 60); + sqlx::query("UPDATE deployments SET heartbeat_timestamp = $1 WHERE id = $2") + .bind(stale_at) + .bind(deployment_id) + .execute(&pool) + .await?; + + // Should be returned as stale + let stale = handler::fetch::stale_heartbeat_deployments(&pool, HEARTBEAT_TIMEOUT).await?; + assert!( + stale.iter().any(|d| d.id == deployment_id), + "Deployment with stale heartbeat should be flagged" + ); + + // Make the heartbeat fresh and ensure it is no longer reported + let fresh_at = + OffsetDateTime::now_utc() - TimeDuration::seconds(HEARTBEAT_TIMEOUT.as_secs() as i64 - 60); + sqlx::query("UPDATE deployments SET heartbeat_timestamp = $1 WHERE id = $2") + .bind(fresh_at) + .bind(deployment_id) + .execute(&pool) + .await?; + + let stale_again = handler::fetch::stale_heartbeat_deployments(&pool, HEARTBEAT_TIMEOUT).await?; + assert!( + !stale_again.iter().any(|d| d.id == deployment_id), + "Deployment with fresh heartbeat should not be flagged" + ); + + Ok(()) +} + +#[tokio::test] +async fn stale_blocker_gets_cancelled_when_waiting_for_blockers() -> Result<()> { + let pool = database_helpers::setup_test_db().await?; + + // Create a running deployment with a stale heartbeat that will block others + let blocking = deployment_fixtures::create_running_deployment(&pool).await?; + let stale_at = + OffsetDateTime::now_utc() - TimeDuration::seconds(HEARTBEAT_TIMEOUT.as_secs() as i64 + 60); + sqlx::query( + "UPDATE deployments + SET heartbeat_timestamp = $1 + WHERE id = $2", + ) + .bind(stale_at) + .bind(blocking) + .execute(&pool) + .await?; + + // Create a new deployment and check for blocking deployments + let waiter = deployment_fixtures::create_test_deployment(&pool).await?; + handler::wait_for_blocking_deployments(&pool, waiter).await?; + + // Verify the blocking deployment was cancelled with the expected note + let (cancellation_timestamp, cancellation_note): (Option, Option) = + sqlx::query_as( + "SELECT cancellation_timestamp, cancellation_note FROM deployments WHERE id = $1", + ) + .bind(blocking) + .fetch_one(&pool) + .await?; + + assert!( + cancellation_timestamp.is_some(), + "Blocking deployment should be cancelled" + ); + let note = cancellation_note.expect("cancellation_note should be set"); + assert!( + note.contains(&format!("Cancelled by deployment {}", waiter)), + "Cancellation note should mention the cancelling deployment id; got {note}" + ); + + Ok(()) +}