From 2389601a0dcef05b1c255221b6ceb5e5054f8051 Mon Sep 17 00:00:00 2001
From: Copilot <223556219+Copilot@users.noreply.github.com>
Date: Mon, 2 Mar 2026 23:41:38 +0000
Subject: [PATCH 1/3] Fix WhenAllTask crash when children complete after fail-fast

WhenAllTask.onChildCompleted() had two bugs:

1. Threw 'Task is already completed' when a child completed after the
   WhenAllTask had already failed via fail-fast. This crashed
   orchestrations when multiple activities were in a WhenAll and one
   failed while others completed in the same or subsequent event batch.

2. Fell through from the fail-fast block to the result-collection block
   when the failing task was the last child to complete, causing
   getResult() to throw on the failed task.

Fixes:
- Change throw to return in onChildCompleted when already complete
- Add return after fail-fast to prevent fall-through to getResult()
- Add _isComplete guard in RuntimeOrchestrationContext.resume() to
  prevent attempting to resume a finished generator

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
Reviewer note: line breaks and indentation in this series were rebuilt
from a whitespace-collapsed copy (2-space indent assumed). `Task[]` and
`task: Task` may also have lost a type argument in that copy; `Task` is
declared outside this patch, so they are left untouched here. Confirm
against the real sources before applying.

 .../durabletask-js/src/task/when-all-task.ts  |   4 +-
 .../worker/runtime-orchestration-context.ts   |   5 +
 .../test/orchestration_executor.spec.ts       | 132 ++++++++++++++++++
 3 files changed, 140 insertions(+), 1 deletion(-)

