From e314c95a5bd35bf3158daeefa444bce7f237d56c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 8 Aug 2025 16:51:59 +0100 Subject: [PATCH 1/3] Only start an attempt if not finished. Send message to worker if pending executing --- .../src/engine/systems/runAttemptSystem.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index d2ef1b7592a..e8c78b21746 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -41,7 +41,12 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js"; import { sendNotificationToWorker } from "../eventBus.js"; import { getMachinePreset, machinePresetFromName } from "../machinePresets.js"; import { retryOutcomeFromCompletion } from "../retrying.js"; -import { isExecuting, isInitialState } from "../statuses.js"; +import { + isExecuting, + isFinishedOrPendingFinished, + isInitialState, + isPendingExecuting, +} from "../statuses.js"; import { RunEngineOptions } from "../types.js"; import { BatchSystem } from "./batchSystem.js"; import { DelayedRunSystem } from "./delayedRunSystem.js"; @@ -345,8 +350,8 @@ export class RunAttemptSystem { span.setAttribute("taskRunId", taskRun.id); span.setAttribute("taskRunFriendlyId", taskRun.friendlyId); - if (taskRun.status === "CANCELED") { - throw new ServiceValidationError("Task run is cancelled", 400); + if (isFinishedOrPendingFinished(latestSnapshot.executionStatus)) { + throw new ServiceValidationError("Task run is already finished", 400); } if (!taskRun.lockedById) { @@ -1321,7 +1326,10 @@ export class RunAttemptSystem { }); //if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status - if (isExecuting(latestSnapshot.executionStatus)) { + if ( + isExecuting(latestSnapshot.executionStatus) || + isPendingExecuting(latestSnapshot.executionStatus) + ) { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run, snapshot: { From 86ba734f596d1ad939bf2fd9e9152b3939a79bde Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 11 Aug 2025 11:26:54 +0100 Subject: [PATCH 2/3] Fix the exit process reason tet --- packages/cli-v3/src/entryPoints/managed/execution.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index be118b6979a..dbf94488077 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -300,7 +300,7 @@ export class RunExecution { return; } - await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "re-queued" }); + await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "already-finished" }); return; } case "QUEUED_EXECUTING": From 9664de197cd5624860377c31899871bf5c888f86 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 11 Aug 2025 11:37:45 +0100 Subject: [PATCH 3/3] Fixed cancelling test since bug fix The old behaviour was wrong for pending executing in the test --- .../run-engine/src/engine/tests/cancelling.test.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/cancelling.test.ts b/internal-packages/run-engine/src/engine/tests/cancelling.test.ts index 5fb4fc345e4..1e5947cfbc1 100644 --- a/internal-packages/run-engine/src/engine/tests/cancelling.test.ts +++ b/internal-packages/run-engine/src/engine/tests/cancelling.test.ts @@ -150,7 +150,7 @@ describe("RunEngine cancelling", () => { cancelledEventData.push(result); }); - //todo call completeAttempt (this will happen from the worker) + // call completeAttempt manually (this will happen from the worker) const completeResult = await engine.completeRunAttempt({ runId: parentRun.id, snapshotId: executionData!.snapshot.id, @@ -289,13 +289,6 @@ describe("RunEngine cancelling", () => { prisma ); - //dequeue the run - await setTimeout(500); - const dequeued = await engine.dequeueFromWorkerQueue({ - consumerId: "test_12345", - workerQueue: "main", - }); - let cancelledEventData: EventBusEventArgs<"runCancelled">[0][] = []; engine.eventBus.on("runCancelled", (result) => { cancelledEventData.push(result);