Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/durabletask-js/src/task/when-all-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ export class WhenAllTask<T> extends CompositeTask<T[]> {

onChildCompleted(task: Task<any>): void {
if (this._isComplete) {
throw new Error("Task is already completed");
// Already completed (fail-fast or all children done). Ignore subsequent child completions.
return;
}

this._completedTasks++;

if (task.isFailed && !this._exception) {
this._exception = task.getException();
this._isComplete = true;
return;
}

if (this._completedTasks == this._tasks.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
}

async resume() {
// Don't resume if the orchestration is already complete
if (this._isComplete) {
return;
}

// This is never expected unless maybe there's an issue with the history
if (!this._generator) {
throw new Error("The orchestrator generator is not initialized! Was the orchestration history corrupted?");
Expand Down
132 changes: 132 additions & 0 deletions packages/durabletask-js/test/orchestration_executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,138 @@ describe("Orchestration Executor", () => {
it("should throw when whenAny is called with an empty task array", () => {
expect(() => whenAny([])).toThrow("whenAny requires at least one task");
});

it("should fail whenAll correctly when the failing task is the last to complete", async () => {
const printInt = (_: any, value: number) => {
return value.toString();
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const tasks: Task<string>[] = [];

for (let i = 0; i < 3; i++) {
tasks.push(ctx.callActivity(printInt, i));
}

const results = yield whenAll(tasks);
return results;
};

const registry = new Registry();
const orchestratorName = registry.addOrchestrator(orchestrator);
const activityName = registry.addActivity(printInt);

const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];

for (let i = 0; i < 3; i++) {
oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
}

// First two tasks succeed, last task fails
const ex = new Error("Last task failed");
const newEvents: any[] = [
newTaskCompletedEvent(1, printInt(null, 0)),
newTaskCompletedEvent(2, printInt(null, 1)),
newTaskFailedEvent(3, ex),
];

const executor = new OrchestrationExecutor(registry, testLogger);
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);

const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError");
expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message);
});

it("should not crash when additional tasks complete after whenAll fails fast", async () => {
const printInt = (_: any, value: number) => {
return value.toString();
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const tasks: Task<string>[] = [];

for (let i = 0; i < 3; i++) {
tasks.push(ctx.callActivity(printInt, i));
}

const results = yield whenAll(tasks);
return results;
};

const registry = new Registry();
const orchestratorName = registry.addOrchestrator(orchestrator);
const activityName = registry.addActivity(printInt);

const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];

for (let i = 0; i < 3; i++) {
oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
}

// First task fails, then remaining tasks complete in the same batch
const ex = new Error("First task failed");
const newEvents: any[] = [
newTaskFailedEvent(1, ex),
newTaskCompletedEvent(2, printInt(null, 1)),
newTaskCompletedEvent(3, printInt(null, 2)),
];

const executor = new OrchestrationExecutor(registry, testLogger);
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);

const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError");
expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message);
});

it("should preserve orchestration result when whenAll failure is caught and other tasks complete", async () => {
const printInt = (_: any, value: number) => {
return value.toString();
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const tasks: Task<string>[] = [];

for (let i = 0; i < 3; i++) {
tasks.push(ctx.callActivity(printInt, i));
}

try {
yield whenAll(tasks);
} catch {
// Intentionally catch the failure and return a fallback result
return "handled";
}
};

const registry = new Registry();
const orchestratorName = registry.addOrchestrator(orchestrator);
const activityName = registry.addActivity(printInt);

const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];

for (let i = 0; i < 3; i++) {
oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
}

// First task fails, then remaining tasks complete in the same batch
const ex = new Error("One task failed");
const newEvents: any[] = [
newTaskFailedEvent(1, ex),
newTaskCompletedEvent(2, printInt(null, 1)),
newTaskCompletedEvent(3, printInt(null, 2)),
];

const executor = new OrchestrationExecutor(registry, testLogger);
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);

const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify("handled"));
});
});

function getAndValidateSingleCompleteOrchestrationAction(
Expand Down
112 changes: 112 additions & 0 deletions test/e2e-azuremanaged/orchestration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,118 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => {
expect(activityCounter).toEqual(10);
}, 31000);

it("should remain completed when whenAll fail-fast is caught and other children complete later", async () => {
let failActivityCounter = 0;
let slowActivityCounter = 0;

const fastFail = async (_: ActivityContext): Promise<void> => {
failActivityCounter++;
throw new Error("fast failure for whenAll fail-fast test");
};

const slowSuccess = async (_: ActivityContext, _input: string): Promise<void> => {
slowActivityCounter++;
await new Promise((resolve) => setTimeout(resolve, 1200));
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
try {
yield whenAll([
ctx.callActivity(fastFail),
ctx.callActivity(slowSuccess, "a"),
ctx.callActivity(slowSuccess, "b"),
]);
} catch {
return "handled-failure";
}
};

taskHubWorker.addActivity(fastFail);
taskHubWorker.addActivity(slowSuccess);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state).toBeDefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.failureDetails).toBeUndefined();
expect(state?.serializedOutput).toEqual(JSON.stringify("handled-failure"));
expect(failActivityCounter).toEqual(1);
expect(slowActivityCounter).toEqual(2);

await new Promise((resolve) => setTimeout(resolve, 2000));
const finalState = await taskHubClient.getOrchestrationState(id);
expect(finalState).toBeDefined();
expect(finalState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(finalState?.serializedOutput).toEqual(JSON.stringify("handled-failure"));
}, 31000);

it("should preserve original whenAll failure details when not caught", async () => {
const fastFail = async (_: ActivityContext): Promise<void> => {
throw new Error("fast failure for whenAll uncaught test");
};

const slowSuccess = async (_: ActivityContext): Promise<void> => {
await new Promise((resolve) => setTimeout(resolve, 1200));
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield whenAll([
ctx.callActivity(fastFail),
ctx.callActivity(slowSuccess),
ctx.callActivity(slowSuccess),
]);
};

taskHubWorker.addActivity(fastFail);
taskHubWorker.addActivity(slowSuccess);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state).toBeDefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
expect(state?.failureDetails).toBeDefined();
expect(state?.failureDetails?.message).toContain("fast failure for whenAll uncaught test");
expect(state?.failureDetails?.message).not.toContain("Task is already completed");
}, 31000);

it("should fail whenAll correctly when the failing task is the last to complete", async () => {
const fastSuccess = async (_: ActivityContext): Promise<void> => {
// completes immediately
};

const slowFail = async (_: ActivityContext): Promise<void> => {
await new Promise((resolve) => setTimeout(resolve, 1200));
throw new Error("slow failure as last task");
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield whenAll([
ctx.callActivity(fastSuccess),
ctx.callActivity(fastSuccess),
ctx.callActivity(slowFail),
]);
};

taskHubWorker.addActivity(fastSuccess);
taskHubWorker.addActivity(slowFail);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state).toBeDefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
expect(state?.failureDetails).toBeDefined();
expect(state?.failureDetails?.message).toContain("slow failure as last task");
}, 31000);

it("should be able to use the sub-orchestration", async () => {
let activityCounter = 0;

Expand Down
Loading