diff --git a/packages/durabletask-js/src/task/when-all-task.ts b/packages/durabletask-js/src/task/when-all-task.ts
index 8677c20..e0646d5 100644
--- a/packages/durabletask-js/src/task/when-all-task.ts
+++ b/packages/durabletask-js/src/task/when-all-task.ts
@@ -27,7 +27,8 @@ export class WhenAllTask extends CompositeTask {
 
   onChildCompleted(task: Task): void {
     if (this._isComplete) {
-      throw new Error("Task is already completed");
+      // Already completed (fail-fast or all children done). Ignore subsequent child completions.
+      return;
     }
 
     this._completedTasks++;
@@ -35,6 +36,7 @@ export class WhenAllTask extends CompositeTask {
     if (task.isFailed && !this._exception) {
       this._exception = task.getException();
       this._isComplete = true;
+      return;
     }
 
     if (this._completedTasks == this._tasks.length) {
diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts
index c01e19d..a23b462 100644
--- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts
+++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts
@@ -130,6 +130,11 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
   }
 
   async resume() {
+    // Don't resume if the orchestration is already complete
+    if (this._isComplete) {
+      return;
+    }
+
     // This is never expected unless maybe there's an issue with the history
     if (!this._generator) {
       throw new Error("The orchestrator generator is not initialized! Was the orchestration history corrupted?");
diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts
index 3079af4..7b2795e 100644
--- a/packages/durabletask-js/test/orchestration_executor.spec.ts
+++ b/packages/durabletask-js/test/orchestration_executor.spec.ts
@@ -1179,6 +1179,138 @@ describe("Orchestration Executor", () => {
   it("should throw when whenAny is called with an empty task array", () => {
     expect(() => whenAny([])).toThrow("whenAny requires at least one task");
   });
+
+  it("should fail whenAll correctly when the failing task is the last to complete", async () => {
+    const printInt = (_: any, value: number) => {
+      return value.toString();
+    };
+
+    const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
+      const tasks: Task[] = [];
+
+      for (let i = 0; i < 3; i++) {
+        tasks.push(ctx.callActivity(printInt, i));
+      }
+
+      const results = yield whenAll(tasks);
+      return results;
+    };
+
+    const registry = new Registry();
+    const orchestratorName = registry.addOrchestrator(orchestrator);
+    const activityName = registry.addActivity(printInt);
+
+    const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];
+
+    for (let i = 0; i < 3; i++) {
+      oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
+    }
+
+    // First two tasks succeed, last task fails
+    const ex = new Error("Last task failed");
+    const newEvents: any[] = [
+      newTaskCompletedEvent(1, printInt(null, 0)),
+      newTaskCompletedEvent(2, printInt(null, 1)),
+      newTaskFailedEvent(3, ex),
+    ];
+
+    const executor = new OrchestrationExecutor(registry, testLogger);
+    const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
+
+    const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
+    expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
+    expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError");
+    expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message);
+  });
+
+  it("should not crash when additional tasks complete after whenAll fails fast", async () => {
+    const printInt = (_: any, value: number) => {
+      return value.toString();
+    };
+
+    const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
+      const tasks: Task[] = [];
+
+      for (let i = 0; i < 3; i++) {
+        tasks.push(ctx.callActivity(printInt, i));
+      }
+
+      const results = yield whenAll(tasks);
+      return results;
+    };
+
+    const registry = new Registry();
+    const orchestratorName = registry.addOrchestrator(orchestrator);
+    const activityName = registry.addActivity(printInt);
+
+    const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];
+
+    for (let i = 0; i < 3; i++) {
+      oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
+    }
+
+    // First task fails, then remaining tasks complete in the same batch
+    const ex = new Error("First task failed");
+    const newEvents: any[] = [
+      newTaskFailedEvent(1, ex),
+      newTaskCompletedEvent(2, printInt(null, 1)),
+      newTaskCompletedEvent(3, printInt(null, 2)),
+    ];
+
+    const executor = new OrchestrationExecutor(registry, testLogger);
+    const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
+
+    const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
+    expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
+    expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError");
+    expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message);
+  });
+
+  it("should preserve orchestration result when whenAll failure is caught and other tasks complete", async () => {
+    const printInt = (_: any, value: number) => {
+      return value.toString();
+    };
+
+    const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
+      const tasks: Task[] = [];
+
+      for (let i = 0; i < 3; i++) {
+        tasks.push(ctx.callActivity(printInt, i));
+      }
+
+      try {
+        yield whenAll(tasks);
+      } catch {
+        // Intentionally catch the failure and return a fallback result
+        return "handled";
+      }
+    };
+
+    const registry = new Registry();
+    const orchestratorName = registry.addOrchestrator(orchestrator);
+    const activityName = registry.addActivity(printInt);
+
+    const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];
+
+    for (let i = 0; i < 3; i++) {
+      oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
+    }
+
+    // First task fails, then remaining tasks complete in the same batch
+    const ex = new Error("One task failed");
+    const newEvents: any[] = [
+      newTaskFailedEvent(1, ex),
+      newTaskCompletedEvent(2, printInt(null, 1)),
+      newTaskCompletedEvent(3, printInt(null, 2)),
+    ];
+
+    const executor = new OrchestrationExecutor(registry, testLogger);
+    const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
+
+    const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
+    expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
+    expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify("handled"));
+  });
 });
 
 function getAndValidateSingleCompleteOrchestrationAction(

From 0e3daee9a87883da170b97e7b346575767506558 Mon Sep 17 00:00:00 2001
From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com>
Date: Wed, 4 Mar 2026 10:26:04 -0800
Subject: [PATCH 2/3] Add e2e tests for whenAll fail-fast handling

Cover two whenAll fail-fast scenarios end to end:
- the failure is caught, the orchestrator returns a fallback value, and
  the orchestration is still COMPLETED when its state is read again
  after a further delay
- the failure is not caught, and the reported failure details carry the
  activity's own error message rather than "Task is already completed"
---
Reviewer notes:
- Return annotations read `Promise<void>`; a bare `Promise` does not
  type-check because Promise<T> requires a type argument.
- The slowActivityCounter assertion now runs after the 2 s settle delay,
  so it no longer depends on both slow activities having started by the
  time the orchestration is reported complete. The hunk length is
  unchanged (one line moved).

 test/e2e-azuremanaged/orchestration.spec.ts | 80 +++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts
index 40d7cc3..ca62bb2 100644
--- a/test/e2e-azuremanaged/orchestration.spec.ts
+++ b/test/e2e-azuremanaged/orchestration.spec.ts
@@ -190,6 +190,86 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => {
     expect(activityCounter).toEqual(10);
   }, 31000);
 
