From eb3c09ab08ff82c1cd94fd820526e89dfbcf2e62 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 22 Dec 2025 13:55:01 +0100 Subject: [PATCH 01/25] feat(deploy-queue): add heartbeat to deployments table --- deploy-queue/migrations/0008_add_heartbeat_timestamp.sql | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 deploy-queue/migrations/0008_add_heartbeat_timestamp.sql diff --git a/deploy-queue/migrations/0008_add_heartbeat_timestamp.sql b/deploy-queue/migrations/0008_add_heartbeat_timestamp.sql new file mode 100644 index 0000000..47596aa --- /dev/null +++ b/deploy-queue/migrations/0008_add_heartbeat_timestamp.sql @@ -0,0 +1,9 @@ +-- Add heartbeat_timestamp column to deployments table +-- This field tracks the last heartbeat timestamp for deployment liveness monitoring +ALTER TABLE + deployments +ADD + COLUMN heartbeat_timestamp TIMESTAMPTZ; + +-- Add column comment +COMMENT ON COLUMN deployments.heartbeat_timestamp IS 'Last heartbeat timestamp for tracking deployment liveness'; From ca41c99da270f72c0b1f0255dc2822adfc68a29e Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 22 Dec 2025 18:57:12 +0100 Subject: [PATCH 02/25] feat: add heartbeat command and heartbeat_timestamp field --- ...4234bc09ccc780ed4f84e243cb4e0b5de48f1.json | 22 ++++++++++++++ ...058b39ea16843d0bf78c0fed9ae6c2e155e3c.json | 14 +++++++++ deploy-queue/src/cli.rs | 19 ++++++++++++ deploy-queue/src/constants.rs | 1 + deploy-queue/src/handler/fetch.rs | 21 ++++++++++++++ deploy-queue/src/handler/mod.rs | 29 ++++++++++++++++++- deploy-queue/src/lib.rs | 20 +++++++++++++ 7 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json create mode 100644 deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json diff --git a/deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json 
b/deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json new file mode 100644 index 0000000..2917aa6 --- /dev/null +++ b/deploy-queue/.sqlx/query-2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id\n FROM deployments\n WHERE url = $1\n ORDER BY id DESC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2bf963a7c12d560c76069ddb0084234bc09ccc780ed4f84e243cb4e0b5de48f1" +} diff --git a/deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json b/deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json new file mode 100644 index 0000000..0b0f25e --- /dev/null +++ b/deploy-queue/.sqlx/query-dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE deployments SET heartbeat_timestamp = NOW() WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "dbb47ac13317873add310d9ace6058b39ea16843d0bf78c0fed9ae6c2e155e3c" +} diff --git a/deploy-queue/src/cli.rs b/deploy-queue/src/cli.rs index 2f43a02..8b46d70 100644 --- a/deploy-queue/src/cli.rs +++ b/deploy-queue/src/cli.rs @@ -62,6 +62,11 @@ pub enum Mode { #[command(subcommand)] entity: ListEntity, }, + /// Send periodic heartbeats for a deployment (runs until terminated) + Heartbeat { + #[command(subcommand)] + target: HeartbeatTarget, + }, } #[derive(Parser, Clone)] @@ -136,3 +141,17 @@ pub enum ListEntity { environment: Environment, }, } + +#[derive(Subcommand, Clone)] +pub enum HeartbeatTarget { + /// Send heartbeat for a deployment by ID + Deployment { + /// Deployment ID to send heartbeat for + deployment_id: i64, + }, + 
/// Send heartbeat for a deployment by URL + Url { + /// GitHub Actions URL to find deployment + url: String, + }, +} diff --git a/deploy-queue/src/constants.rs b/deploy-queue/src/constants.rs index 1a1e119..4aaaed5 100644 --- a/deploy-queue/src/constants.rs +++ b/deploy-queue/src/constants.rs @@ -4,3 +4,4 @@ pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); pub const ACQUIRE_TIMEOUT: Duration = Duration::from_secs(10); pub const IDLE_TIMEOUT: Duration = Duration::from_secs(10); pub const BUSY_RETRY: Duration = Duration::from_secs(5); +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); diff --git a/deploy-queue/src/handler/fetch.rs b/deploy-queue/src/handler/fetch.rs index 866ff8d..521aee4 100644 --- a/deploy-queue/src/handler/fetch.rs +++ b/deploy-queue/src/handler/fetch.rs @@ -52,6 +52,27 @@ pub async fn deployment(client: &Pool, deployment_id: i64) -> Result, url: &str) -> Result> { + let row = sqlx::query!( + r#" + SELECT id + FROM deployments + WHERE url = $1 + ORDER BY id DESC + LIMIT 1 + "#, + url + ) + .fetch_optional(client) + .await?; + + if let Some(row) = row { + Ok(Some(row.id)) + } else { + Ok(None) + } +} + pub async fn blocking_deployments( client: &Pool, deployment_id: i64, diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 1d0aef2..0d59445 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -8,7 +8,7 @@ use sqlx::{Pool, Postgres}; use time::Duration; use crate::{ - constants::BUSY_RETRY, + constants::{BUSY_RETRY, HEARTBEAT_INTERVAL}, model::Deployment, util::{duration::DurationExt, github}, }; @@ -142,3 +142,30 @@ pub async fn finish_deployment(client: &Pool, deployment_id: i64) -> R log::info!("Deployment {} has been finished", deployment_id); Ok(()) } + +/// Update the heartbeat timestamp for a deployment +/// This is the core function that can be called from anywhere (e.g., as a background task) +pub async fn update_heartbeat(client: &Pool, 
deployment_id: i64) -> Result<()> { + sqlx::query!( + "UPDATE deployments SET heartbeat_timestamp = NOW() WHERE id = $1", + deployment_id + ) + .execute(client) + .await?; + log::debug!("Heartbeat sent for deployment {}", deployment_id); + Ok(()) +} + +/// Run heartbeat in a loop with periodic intervals until terminated +pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> Result<()> { + info!( + "Starting heartbeat loop for deployment {} (interval: {}s)", + deployment_id, + HEARTBEAT_INTERVAL.as_secs() + ); + + loop { + update_heartbeat(client, deployment_id).await?; + tokio::time::sleep(HEARTBEAT_INTERVAL).await; + } +} diff --git a/deploy-queue/src/lib.rs b/deploy-queue/src/lib.rs index 18ddc9d..c1447d2 100644 --- a/deploy-queue/src/lib.rs +++ b/deploy-queue/src/lib.rs @@ -98,6 +98,26 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< .await .context("Failed to list cells")?; } + cli::Mode::Heartbeat { target } => match target { + cli::HeartbeatTarget::Deployment { deployment_id } => { + handler::run_heartbeat_loop(&db_client, deployment_id) + .await + .with_context(|| { + format!("Failed to run heartbeat loop for deployment {deployment_id}") + })?; + } + cli::HeartbeatTarget::Url { url } => { + let deployment_id = handler::fetch::deployment_id_by_url(&db_client, &url) + .await? 
+ .ok_or_else(|| anyhow::anyhow!("No deployment found with URL: {}", url))?; + + handler::run_heartbeat_loop(&db_client, deployment_id) + .await + .with_context(|| { + format!("Failed to run heartbeat loop for deployment {deployment_id}") + })?; + } + }, } Ok(()) From 300a0cc10700aa1170ca5b0230d2b748b7e8c146 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 22 Dec 2025 20:23:39 +0100 Subject: [PATCH 03/25] feat: auto-cancel deployments with stale heartbeats --- ...71b960b0983671dfc584ba3706e3df3ba87a7.json | 32 ++++++++++++++++ deploy-queue/src/constants.rs | 1 + deploy-queue/src/handler/mod.rs | 37 +++++++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json diff --git a/deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json b/deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json new file mode 100644 index 0000000..e640d83 --- /dev/null +++ b/deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id, component, heartbeat_timestamp\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - INTERVAL '5 minutes'\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "component", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "heartbeat_timestamp", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + true + ] + }, + "hash": "c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7" +} diff --git a/deploy-queue/src/constants.rs b/deploy-queue/src/constants.rs index 4aaaed5..df1620d 100644 --- a/deploy-queue/src/constants.rs 
+++ b/deploy-queue/src/constants.rs @@ -5,3 +5,4 @@ pub const ACQUIRE_TIMEOUT: Duration = Duration::from_secs(10); pub const IDLE_TIMEOUT: Duration = Duration::from_secs(10); pub const BUSY_RETRY: Duration = Duration::from_secs(5); pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); +pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5 * 60); // 5 minutes diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 0d59445..e42f91c 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -35,11 +35,48 @@ pub async fn enqueue_deployment(client: &Pool, deployment: Deployment) Ok(deployment_id) } +/// Cancel deployments with stale heartbeats +async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<()> { + let stale_deployments = sqlx::query!( + r#" + SELECT id, component, heartbeat_timestamp + FROM deployments + WHERE heartbeat_timestamp IS NOT NULL + AND finish_timestamp IS NULL + AND cancellation_timestamp IS NULL + AND heartbeat_timestamp < NOW() - INTERVAL '5 minutes' + "# + ) + .fetch_all(client) + .await?; + + for deployment in stale_deployments { + log::warn!( + "Cancelling deployment {} ({}) due to stale heartbeat (last seen: {:?})", + deployment.id, + deployment.component, + deployment.heartbeat_timestamp + ); + + cancel::deployment( + client, + deployment.id, + Some("Cancelled due to stale heartbeat - deployment appears to be dead"), + ) + .await?; + } + + Ok(()) +} + pub async fn wait_for_blocking_deployments( pg_pool: &Pool, deployment_id: i64, ) -> Result<()> { loop { + // Check for and cancel any deployments with stale heartbeats + cancel_stale_heartbeat_deployments(pg_pool).await?; + let blocking_deployments = fetch::blocking_deployments(pg_pool, deployment_id).await?; if blocking_deployments.is_empty() { From c83e2554d82cc95fbd27ff0732fa7bca31f803f1 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 22 Dec 2025 22:17:03 +0100 Subject: [PATCH 04/25] 
feat(deploy-queue): Use HEARTBEAT_TIMEOUT constant in stale heartbeat query. --- ...8ed027c5a8236256921126994db1bd978c6ba7c10f79.json} | 8 +++++--- deploy-queue/src/handler/mod.rs | 11 +++++++---- 2 files changed, 12 insertions(+), 7 deletions(-) rename deploy-queue/.sqlx/{query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json => query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json} (82%) diff --git a/deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json b/deploy-queue/.sqlx/query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json similarity index 82% rename from deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json rename to deploy-queue/.sqlx/query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json index e640d83..d4a309b 100644 --- a/deploy-queue/.sqlx/query-c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7.json +++ b/deploy-queue/.sqlx/query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT id, component, heartbeat_timestamp\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - INTERVAL '5 minutes'\n ", + "query": "\n SELECT id, component, heartbeat_timestamp\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - $1::interval\n ", "describe": { "columns": [ { @@ -20,7 +20,9 @@ } ], "parameters": { - "Left": [] + "Left": [ + "Interval" + ] }, "nullable": [ false, @@ -28,5 +30,5 @@ true ] }, - "hash": "c2e88a48298f910725f2351066571b960b0983671dfc584ba3706e3df3ba87a7" + "hash": "2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79" } diff --git a/deploy-queue/src/handler/mod.rs 
b/deploy-queue/src/handler/mod.rs index e42f91c..0281bb5 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -8,7 +8,7 @@ use sqlx::{Pool, Postgres}; use time::Duration; use crate::{ - constants::{BUSY_RETRY, HEARTBEAT_INTERVAL}, + constants::{BUSY_RETRY, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}, model::Deployment, util::{duration::DurationExt, github}, }; @@ -37,6 +37,8 @@ pub async fn enqueue_deployment(client: &Pool, deployment: Deployment) /// Cancel deployments with stale heartbeats async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<()> { + let heartbeat_timeout_interval = HEARTBEAT_TIMEOUT.to_pg_interval()?; + let stale_deployments = sqlx::query!( r#" SELECT id, component, heartbeat_timestamp @@ -44,8 +46,9 @@ async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<( WHERE heartbeat_timestamp IS NOT NULL AND finish_timestamp IS NULL AND cancellation_timestamp IS NULL - AND heartbeat_timestamp < NOW() - INTERVAL '5 minutes' - "# + AND heartbeat_timestamp < NOW() - $1::interval + "#, + heartbeat_timeout_interval ) .fetch_all(client) .await?; @@ -61,7 +64,7 @@ async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<( cancel::deployment( client, deployment.id, - Some("Cancelled due to stale heartbeat - deployment appears to be dead"), + Some("Cancelled due to stale heartbeat"), ) .await?; } From 616f8d599a037f9c19860a49c501ad5f2ddb8cd6 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 22 Dec 2025 22:25:42 +0100 Subject: [PATCH 05/25] feat(deploy-queue): When cancelling stale deployments, print component + version as well. 
--- ...1fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json} | 10 ++++++++-- deploy-queue/src/handler/mod.rs | 5 +++-- 2 files changed, 11 insertions(+), 4 deletions(-) rename deploy-queue/.sqlx/{query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json => query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json} (50%) diff --git a/deploy-queue/.sqlx/query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json b/deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json similarity index 50% rename from deploy-queue/.sqlx/query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json rename to deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json index d4a309b..e93fd60 100644 --- a/deploy-queue/.sqlx/query-2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79.json +++ b/deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT id, component, heartbeat_timestamp\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - $1::interval\n ", + "query": "\n SELECT id, component, version, heartbeat_timestamp\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - $1::interval\n ", "describe": { "columns": [ { @@ -15,6 +15,11 @@ }, { "ordinal": 2, + "name": "version", + "type_info": "Varchar" + }, + { + "ordinal": 3, "name": "heartbeat_timestamp", "type_info": "Timestamptz" } @@ -27,8 +32,9 @@ "nullable": [ false, false, + true, true ] }, - "hash": "2c5dbde473464c6106558ed027c5a8236256921126994db1bd978c6ba7c10f79" + "hash": "b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22" } diff --git 
a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 0281bb5..170de4f 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -41,7 +41,7 @@ async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<( let stale_deployments = sqlx::query!( r#" - SELECT id, component, heartbeat_timestamp + SELECT id, component, version, heartbeat_timestamp FROM deployments WHERE heartbeat_timestamp IS NOT NULL AND finish_timestamp IS NULL @@ -55,9 +55,10 @@ async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<( for deployment in stale_deployments { log::warn!( - "Cancelling deployment {} ({}) due to stale heartbeat (last seen: {:?})", + "Cancelling deployment {} ({}, version={}) due to stale heartbeat (last seen: {:?})", deployment.id, deployment.component, + deployment.version.as_deref().unwrap_or("unknown"), deployment.heartbeat_timestamp ); From e973cc471447fa23781f70a07fb8bf437fb3cd50 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 30 Dec 2025 14:26:07 +0100 Subject: [PATCH 06/25] feat(deploy-queue): Moved stale heartbeat query into a separate function --- ...cd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json | 46 ++++++++++++++++++ ...d957e5d63ca0b6f25707bc6e0e275d6856d22.json | 40 ---------------- deploy-queue/src/handler/fetch.rs | 48 ++++++++++++++++++- deploy-queue/src/handler/mod.rs | 40 +++++++++------- deploy-queue/src/lib.rs | 7 +++ deploy-queue/src/model.rs | 9 ++++ 6 files changed, 131 insertions(+), 59 deletions(-) create mode 100644 deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json delete mode 100644 deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json diff --git a/deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json b/deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json new file mode 100644 index 0000000..d22fb50 --- /dev/null 
+++ b/deploy-queue/.sqlx/query-a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id,\n component,\n version,\n heartbeat_timestamp,\n NOW() - heartbeat_timestamp AS time_since_heartbeat\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - $1::interval\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "component", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "version", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "heartbeat_timestamp", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "time_since_heartbeat", + "type_info": "Interval" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + false, + false, + true, + true, + null + ] + }, + "hash": "a80eb87e4f10ac993de619ec69fcd2a78a3e36eec40e4ad6d8e31e836eb0f64c" +} diff --git a/deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json b/deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json deleted file mode 100644 index e93fd60..0000000 --- a/deploy-queue/.sqlx/query-b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT id, component, version, heartbeat_timestamp\n FROM deployments\n WHERE heartbeat_timestamp IS NOT NULL\n AND finish_timestamp IS NULL\n AND cancellation_timestamp IS NULL\n AND heartbeat_timestamp < NOW() - $1::interval\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "component", - "type_info": "Varchar" - }, - { - "ordinal": 2, - "name": "version", - "type_info": "Varchar" - }, - { - "ordinal": 3, - "name": 
"heartbeat_timestamp", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Interval" - ] - }, - "nullable": [ - false, - false, - true, - true - ] - }, - "hash": "b20a65fa55f426d50571fa2b637d957e5d63ca0b6f25707bc6e0e275d6856d22" -} diff --git a/deploy-queue/src/handler/fetch.rs b/deploy-queue/src/handler/fetch.rs index 521aee4..f8215c4 100644 --- a/deploy-queue/src/handler/fetch.rs +++ b/deploy-queue/src/handler/fetch.rs @@ -4,7 +4,7 @@ use time::Duration; use crate::{ cli::Environment, - model::{BlockingDeployment, Cell, Deployment, OutlierDeployment}, + model::{BlockingDeployment, Cell, Deployment, OutlierDeployment, StaleHeartbeatDeployment}, util::duration::DurationExt, }; @@ -132,6 +132,52 @@ pub async fn blocking_deployments( Ok(blocking_deployments) } +pub async fn stale_heartbeat_deployments( + client: &Pool, + timeout: std::time::Duration, +) -> Result> { + let interval = timeout.to_pg_interval()?; + let rows = sqlx::query!( + r#" + SELECT + id, + component, + version, + heartbeat_timestamp, + NOW() - heartbeat_timestamp AS time_since_heartbeat + FROM deployments + WHERE heartbeat_timestamp IS NOT NULL + AND finish_timestamp IS NULL + AND cancellation_timestamp IS NULL + AND heartbeat_timestamp < NOW() - $1::interval + "#, + interval + ) + .fetch_all(client) + .await?; + + let deployments = rows + .into_iter() + .map(|row| -> Result { + let time_since_heartbeat = row + .time_since_heartbeat + .context("time_since_heartbeat should not be NULL")? 
+ .to_duration() + .context("Failed to convert time_since_heartbeat")?; + + Ok(StaleHeartbeatDeployment { + id: row.id, + component: row.component, + version: row.version, + heartbeat_timestamp: row.heartbeat_timestamp, + time_since_heartbeat, + }) + }) + .collect::>>()?; + + Ok(deployments) +} + pub async fn outlier_deployments(client: &Pool) -> Result> { let rows = sqlx::query_file!("queries/active_outliers.sql") .fetch_all(client) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 170de4f..4756151 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -3,9 +3,10 @@ pub mod fetch; pub mod list; use anyhow::Result; -use log::info; +use log::{info, warn}; use sqlx::{Pool, Postgres}; use time::Duration; +use tokio::task::JoinHandle; use crate::{ constants::{BUSY_RETRY, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}, @@ -37,29 +38,19 @@ pub async fn enqueue_deployment(client: &Pool, deployment: Deployment) /// Cancel deployments with stale heartbeats async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<()> { - let heartbeat_timeout_interval = HEARTBEAT_TIMEOUT.to_pg_interval()?; - - let stale_deployments = sqlx::query!( - r#" - SELECT id, component, version, heartbeat_timestamp - FROM deployments - WHERE heartbeat_timestamp IS NOT NULL - AND finish_timestamp IS NULL - AND cancellation_timestamp IS NULL - AND heartbeat_timestamp < NOW() - $1::interval - "#, - heartbeat_timeout_interval - ) - .fetch_all(client) - .await?; + let stale_deployments = fetch::stale_heartbeat_deployments(client, HEARTBEAT_TIMEOUT).await?; for deployment in stale_deployments { log::warn!( - "Cancelling deployment {} ({}, version={}) due to stale heartbeat (last seen: {:?})", + "Cancelling deployment {} ({}, version={}) due to stale heartbeat: last seen {} ago at {}", deployment.id, deployment.component, deployment.version.as_deref().unwrap_or("unknown"), - deployment.heartbeat_timestamp + 
deployment.time_since_heartbeat.format_human(), + deployment + .heartbeat_timestamp + .map(|ts| ts.to_string()) + .unwrap_or_else(|| "unknown".to_string()) ); cancel::deployment( @@ -210,3 +201,16 @@ pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> tokio::time::sleep(HEARTBEAT_INTERVAL).await; } } + +/// Start a background heartbeat loop; returns a JoinHandle so caller can abort it +pub fn start_heartbeat_background(client: &Pool, deployment_id: i64) -> JoinHandle<()> { + let heartbeat_client = client.clone(); + tokio::spawn(async move { + if let Err(err) = run_heartbeat_loop(&heartbeat_client, deployment_id).await { + warn!( + "Heartbeat loop exited for deployment {}: {}", + deployment_id, err + ); + } + }) +} diff --git a/deploy-queue/src/lib.rs b/deploy-queue/src/lib.rs index c1447d2..3605361 100644 --- a/deploy-queue/src/lib.rs +++ b/deploy-queue/src/lib.rs @@ -27,6 +27,9 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< .await .context("Faild to enqueue deployment")?; + // Start heartbeat loop in the background so we can abort it after starting + let heartbeat_handle = handler::start_heartbeat_background(&db_client, deployment_id); + // Wait for all blocking deployments to finish handler::wait_for_blocking_deployments(&db_client, deployment_id) .await @@ -36,6 +39,10 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< handler::start_deployment(&db_client, deployment_id) .await .with_context(|| format!("Failed to start deployment {deployment_id}"))?; + + // Stop the heartbeat loop now that the deployment has started + heartbeat_handle.abort(); + let _ = heartbeat_handle.await; } cli::Mode::Finish { deployment_id } => { handler::finish_deployment(&db_client, deployment_id) diff --git a/deploy-queue/src/model.rs b/deploy-queue/src/model.rs index 1d9769d..4717fd2 100644 --- a/deploy-queue/src/model.rs +++ b/deploy-queue/src/model.rs @@ -30,6 +30,15 @@ pub struct Deployment { pub 
buffer_time: Duration, } +/// Minimal view of a deployment for stale-heartbeat checks +pub struct StaleHeartbeatDeployment { + pub id: i64, + pub component: String, + pub version: Option, + pub heartbeat_timestamp: Option, + pub time_since_heartbeat: Duration, +} + impl Deployment { /// Generate a compact summary of this deployment's information pub fn summary(&self) -> String { From c6198add4f23bbb0786aa608500f1c1bdc21ca5a Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 5 Jan 2026 10:45:54 +0100 Subject: [PATCH 07/25] feat(deploy-queue): Increase heartbeat timeout to 15 minutes --- deploy-queue/src/constants.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy-queue/src/constants.rs b/deploy-queue/src/constants.rs index df1620d..d27948f 100644 --- a/deploy-queue/src/constants.rs +++ b/deploy-queue/src/constants.rs @@ -5,4 +5,4 @@ pub const ACQUIRE_TIMEOUT: Duration = Duration::from_secs(10); pub const IDLE_TIMEOUT: Duration = Duration::from_secs(10); pub const BUSY_RETRY: Duration = Duration::from_secs(5); pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); -pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5 * 60); // 5 minutes +pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15 * 60); // 15 minutes From a80fa8512f2fc8dd9d4cc714a0c1ff6c2edd67c9 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 6 Jan 2026 10:12:37 +0100 Subject: [PATCH 08/25] feat(deploy-queue): Add retry to heartbeat update --- deploy-queue/src/constants.rs | 1 + deploy-queue/src/handler/mod.rs | 40 ++++++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/deploy-queue/src/constants.rs b/deploy-queue/src/constants.rs index d27948f..f082dd5 100644 --- a/deploy-queue/src/constants.rs +++ b/deploy-queue/src/constants.rs @@ -6,3 +6,4 @@ pub const IDLE_TIMEOUT: Duration = Duration::from_secs(10); pub const BUSY_RETRY: Duration = Duration::from_secs(5); pub const HEARTBEAT_INTERVAL: Duration = 
Duration::from_secs(30); pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15 * 60); // 15 minutes +pub const HEARTBEAT_UPDATE_TIMEOUT: Duration = Duration::from_secs(20); diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 4756151..ea85098 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -9,7 +9,7 @@ use time::Duration; use tokio::task::JoinHandle; use crate::{ - constants::{BUSY_RETRY, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT}, + constants::{BUSY_RETRY, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, HEARTBEAT_UPDATE_TIMEOUT}, model::Deployment, util::{duration::DurationExt, github}, }; @@ -196,9 +196,43 @@ pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> HEARTBEAT_INTERVAL.as_secs() ); + const HEARTBEAT_MAX_CONSECUTIVE_FAILURES: u32 = 3; + + let mut consecutive_failures: u32 = 0; + let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { - update_heartbeat(client, deployment_id).await?; - tokio::time::sleep(HEARTBEAT_INTERVAL).await; + interval.tick().await; + + let result = tokio::time::timeout( + HEARTBEAT_UPDATE_TIMEOUT, + update_heartbeat(client, deployment_id), + ) + .await; + + if let Ok(Ok(())) = result { + consecutive_failures = 0; + } else { + consecutive_failures += 1; + let reason = match result { + Ok(Err(err)) => err.to_string(), + Err(_) => format!("timed out after {:?}", HEARTBEAT_UPDATE_TIMEOUT), + _ => "unknown error".to_string(), + }; + warn!( + "Failed to send heartbeat for deployment {} (attempt {}/{}): {}", + deployment_id, consecutive_failures, HEARTBEAT_MAX_CONSECUTIVE_FAILURES, reason + ); + } + + if consecutive_failures >= HEARTBEAT_MAX_CONSECUTIVE_FAILURES { + anyhow::bail!( + "Heartbeat loop failed {} times consecutively for deployment {}", + consecutive_failures, + deployment_id + ); + } } } From 62ff26b3bfe34eea2998ee363f4f3ff7c5e0c481 Mon Sep 17 00:00:00 
2001 From: ursa gorse Date: Mon, 12 Jan 2026 21:48:29 +0100 Subject: [PATCH 09/25] feat(deploy-queue): StaleHeartbeatDeployment timestamp is mandatory --- deploy-queue/src/handler/fetch.rs | 5 ++++- deploy-queue/src/handler/mod.rs | 5 +---- deploy-queue/src/model.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/deploy-queue/src/handler/fetch.rs b/deploy-queue/src/handler/fetch.rs index f8215c4..234edbf 100644 --- a/deploy-queue/src/handler/fetch.rs +++ b/deploy-queue/src/handler/fetch.rs @@ -159,6 +159,9 @@ pub async fn stale_heartbeat_deployments( let deployments = rows .into_iter() .map(|row| -> Result { + let heartbeat_timestamp = row + .heartbeat_timestamp + .context("heartbeat_timestamp should not be NULL")?; let time_since_heartbeat = row .time_since_heartbeat .context("time_since_heartbeat should not be NULL")? @@ -169,7 +172,7 @@ pub async fn stale_heartbeat_deployments( id: row.id, component: row.component, version: row.version, - heartbeat_timestamp: row.heartbeat_timestamp, + heartbeat_timestamp, time_since_heartbeat, }) }) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index ea85098..81fda16 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -47,10 +47,7 @@ async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<( deployment.component, deployment.version.as_deref().unwrap_or("unknown"), deployment.time_since_heartbeat.format_human(), - deployment - .heartbeat_timestamp - .map(|ts| ts.to_string()) - .unwrap_or_else(|| "unknown".to_string()) + deployment.heartbeat_timestamp.to_string(), ); cancel::deployment( diff --git a/deploy-queue/src/model.rs b/deploy-queue/src/model.rs index 4717fd2..b3f7c7c 100644 --- a/deploy-queue/src/model.rs +++ b/deploy-queue/src/model.rs @@ -35,7 +35,7 @@ pub struct StaleHeartbeatDeployment { pub id: i64, pub component: String, pub version: Option, - pub heartbeat_timestamp: Option, + pub heartbeat_timestamp: OffsetDateTime, 
pub time_since_heartbeat: Duration, } From 12b77b752797fd305c4facb6a5474b52c42fa165 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 12 Jan 2026 21:53:50 +0100 Subject: [PATCH 10/25] feat(deploy-queue): Added deployment_id that triggers stale heartbeat cancellation --- deploy-queue/src/handler/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 81fda16..a4830d0 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -37,9 +37,17 @@ pub async fn enqueue_deployment(client: &Pool, deployment: Deployment) } /// Cancel deployments with stale heartbeats -async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<()> { +async fn cancel_stale_heartbeat_deployments( + client: &Pool, + canceller_deployment_id: i64, +) -> Result<()> { let stale_deployments = fetch::stale_heartbeat_deployments(client, HEARTBEAT_TIMEOUT).await?; + let cancellation_note = format!( + "Cancelled by deployment {} due to stale heartbeat", + canceller_deployment_id + ); + for deployment in stale_deployments { log::warn!( "Cancelling deployment {} ({}, version={}) due to stale heartbeat: last seen {} ago at {}", @@ -50,12 +58,7 @@ async fn cancel_stale_heartbeat_deployments(client: &Pool) -> Result<( deployment.heartbeat_timestamp.to_string(), ); - cancel::deployment( - client, - deployment.id, - Some("Cancelled due to stale heartbeat"), - ) - .await?; + cancel::deployment(client, deployment.id, Some(cancellation_note.as_str())).await?; } Ok(()) @@ -67,7 +70,7 @@ pub async fn wait_for_blocking_deployments( ) -> Result<()> { loop { // Check for and cancel any deployments with stale heartbeats - cancel_stale_heartbeat_deployments(pg_pool).await?; + cancel_stale_heartbeat_deployments(pg_pool, deployment_id).await?; let blocking_deployments = fetch::blocking_deployments(pg_pool, deployment_id).await?; From dc6e06ac67a4b6bcf5b189310f4fa52e9ce139f9 Mon 
Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 12 Jan 2026 21:59:35 +0100 Subject: [PATCH 11/25] feat(deploy-queue): Changed MissedTickBehavior for heartbeat interval --- deploy-queue/src/handler/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index a4830d0..82a48f9 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -200,7 +200,7 @@ pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> let mut consecutive_failures: u32 = 0; let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { interval.tick().await; From f48d11647c993ae816c60cf6c57d6098765006e3 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 12 Jan 2026 22:01:24 +0100 Subject: [PATCH 12/25] feat(deploy-queue): Increased number of repetition for retrying heartbeat --- deploy-queue/src/handler/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 82a48f9..3dc24e2 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -196,7 +196,7 @@ pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> HEARTBEAT_INTERVAL.as_secs() ); - const HEARTBEAT_MAX_CONSECUTIVE_FAILURES: u32 = 3; + const HEARTBEAT_MAX_CONSECUTIVE_FAILURES: u32 = 5; let mut consecutive_failures: u32 = 0; let mut interval = tokio::time::interval(HEARTBEAT_INTERVAL); From dc5f093ac152d91f9a6b187c52beb09977b0ea26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ur=C5=A1a=20Gor=C5=A1e?= <61006117+Shugenya@users.noreply.github.com> Date: Mon, 12 Jan 2026 22:02:50 +0100 Subject: [PATCH 13/25] Update deploy-queue/src/handler/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
Co-authored-by: JC Grünhage --- deploy-queue/src/handler/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 3dc24e2..920562f 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -203,7 +203,11 @@ pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { - interval.tick().await; + if consecutive_failures == 0 { + interval.tick().await; + } else { + tokio::time::sleep(std::time::Duration::from_secs(2 ** consecutive_failures)).await; + } let result = tokio::time::timeout( HEARTBEAT_UPDATE_TIMEOUT, From 846177d918b915e781f1b5bf67f2dde012276f03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ur=C5=A1a=20Gor=C5=A1e?= <61006117+Shugenya@users.noreply.github.com> Date: Mon, 12 Jan 2026 22:03:11 +0100 Subject: [PATCH 14/25] Update deploy-queue/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: JC Grünhage --- deploy-queue/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy-queue/src/lib.rs b/deploy-queue/src/lib.rs index 3605361..e03c231 100644 --- a/deploy-queue/src/lib.rs +++ b/deploy-queue/src/lib.rs @@ -116,7 +116,7 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< cli::HeartbeatTarget::Url { url } => { let deployment_id = handler::fetch::deployment_id_by_url(&db_client, &url) .await? 
- .ok_or_else(|| anyhow::anyhow!("No deployment found with URL: {}", url))?; + .with_context(|| format!("No deployment found with URL: {}", url))?; handler::run_heartbeat_loop(&db_client, deployment_id) .await From 25043dda792c1ac8a0bcfdd944324969810e6997 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Mon, 12 Jan 2026 22:14:42 +0100 Subject: [PATCH 15/25] feat(deploy-queue): Removed manual abort --- deploy-queue/src/handler/mod.rs | 5 ++++- deploy-queue/src/lib.rs | 4 ---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 920562f..f4d28df 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -206,7 +206,10 @@ pub async fn run_heartbeat_loop(client: &Pool, deployment_id: i64) -> if consecutive_failures == 0 { interval.tick().await; } else { - tokio::time::sleep(std::time::Duration::from_secs(2 ** consecutive_failures)).await; + tokio::time::sleep(std::time::Duration::from_secs( + 2 * consecutive_failures as u64, + )) + .await; } let result = tokio::time::timeout( diff --git a/deploy-queue/src/lib.rs b/deploy-queue/src/lib.rs index e03c231..c1d3cbf 100644 --- a/deploy-queue/src/lib.rs +++ b/deploy-queue/src/lib.rs @@ -39,10 +39,6 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< handler::start_deployment(&db_client, deployment_id) .await .with_context(|| format!("Failed to start deployment {deployment_id}"))?; - - // Stop the heartbeat loop now that the deployment has started - heartbeat_handle.abort(); - let _ = heartbeat_handle.await; } cli::Mode::Finish { deployment_id } => { handler::finish_deployment(&db_client, deployment_id) From ffd1ec2b254bdbd90d726f0507ef39943d73eec1 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 13 Jan 2026 09:53:32 +0100 Subject: [PATCH 16/25] feat(deploy-queue): Cancel deployment manually if heartbeat loop exited due to error --- deploy-queue/src/handler/mod.rs | 17 +++++++++++++++++ 
1 file changed, 17 insertions(+) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index f4d28df..28317f1 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -252,6 +252,23 @@ pub fn start_heartbeat_background(client: &Pool, deployment_id: i64) - "Heartbeat loop exited for deployment {}: {}", deployment_id, err ); + + // If the heartbeat loop stops due to repeated failures, cancel the deployment + if let Err(cancel_err) = cancel::deployment( + &heartbeat_client, + deployment_id, + Some(format!( + "Deployment {} cancelled by heartbeat loop after repeated heartbeat failures", + deployment_id, + )), + ) + .await + { + warn!( + "Failed to cancel deployment {} after heartbeat loop exit: {}", + deployment_id, cancel_err + ); + } } }) } From 668ea340c647779d1ffb36b6a18af12d3e4ede64 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 13 Jan 2026 11:33:18 +0100 Subject: [PATCH 17/25] feat(deploy-queue): Fixed clippy errors --- deploy-queue/src/handler/mod.rs | 2 +- deploy-queue/src/lib.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/deploy-queue/src/handler/mod.rs b/deploy-queue/src/handler/mod.rs index 28317f1..1b27546 100644 --- a/deploy-queue/src/handler/mod.rs +++ b/deploy-queue/src/handler/mod.rs @@ -55,7 +55,7 @@ async fn cancel_stale_heartbeat_deployments( deployment.component, deployment.version.as_deref().unwrap_or("unknown"), deployment.time_since_heartbeat.format_human(), - deployment.heartbeat_timestamp.to_string(), + deployment.heartbeat_timestamp, ); cancel::deployment(client, deployment.id, Some(cancellation_note.as_str())).await?; diff --git a/deploy-queue/src/lib.rs b/deploy-queue/src/lib.rs index c1d3cbf..4ddfd05 100644 --- a/deploy-queue/src/lib.rs +++ b/deploy-queue/src/lib.rs @@ -27,8 +27,7 @@ pub async fn run_deploy_queue(mode: cli::Mode, skip_migrations: bool) -> Result< .await .context("Faild to enqueue deployment")?; - // Start heartbeat loop in the background 
so we can abort it after starting - let heartbeat_handle = handler::start_heartbeat_background(&db_client, deployment_id); + handler::start_heartbeat_background(&db_client, deployment_id); // Wait for all blocking deployments to finish handler::wait_for_blocking_deployments(&db_client, deployment_id) From f806a1c6e6b9fe957c1feda52ee2ed8b33323ea9 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Wed, 14 Jan 2026 14:39:59 +0100 Subject: [PATCH 18/25] feat(deploy-queue): Updated readme with heartbeat --- deploy-queue/README.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/deploy-queue/README.md b/deploy-queue/README.md index 0012838..a765f38 100644 --- a/deploy-queue/README.md +++ b/deploy-queue/README.md @@ -117,6 +117,9 @@ deploy-queue start --environment prod --provider aws --region us-west-2 --cell-i - Writes `deployment-id=` to `$GITHUB_OUTPUT` if running in GitHub Actions - Blocks until all conflicting deployments complete - Starts the deployment automatically when ready +- Runs a background heartbeat loop that: + - Sends periodic heartbeats during the wait + - Cancels deployments with stale heartbeats (if they block current deployment) ### 2. Finish a Deployment @@ -256,6 +259,14 @@ Buffer times are configured in the database: These values are set in the initial migration and can be adjusted in the `environments` table. +### Heartbeats + +The system supports heartbeats to detect stuck deployments: + +- **Background heartbeats during `start`**: While waiting for blocking deployments, a background task updates the deployment's `heartbeat_timestamp`. If it fails to send heartbeats repeatedly, it cancels the deployment with a note. It also cancels other deployments with stale heartbeats that are blocking your deployment. +- **Manual heartbeats**: `deploy-queue heartbeat deployment --deployment-id ` (or `heartbeat url --url `) runs a foreground loop that sends heartbeats until stopped. 
+- **Stale heartbeat detection**: deployments with a heartbeat older than the configured timeout (currently set to 15 minutes) are cancelled automatically when the heartbeat loop runs. + ## Database Schema The system uses two main tables: @@ -306,6 +317,16 @@ on: jobs: deploy: runs-on: ubuntu-latest + services: + heartbeat: + image: ghcr.io/neondatabase/deploy-queue:latest + env: + DEPLOY_QUEUE_DATABASE_URL: ${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} + # Run manual heartbeat loop using the GitHub URL to look up the deployment + options: >- + --entrypoint /bin/sh + command: >- + -c "deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} || true" steps: - name: Start deployment id: deploy-queue-start @@ -365,6 +386,15 @@ jobs: deploy: runs-on: ubuntu-latest steps: + - name: Start heartbeat (only when queue enabled) + if: ${{ vars.DEPLOY_QUEUE_ENABLED == 'true' }} + run: | + docker run -d --rm \ + --name deploy-queue-heartbeat \ + -e DEPLOY_QUEUE_DATABASE_URL=${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} \ + ghcr.io/neondatabase/deploy-queue:latest \ + deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + - name: Start deployment (with queue) id: deploy-queue-start if: ${{ vars.DEPLOY_QUEUE_ENABLED == 'true' }} From 5707cfdfb00785cab1e41f364c698ace61508867 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Wed, 14 Jan 2026 16:22:04 +0100 Subject: [PATCH 19/25] feat(deploy-queue): Updated readme heartbeat for break-glass --- deploy-queue/README.md | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/deploy-queue/README.md b/deploy-queue/README.md index a765f38..b4df4a1 100644 --- a/deploy-queue/README.md +++ b/deploy-queue/README.md @@ -385,16 +385,17 @@ For emergency situations where you need to bypass the deploy queue entirely, use jobs: deploy: runs-on: ubuntu-latest + services: + heartbeat: + # Only start the 
heartbeat when DEPLOY_QUEUE_ENABLED == 'true' + image: ${{ vars.DEPLOY_QUEUE_ENABLED == 'true' && 'ghcr.io/neondatabase/deploy-queue:latest' || '' }} + env: + DEPLOY_QUEUE_DATABASE_URL: ${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} + entrypoint: ["/bin/sh", "-c"] + command: + - > + deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} || true steps: - - name: Start heartbeat (only when queue enabled) - if: ${{ vars.DEPLOY_QUEUE_ENABLED == 'true' }} - run: | - docker run -d --rm \ - --name deploy-queue-heartbeat \ - -e DEPLOY_QUEUE_DATABASE_URL=${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} \ - ghcr.io/neondatabase/deploy-queue:latest \ - deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} - - name: Start deployment (with queue) id: deploy-queue-start if: ${{ vars.DEPLOY_QUEUE_ENABLED == 'true' }} From b8f4ac30e374a549bf0fb61b9aafecff9bee2a02 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Wed, 14 Jan 2026 16:30:10 +0100 Subject: [PATCH 20/25] feat(deploy-queue): Updated readme heartbeat example --- deploy-queue/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deploy-queue/README.md b/deploy-queue/README.md index b4df4a1..8fe1638 100644 --- a/deploy-queue/README.md +++ b/deploy-queue/README.md @@ -323,10 +323,10 @@ jobs: env: DEPLOY_QUEUE_DATABASE_URL: ${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} # Run manual heartbeat loop using the GitHub URL to look up the deployment - options: >- - --entrypoint /bin/sh - command: >- - -c "deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} || true" + entrypoint: ["/bin/sh", "-c"] + command: + - > + deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} || true steps: - name: Start deployment id: deploy-queue-start From 6e057f85b4674f77698a322a4f382c3051804c12 
Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Wed, 14 Jan 2026 23:10:23 +0100 Subject: [PATCH 21/25] feat(deploy-queue): Integration tests for heartbeat --- deploy-queue/tests/heartbeat_tests.rs | 126 ++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 deploy-queue/tests/heartbeat_tests.rs diff --git a/deploy-queue/tests/heartbeat_tests.rs b/deploy-queue/tests/heartbeat_tests.rs new file mode 100644 index 0000000..839be04 --- /dev/null +++ b/deploy-queue/tests/heartbeat_tests.rs @@ -0,0 +1,126 @@ +use anyhow::Result; +use deploy_queue::{constants::HEARTBEAT_TIMEOUT, handler}; +use time::{Duration as TimeDuration, OffsetDateTime}; + +#[path = "common/test_db_setup.rs"] +mod database_helpers; + +#[path = "fixtures/deployment.rs"] +mod deployment_fixtures; + +extern crate deploy_queue; + +#[tokio::test] +async fn heartbeat_loop_sets_timestamp() -> Result<()> { + let pool = database_helpers::setup_test_db().await?; + let deployment_id = deployment_fixtures::create_test_deployment(&pool).await?; + + // Run the heartbeat and wait a few milliseconds (so it can write the timestamp) + let heartbeat_pool = pool.clone(); + let handle = tokio::spawn(async move { + handler::run_heartbeat_loop(&heartbeat_pool, deployment_id) + .await + .ok(); + }); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Check that the heartbeat timestamp was set + let (heartbeat_timestamp,): (Option,) = + sqlx::query_as("SELECT heartbeat_timestamp FROM deployments WHERE id = $1") + .bind(deployment_id) + .fetch_one(&pool) + .await?; + + assert!( + heartbeat_timestamp.is_some(), + "Heartbeat loop should set heartbeat_timestamp" + ); + + // Stop the heartbeat loop + handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn stale_heartbeat_detection_flags_old_running_deployments() -> Result<()> { + let pool = database_helpers::setup_test_db().await?; + let deployment_id = deployment_fixtures::create_running_deployment(&pool).await?; + + // Set 
heartbeat older than the timeout + let stale_at = + OffsetDateTime::now_utc() - TimeDuration::seconds(HEARTBEAT_TIMEOUT.as_secs() as i64 + 60); + sqlx::query("UPDATE deployments SET heartbeat_timestamp = $1 WHERE id = $2") + .bind(stale_at) + .bind(deployment_id) + .execute(&pool) + .await?; + + // Should be returned as stale + let stale = handler::fetch::stale_heartbeat_deployments(&pool, HEARTBEAT_TIMEOUT).await?; + assert!( + stale.iter().any(|d| d.id == deployment_id), + "Deployment with stale heartbeat should be flagged" + ); + + // Make the heartbeat fresh and ensure it is no longer reported + let fresh_at = + OffsetDateTime::now_utc() - TimeDuration::seconds(HEARTBEAT_TIMEOUT.as_secs() as i64 - 60); + sqlx::query("UPDATE deployments SET heartbeat_timestamp = $1 WHERE id = $2") + .bind(fresh_at) + .bind(deployment_id) + .execute(&pool) + .await?; + + let stale_again = handler::fetch::stale_heartbeat_deployments(&pool, HEARTBEAT_TIMEOUT).await?; + assert!( + !stale_again.iter().any(|d| d.id == deployment_id), + "Deployment with fresh heartbeat should not be flagged" + ); + + Ok(()) +} + +#[tokio::test] +async fn stale_blocker_gets_cancelled_when_waiting_for_blockers() -> Result<()> { + let pool = database_helpers::setup_test_db().await?; + + // Create a running deployment with a stale heartbeat that will block others + let blocking = deployment_fixtures::create_running_deployment(&pool).await?; + let stale_at = + OffsetDateTime::now_utc() - TimeDuration::seconds(HEARTBEAT_TIMEOUT.as_secs() as i64 + 60); + sqlx::query( + "UPDATE deployments + SET heartbeat_timestamp = $1 + WHERE id = $2", + ) + .bind(stale_at) + .bind(blocking) + .execute(&pool) + .await?; + + // Create a new deployment and check for blocking deployments + let waiter = deployment_fixtures::create_test_deployment(&pool).await?; + handler::wait_for_blocking_deployments(&pool, waiter).await?; + + // Verify the blocking deployment was cancelled with the expected note + let (cancellation_timestamp, 
cancellation_note): (Option, Option) = + sqlx::query_as( + "SELECT cancellation_timestamp, cancellation_note FROM deployments WHERE id = $1", + ) + .bind(blocking) + .fetch_one(&pool) + .await?; + + assert!( + cancellation_timestamp.is_some(), + "Blocking deployment should be cancelled" + ); + let note = cancellation_note.expect("cancellation_note should be set"); + assert!( + note.contains(&format!("Cancelled by deployment {}", waiter)), + "Cancellation note should mention the cancelling deployment id; got {note}" + ); + + Ok(()) +} From 9fff2b327a601f4f41e9ab6359da872c35f251e2 Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 20 Jan 2026 09:38:03 +0100 Subject: [PATCH 22/25] feat(deploy-queue): Fixed compatibility test --- .github/workflows/deploy-queue.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/deploy-queue.yml b/.github/workflows/deploy-queue.yml index 9d055e9..f85814d 100644 --- a/.github/workflows/deploy-queue.yml +++ b/.github/workflows/deploy-queue.yml @@ -760,22 +760,22 @@ jobs: mode: finish deployment-id: ${{ steps.start-v0-5-2.outputs.deployment-id }} - # Test v0.6.0 with the updated schema - - name: Start deployment (v0.6.0) + # Test v0.6.1 with the updated schema + - name: Start deployment (v0.6.1) id: start-v0-6-0 - uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.0 + uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.1 with: mode: start environment: dev cloud-provider: aws - region: compat-v0-6-0 + region: compat-v0-6-1 cell-index: 1 component: compat-test - version: v0.6.0 - note: Compatibility test - v0.6.0 + version: v0.6.1 + note: Compatibility test - v0.6.1 - - name: Finish deployment (v0.6.0) - uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.0 + - name: Finish deployment (v0.6.1) + uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.1 with: mode: finish deployment-id: ${{ steps.start-v0-6-0.outputs.deployment-id }} From 
ea2466985577159fb912f84fc9cf5f687520c17c Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 20 Jan 2026 09:38:32 +0100 Subject: [PATCH 23/25] chore(deploy-queue): version bump --- deploy-queue/Cargo.lock | 2 +- deploy-queue/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/deploy-queue/Cargo.lock b/deploy-queue/Cargo.lock index 123287c..a7c4e19 100644 --- a/deploy-queue/Cargo.lock +++ b/deploy-queue/Cargo.lock @@ -342,7 +342,7 @@ dependencies = [ [[package]] name = "deploy-queue" -version = "0.7.2" +version = "0.8.0" dependencies = [ "anyhow", "backon", diff --git a/deploy-queue/Cargo.toml b/deploy-queue/Cargo.toml index 1d36c2d..1112ec9 100644 --- a/deploy-queue/Cargo.toml +++ b/deploy-queue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deploy-queue" -version = "0.7.2" +version = "0.8.0" edition = "2024" [dependencies] From e6ca95923d154ff8b7dc0bb0379eb21337e4abcd Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 20 Jan 2026 09:50:35 +0100 Subject: [PATCH 24/25] chore(deploy-queue): fix compatibility test --- .github/workflows/deploy-queue.yml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/deploy-queue.yml b/.github/workflows/deploy-queue.yml index f85814d..f704a10 100644 --- a/.github/workflows/deploy-queue.yml +++ b/.github/workflows/deploy-queue.yml @@ -760,22 +760,22 @@ jobs: mode: finish deployment-id: ${{ steps.start-v0-5-2.outputs.deployment-id }} - # Test v0.6.1 with the updated schema - - name: Start deployment (v0.6.1) - id: start-v0-6-0 - uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.1 + # Test v0.7.0 with the updated schema + - name: Start deployment (v0.7.0) + id: start-v0-7-0 + uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.7.0 with: mode: start environment: dev cloud-provider: aws - region: compat-v0-6-1 + region: compat-v0-7-0 cell-index: 1 component: compat-test - version: v0.6.1 - note: Compatibility test - v0.6.1 + version: 
v0.7.0 + note: Compatibility test - v0.7.0 - - name: Finish deployment (v0.6.1) - uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.6.1 + - name: Finish deployment (v0.7.0) + uses: neondatabase/dev-actions/deploy-queue@deploy-queue-v0.7.0 with: mode: finish - deployment-id: ${{ steps.start-v0-6-0.outputs.deployment-id }} + deployment-id: ${{ steps.start-v0-7-0.outputs.deployment-id }} From b84172139799ae549cdab07479e1b9a0d024738f Mon Sep 17 00:00:00 2001 From: ursa gorse Date: Tue, 20 Jan 2026 15:36:25 +0100 Subject: [PATCH 25/25] chore(deploy-queue): updated readme - removed unclear part for heartbeat --- deploy-queue/README.md | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/deploy-queue/README.md b/deploy-queue/README.md index 8fe1638..9c766dc 100644 --- a/deploy-queue/README.md +++ b/deploy-queue/README.md @@ -317,16 +317,6 @@ on: jobs: deploy: runs-on: ubuntu-latest - services: - heartbeat: - image: ghcr.io/neondatabase/deploy-queue:latest - env: - DEPLOY_QUEUE_DATABASE_URL: ${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} - # Run manual heartbeat loop using the GitHub URL to look up the deployment - entrypoint: ["/bin/sh", "-c"] - command: - - > - deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} || true steps: - name: Start deployment id: deploy-queue-start @@ -385,16 +375,6 @@ For emergency situations where you need to bypass the deploy queue entirely, use jobs: deploy: runs-on: ubuntu-latest - services: - heartbeat: - # Only start the heartbeat when DEPLOY_QUEUE_ENABLED == 'true' - image: ${{ vars.DEPLOY_QUEUE_ENABLED == 'true' && 'ghcr.io/neondatabase/deploy-queue:latest' || '' }} - env: - DEPLOY_QUEUE_DATABASE_URL: ${{ secrets.DEPLOY_QUEUE_DATABASE_URL }} - entrypoint: ["/bin/sh", "-c"] - command: - - > - deploy-queue heartbeat url --url ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} || true steps: - name: Start deployment (with 
queue) id: deploy-queue-start