+  it("should remain completed when whenAll fail-fast is caught and other children complete later", async () => {
+    let failActivityCounter = 0;
+    let slowActivityCounter = 0;
+
+    const fastFail = async (_: ActivityContext): Promise<void> => {
+      failActivityCounter++;
+      throw new Error("fast failure for whenAll fail-fast test");
+    };
+
+    const slowSuccess = async (_: ActivityContext, _input: string): Promise<void> => {
+      slowActivityCounter++;
+      await new Promise((resolve) => setTimeout(resolve, 1200));
+    };
+
+    const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
+      try {
+        yield whenAll([
+          ctx.callActivity(fastFail),
+          ctx.callActivity(slowSuccess, "a"),
+          ctx.callActivity(slowSuccess, "b"),
+        ]);
+      } catch {
+        return "handled-failure";
+      }
+    };
+
+    taskHubWorker.addActivity(fastFail);
+    taskHubWorker.addActivity(slowSuccess);
+    taskHubWorker.addOrchestrator(orchestrator);
+    await taskHubWorker.start();
+
+    const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
+    const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
+
+    expect(state).toBeDefined();
+    expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
+    expect(state?.failureDetails).toBeUndefined();
+    expect(state?.serializedOutput).toEqual(JSON.stringify("handled-failure"));
+    expect(failActivityCounter).toEqual(1);
+
+    await new Promise((resolve) => setTimeout(resolve, 2000));
+    expect(slowActivityCounter).toEqual(2);
+    const finalState = await taskHubClient.getOrchestrationState(id);
+    expect(finalState).toBeDefined();
+    expect(finalState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
+    expect(finalState?.serializedOutput).toEqual(JSON.stringify("handled-failure"));
+  }, 31000);
+
+  it("should preserve original whenAll failure details when not caught", async () => {
+    const fastFail = async (_: ActivityContext): Promise<void> => {
+      throw new Error("fast failure for whenAll uncaught test");
+    };
+
+    const slowSuccess = async (_: ActivityContext): Promise<void> => {
+      await new Promise((resolve) => setTimeout(resolve, 1200));
+    };
+
+    const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
+      yield whenAll([
+        ctx.callActivity(fastFail),
+        ctx.callActivity(slowSuccess),
+        ctx.callActivity(slowSuccess),
+      ]);
+    };
+
+    taskHubWorker.addActivity(fastFail);
+    taskHubWorker.addActivity(slowSuccess);
+    taskHubWorker.addOrchestrator(orchestrator);
+    await taskHubWorker.start();
+
+    const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
+    const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
+
+    expect(state).toBeDefined();
+    expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
+    expect(state?.failureDetails).toBeDefined();
+    expect(state?.failureDetails?.message).toContain("fast failure for whenAll uncaught test");
+    expect(state?.failureDetails?.message).not.toContain("Task is already completed");
+  }, 31000);
+
   it("should be able to use the sub-orchestration", async () => {
     let activityCounter = 0;
 

From ca31291dc3f66f67744b516671a01c314e8841e4 Mon Sep 17 00:00:00 2001
From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com>
Date: Wed, 4 Mar 2026 11:55:43 -0800
Subject: [PATCH 3/3] Add test for whenAll behavior with failing task as last to complete

---
Reviewer note: return annotations read `Promise<void>`; a bare `Promise`
does not type-check because Promise<T> requires a type argument.

 test/e2e-azuremanaged/orchestration.spec.ts | 32 +++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts
index ca62bb2..47792a2 100644
--- a/test/e2e-azuremanaged/orchestration.spec.ts
+++ b/test/e2e-azuremanaged/orchestration.spec.ts
@@ -270,6 +270,38 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => {
     expect(state?.failureDetails?.message).not.toContain("Task is already completed");
   }, 31000);
 
+  it("should fail whenAll correctly when the failing task is the last to complete", async () => {
+    const fastSuccess = async (_: ActivityContext): Promise<void> => {
+      // completes immediately
+    };
+
+    const slowFail = async (_: ActivityContext): Promise<void> => {
+      await new Promise((resolve) => setTimeout(resolve, 1200));
+      throw new Error("slow failure as last task");
+    };
+
+    const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
+      yield whenAll([
+        ctx.callActivity(fastSuccess),
+        ctx.callActivity(fastSuccess),
+        ctx.callActivity(slowFail),
+      ]);
+    };
+
+    taskHubWorker.addActivity(fastSuccess);
+    taskHubWorker.addActivity(slowFail);
+    taskHubWorker.addOrchestrator(orchestrator);
+    await taskHubWorker.start();
+
+    const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
+    const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
+
+    expect(state).toBeDefined();
+    expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
+    expect(state?.failureDetails).toBeDefined();
+    expect(state?.failureDetails?.message).toContain("slow failure as last task");
+  }, 31000);
+
   it("should be able to use the sub-orchestration", async () => {
     let activityCounter = 0;
 