diff --git a/packages/durabletask-js/src/task/context/orchestration-context.ts b/packages/durabletask-js/src/task/context/orchestration-context.ts index c27363f..bca1c4f 100644 --- a/packages/durabletask-js/src/task/context/orchestration-context.ts +++ b/packages/durabletask-js/src/task/context/orchestration-context.ts @@ -91,4 +91,41 @@ export abstract class OrchestrationContext { * @param saveEvents {boolean} A flag indicating whether to add any unprocessed external events in the new orchestration history. */ abstract continueAsNew(newInput: any, saveEvents: boolean): void; + + /** + * Sets a custom status value for the current orchestration instance. + * + * The custom status value is serialized and stored in orchestration state and will + * be made available to the orchestration status query APIs. The serialized value + * must not exceed 16 KB of UTF-16 encoded text. + * + * @param {any} customStatus A JSON-serializable value to assign as the custom status, or `null`/`undefined` to clear it. + */ + abstract setCustomStatus(customStatus: any): void; + + /** + * Sends an event to another orchestration instance. + * + * The target orchestration can handle the sent event using the `waitForExternalEvent()` method. + * If the target orchestration doesn't exist or has completed, the event will be silently dropped. + * + * @param {string} instanceId The ID of the orchestration instance to send the event to. + * @param {string} eventName The name of the event. Event names are case-insensitive. + * @param {any} eventData The JSON-serializable payload of the event. + */ + abstract sendEvent(instanceId: string, eventName: string, eventData?: any): void; + + /** + * Creates a new UUID that is safe for replay within an orchestration. + * + * This method generates a deterministic UUID v5 using the algorithm from RFC 4122 §4.3. + * The name input used to generate this value is a combination of the orchestration instance ID, + * the current UTC datetime, and an internally managed sequence counter. + * + * Use this method instead of random UUID generators (like `crypto.randomUUID()`) to ensure + * deterministic execution during orchestration replay. + * + * @returns {string} A new deterministic UUID string. + */ + abstract newGuid(): string; } diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index ca35b9b..38928c0 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -313,6 +313,27 @@ export function newCreateSubOrchestrationAction( return action; } +export function newSendEventAction( + id: number, + instanceId: string, + eventName: string, + encodedData?: string, +): pb.OrchestratorAction { + const orchestrationInstance = new pb.OrchestrationInstance(); + orchestrationInstance.setInstanceid(instanceId); + + const sendEventAction = new pb.SendEventAction(); + sendEventAction.setInstance(orchestrationInstance); + sendEventAction.setName(eventName); + sendEventAction.setData(getStringValue(encodedData)); + + const action = new pb.OrchestratorAction(); + action.setId(id); + action.setSendevent(sendEventAction); + + return action; +} + export function isEmpty(v?: StringValue | null): boolean { return v == null || v.getValue() === ""; } diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index 502149f..c4b99b1 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -21,6 +21,14 @@ import { StopIterationError } from "./exception/stop-iteration-error"; import { Registry } from "./registry"; import { RuntimeOrchestrationContext } from "./runtime-orchestration-context"; +/** + * Result of orchestration execution containing actions and optional custom status. + */ +export interface OrchestrationExecutionResult { + actions: pb.OrchestratorAction[]; + customStatus?: string; +} + export class OrchestrationExecutor { _generator?: TOrchestrator; _registry: Registry; @@ -38,7 +46,7 @@ export class OrchestrationExecutor { instanceId: string, oldEvents: pb.HistoryEvent[], newEvents: pb.HistoryEvent[], - ): Promise { + ): Promise { if (!newEvents?.length) { throw new OrchestrationStateError("The new history event list must have at least one event in it"); } @@ -81,7 +89,10 @@ export class OrchestrationExecutor { const actions = ctx.getActions(); console.log(`${instanceId}: Returning ${actions.length} action(s)`); - return actions; + return { + actions, + customStatus: ctx.getCustomStatus(), + }; } private async processEvent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { @@ -101,7 +112,7 @@ export class OrchestrationExecutor { try { switch (eventType) { case pb.HistoryEvent.EventtypeCase.ORCHESTRATORSTARTED: - ctx._currentUtcDatetime = event.getTimestamp()?.toDate(); + ctx._currentUtcDatetime = event.getTimestamp()?.toDate() ?? ctx._currentUtcDatetime; break; case pb.HistoryEvent.EventtypeCase.EXECUTIONSTARTED: { diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index 4b392f7..7af2435 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +import { createHash } from "crypto"; import { getName } from "../task"; import { OrchestrationContext } from "../task/context/orchestration-context"; import * as pb from "../proto/orchestrator_service_pb"; @@ -22,14 +23,16 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { _result: any; _pendingActions: Record; _pendingTasks: Record>; - _sequenceNumber: any; - _currentUtcDatetime: any; + _sequenceNumber: number; + _newGuidCounter: number; + _currentUtcDatetime: Date; _instanceId: string; _completionStatus?: pb.OrchestrationStatus; _receivedEvents: Record; _pendingEvents: Record[]>; _newInput?: any; - _saveEvents: any; + _saveEvents: boolean; + _customStatus?: any; constructor(instanceId: string) { super(); @@ -41,6 +44,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this._pendingActions = {}; this._pendingTasks = {}; this._sequenceNumber = 0; + this._newGuidCounter = 0; this._currentUtcDatetime = new Date(1000, 0, 1); this._instanceId = instanceId; this._completionStatus = undefined; @@ -48,6 +52,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this._pendingEvents = {}; this._newInput = undefined; this._saveEvents = false; + this._customStatus = undefined; } get instanceId(): string { @@ -141,7 +146,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this._isComplete = true; this._completionStatus = status; - this._pendingActions = {}; // Clear any pending actions + // Note: Do NOT clear pending actions here - fire-and-forget actions like sendEvent + // must be preserved and returned alongside the complete action this._result = result; @@ -163,7 +169,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this._isComplete = true; this._completionStatus = pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED; - this._pendingActions = {}; // Cancel any pending actions + // Note: Do NOT clear pending actions here - fire-and-forget actions like sendEvent + // must be preserved and returned alongside the complete action const action = ph.newCompleteOrchestrationAction( this.nextSequenceNumber(), @@ -362,6 +369,115 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this.setContinuedAsNew(newInput, saveEvents); } + /** + * Sets a custom status value for the current orchestration instance. + */ + setCustomStatus(customStatus: any): void { + this._customStatus = customStatus; + } + + /** + * Gets the encoded custom status value for the current orchestration instance. + * This is used internally when building the orchestrator response. + */ + getCustomStatus(): string | undefined { + if (this._customStatus === undefined || this._customStatus === null) { + return undefined; + } + return JSON.stringify(this._customStatus); + } + + /** + * Sends an event to another orchestration instance. + */ + sendEvent(instanceId: string, eventName: string, eventData?: any): void { + const id = this.nextSequenceNumber(); + const encodedData = eventData !== undefined ? JSON.stringify(eventData) : undefined; + const action = ph.newSendEventAction(id, instanceId, eventName, encodedData); + this._pendingActions[action.getId()] = action; + } + + /** + * Creates a new deterministic UUID that is safe for replay within an orchestration. + * + * Uses UUID v5 (name-based with SHA-1) per RFC 4122 §4.3. + * The generated GUID is deterministic based on instanceId, currentUtcDateTime, and a counter, + * ensuring the same value is produced during replay. + */ + newGuid(): string { + const NAMESPACE_UUID = "9e952958-5e33-4daf-827f-2fa12937b875"; + + // Build the name string: instanceId_datetime_counter + const guidNameValue = `${this._instanceId}_${this._currentUtcDatetime.toISOString()}_${this._newGuidCounter}`; + this._newGuidCounter++; + + return this.generateDeterministicGuid(NAMESPACE_UUID, guidNameValue); + } + + /** + * Generates a deterministic GUID using UUID v5 algorithm. + * The output format is compatible with other Durable Task SDKs. + */ + private generateDeterministicGuid(namespace: string, name: string): string { + // Parse namespace UUID string to bytes (big-endian/network order) + const namespaceBytes = this.parseUuidToBytes(namespace); + + // Convert name to UTF-8 bytes + const nameBytes = Buffer.from(name, "utf-8"); + + // Compute SHA-1 hash of namespace + name + const hash = createHash("sha1"); + hash.update(namespaceBytes); + hash.update(nameBytes); + const hashBytes = hash.digest(); + + // Take first 16 bytes of hash + const guidBytes = Buffer.alloc(16); + hashBytes.copy(guidBytes, 0, 0, 16); + + // Set version to 5 (UUID v5) + guidBytes[6] = (guidBytes[6] & 0x0f) | 0x50; + + // Set variant to RFC 4122 + guidBytes[8] = (guidBytes[8] & 0x3f) | 0x80; + + // Convert to GUID byte order for formatting + this.swapGuidBytes(guidBytes); + + return this.formatGuidBytes(guidBytes); + } + + /** + * Swaps bytes to convert between UUID (big-endian) and GUID (mixed-endian) byte order. + * GUIDs store the first 3 components (Data1, Data2, Data3) in little-endian format. + */ + private swapGuidBytes(bytes: Buffer): void { + [bytes[0], bytes[3]] = [bytes[3], bytes[0]]; + [bytes[1], bytes[2]] = [bytes[2], bytes[1]]; + [bytes[4], bytes[5]] = [bytes[5], bytes[4]]; + [bytes[6], bytes[7]] = [bytes[7], bytes[6]]; + } + + /** + * Parses a UUID string to a byte buffer in big-endian (network) order. + */ + private parseUuidToBytes(uuid: string): Buffer { + const hex = uuid.replace(/-/g, ""); + return Buffer.from(hex, "hex"); + } + + /** + * Formats a GUID byte buffer as a string in standard GUID format (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx). + */ + private formatGuidBytes(bytes: Buffer): string { + const data1 = bytes.slice(0, 4).reverse().toString("hex"); + const data2 = bytes.slice(4, 6).reverse().toString("hex"); + const data3 = bytes.slice(6, 8).reverse().toString("hex"); + const data4 = bytes.slice(8, 10).toString("hex"); + const data5 = bytes.slice(10, 16).toString("hex"); + + return `${data1}-${data2}-${data3}-${data4}-${data5}`; + } /** * Creates a retry timer for a retryable task. * The timer will be associated with the retryable task so that when it fires, diff --git a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts index ed4f3f1..dc6578f 100644 --- a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts +++ b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts @@ -254,12 +254,15 @@ export class TaskHubGrpcWorker { try { const executor = new OrchestrationExecutor(this._registry); - const actions = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList()); + const result = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList()); res = new pb.OrchestratorResponse(); res.setInstanceid(req.getInstanceid()); res.setCompletiontoken(completionToken); - res.setActionsList(actions); + res.setActionsList(result.actions); + if (result.customStatus !== undefined) { + res.setCustomstatus(pbh.getStringValue(result.customStatus)); + } } catch (e: any) { console.error(e); console.log(`An error occurred while trying to execute instance '${req.getInstanceid()}': ${e.message}`); diff --git a/packages/durabletask-js/test/orchestration_context_methods.spec.ts b/packages/durabletask-js/test/orchestration_context_methods.spec.ts new file mode 100644 index 0000000..5c0beb8 --- /dev/null +++ b/packages/durabletask-js/test/orchestration_context_methods.spec.ts @@ -0,0 +1,306 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { CompleteOrchestrationAction, OrchestratorAction } from "../src/proto/orchestrator_service_pb"; +import { OrchestrationContext } from "../src/task/context/orchestration-context"; +import { + newExecutionStartedEvent, + newOrchestratorStartedEvent, +} from "../src/utils/pb-helper.util"; +import { OrchestrationExecutor, OrchestrationExecutionResult } from "../src/worker/orchestration-executor"; +import * as pb from "../src/proto/orchestrator_service_pb"; +import { Registry } from "../src/worker/registry"; +import { TOrchestrator } from "../src/types/orchestrator.type"; + +const TEST_INSTANCE_ID = "test-instance-abc123"; + +/** + * Helper to extract the CompleteOrchestrationAction from an OrchestrationExecutionResult + */ +function getAndValidateSingleCompleteOrchestrationAction( + result: OrchestrationExecutionResult, +): CompleteOrchestrationAction | undefined { + expect(result.actions.length).toEqual(1); + const action = result.actions[0]; + expect(action?.constructor?.name).toEqual(OrchestratorAction.name); + const resCompleteOrchestration = action.getCompleteorchestration(); + expect(resCompleteOrchestration).not.toBeNull(); + return resCompleteOrchestration; +} + +describe("OrchestrationContext.setCustomStatus", () => { + it("should set custom status as a string", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + ctx.setCustomStatus("my custom status"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED, + ); + expect(result.customStatus).toEqual('"my custom status"'); + }); + + it("should set custom status as an object", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + ctx.setCustomStatus({ step: 1, message: "processing" }); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED, + ); + expect(result.customStatus).toEqual(JSON.stringify({ step: 1, message: "processing" })); + }); + + it("should allow clearing custom status by setting it to undefined", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + ctx.setCustomStatus("initial"); + ctx.setCustomStatus(undefined); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + expect(result.customStatus).toBeUndefined(); + }); + + it("should update custom status multiple times", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + ctx.setCustomStatus("step1"); + ctx.setCustomStatus("step2"); + ctx.setCustomStatus("step3"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + // The last set value should be returned + expect(result.customStatus).toEqual('"step3"'); + }); +}); + +describe("OrchestrationContext.sendEvent", () => { + it("should create a SendEvent action with event name and data", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + ctx.sendEvent("target-instance-id", "my-event", { data: "value" }); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + // Should have 2 actions: sendEvent (fire-and-forget) + completeOrchestration + expect(result.actions.length).toEqual(2); + + // Find and verify the sendEvent action + const sendEventAction = result.actions.find((a) => a.hasSendevent()); + expect(sendEventAction).toBeDefined(); + const sendEvent = sendEventAction?.getSendevent(); + expect(sendEvent?.getInstance()?.getInstanceid()).toEqual("target-instance-id"); + expect(sendEvent?.getName()).toEqual("my-event"); + expect(sendEvent?.getData()?.getValue()).toEqual(JSON.stringify({ data: "value" })); + + // Verify the complete action is also present + const completeAction = result.actions.find((a) => a.hasCompleteorchestration()); + expect(completeAction).toBeDefined(); + }); + + it("should create a SendEvent action without data", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + ctx.sendEvent("target-instance-id", "signal-event"); + return "done"; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + // Should have 2 actions: sendEvent (fire-and-forget) + completeOrchestration + expect(result.actions.length).toEqual(2); + + // Find and verify the sendEvent action + const sendEventAction = result.actions.find((a) => a.hasSendevent()); + expect(sendEventAction).toBeDefined(); + const sendEvent = sendEventAction?.getSendevent(); + expect(sendEvent?.getInstance()?.getInstanceid()).toEqual("target-instance-id"); + expect(sendEvent?.getName()).toEqual("signal-event"); + // No data should be set (or empty) + expect(sendEvent?.getData()?.getValue() ?? "").toEqual(""); + }); +}); + +describe("OrchestrationContext.newGuid", () => { + it("should generate a deterministic GUID", async () => { + let capturedGuid1: string | undefined; + let capturedGuid2: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedGuid1 = ctx.newGuid(); + capturedGuid2 = ctx.newGuid(); + return [capturedGuid1, capturedGuid2]; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const startTime = new Date(2024, 0, 15, 10, 30, 0, 0); // Fixed timestamp for determinism + const newEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); + expect(completeAction?.getOrchestrationstatus()).toEqual( + pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED, + ); + + // Verify GUIDs are in valid UUID format (8-4-4-4-12 hex chars) + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + expect(capturedGuid1).toMatch(uuidRegex); + expect(capturedGuid2).toMatch(uuidRegex); + + // Verify GUIDs are different from each other + expect(capturedGuid1).not.toEqual(capturedGuid2); + }); + + it("should generate the same GUIDs across replays with same inputs", async () => { + let firstRunGuids: string[] = []; + let secondRunGuids: string[] = []; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + const guids = [ctx.newGuid(), ctx.newGuid(), ctx.newGuid()]; + return guids; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const startTime = new Date(2024, 0, 15, 10, 30, 0, 0); // Fixed timestamp + const newEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + + // First run + const executor1 = new OrchestrationExecutor(registry); + const result1 = await executor1.execute(TEST_INSTANCE_ID, [], newEvents); + const output1 = result1.actions[0].getCompleteorchestration()?.getResult()?.getValue(); + firstRunGuids = JSON.parse(output1!); + + // Second run with same events (simulating replay) + const executor2 = new OrchestrationExecutor(registry); + const result2 = await executor2.execute(TEST_INSTANCE_ID, [], newEvents); + const output2 = result2.actions[0].getCompleteorchestration()?.getResult()?.getValue(); + secondRunGuids = JSON.parse(output2!); + + // GUIDs should be identical across runs + expect(firstRunGuids).toEqual(secondRunGuids); + expect(firstRunGuids.length).toEqual(3); + }); + + it("should generate different GUIDs for different instance IDs", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + return ctx.newGuid(); + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const startTime = new Date(2024, 0, 15, 10, 30, 0, 0); + + // Run with instance 1 + const events1 = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, "instance-1"), + ]; + const executor1 = new OrchestrationExecutor(registry); + const result1 = await executor1.execute("instance-1", [], events1); + const instance1Guid = JSON.parse( + result1.actions[0].getCompleteorchestration()?.getResult()?.getValue() ?? "", + ); + + // Run with instance 2 + const events2 = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, "instance-2"), + ]; + const executor2 = new OrchestrationExecutor(registry); + const result2 = await executor2.execute("instance-2", [], events2); + const instance2Guid = JSON.parse( + result2.actions[0].getCompleteorchestration()?.getResult()?.getValue() ?? "", + ); + + // GUIDs should be different for different instance IDs + expect(instance1Guid).not.toEqual(instance2Guid); + }); + + it("should generate GUIDs in the format of UUID v5", async () => { + let capturedGuid: string | undefined; + + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + capturedGuid = ctx.newGuid(); + return capturedGuid; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const startTime = new Date(2024, 0, 15, 10, 30, 0, 0); + const newEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID), + ]; + const executor = new OrchestrationExecutor(registry); + await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + // UUID v5 has version byte at position 14 (should be '5') + // and variant bits at position 19 (should be '8', '9', 'a', or 'b') + expect(capturedGuid).toBeDefined(); + expect(capturedGuid![14]).toEqual("5"); // Version 5 + expect(["8", "9", "a", "b"]).toContain(capturedGuid![19].toLowerCase()); // Variant bits + }); +}); diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index 1e840f0..611b2aa 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -19,7 +19,7 @@ import { newTimerCreatedEvent, newTimerFiredEvent, } from "../src/utils/pb-helper.util"; -import { OrchestrationExecutor } from "../src/worker/orchestration-executor"; +import { OrchestrationExecutor, OrchestrationExecutionResult } from "../src/worker/orchestration-executor"; import * as pb from "../src/proto/orchestrator_service_pb"; import { Registry } from "../src/worker/registry"; import { TOrchestrator } from "../src/types/orchestrator.type"; @@ -46,8 +46,8 @@ describe("Orchestration Executor", () => { newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(testInput)), ]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()).not.toBeNull(); const expectedOutput = [testInput, TEST_INSTANCE_ID, startTime.toISOString(), false]; @@ -62,8 +62,8 @@ describe("Orchestration Executor", () => { const name = registry.addOrchestrator(emptyOrchestrator); const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()).not.toBeNull(); expect(completeAction?.getResult()?.getValue()).toEqual('"done"'); @@ -73,8 +73,8 @@ describe("Orchestration Executor", () => { const name = "Bogus"; const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("OrchestratorNotRegisteredError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).not.toBeNull(); @@ -95,12 +95,12 @@ describe("Orchestration Executor", () => { newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined), ]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - expect(actions).not.toBeNull(); - expect(actions.length).toEqual(1); - expect(actions[0]?.constructor?.name).toEqual(OrchestratorAction.name); - expect(actions[0]?.getId()).toEqual(1); - expect(actions[0]?.getCreatetimer()?.getFireat()?.toDate()).toEqual(expectedFireAt); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + expect(result.actions).not.toBeNull(); + expect(result.actions.length).toEqual(1); + expect(result.actions[0]?.constructor?.name).toEqual(OrchestratorAction.name); + expect(result.actions[0]?.getId()).toEqual(1); + expect(result.actions[0]?.getCreatetimer()?.getFireat()?.toDate()).toEqual(expectedFireAt); }); it("should test the resumption of a task using a timerFired event", async () => { const delayOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { @@ -120,8 +120,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTimerFiredEvent(1, expectedFireAt)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()).not.toBeNull(); expect(completeAction?.getResult()?.getValue()).toEqual('"done"'); @@ -138,12 +138,12 @@ describe("Orchestration Executor", () => { const name = registry.addOrchestrator(orchestrator); const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - expect(actions).not.toBeNull(); - expect(actions.length).toEqual(1); - expect(actions[0]?.constructor?.name).toEqual(OrchestratorAction.name); - expect(actions[0]?.getId()).toEqual(1); - expect(actions[0]?.getScheduletask()?.getName()).toEqual("dummyActivity"); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + expect(result.actions).not.toBeNull(); + expect(result.actions.length).toEqual(1); + expect(result.actions[0]?.constructor?.name).toEqual(OrchestratorAction.name); + expect(result.actions[0]?.getId()).toEqual(1); + expect(result.actions[0]?.getScheduletask()?.getName()).toEqual("dummyActivity"); }); it("should test the successful completion of an activity task", async () => { const dummyActivity = async (_: ActivityContext) => { @@ -163,8 +163,8 @@ describe("Orchestration Executor", () => { const encodedOutput = JSON.stringify("done!"); const newEvents = [newTaskCompletedEvent(1, encodedOutput)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); console.log(completeAction?.getFailuredetails()); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); @@ -187,8 +187,8 @@ describe("Orchestration Executor", () => { const encodedOutput = JSON.stringify("done!"); const newEvents = [newTaskCompletedEvent(1, encodedOutput)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); console.log(completeAction?.getFailuredetails()); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); @@ -211,8 +211,8 @@ describe("Orchestration Executor", () => { const ex = new Error("Kah-BOOOOM!!!"); const newEvents = [newTaskFailedEvent(1, ex)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message); @@ -241,8 +241,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTimerFiredEvent(1, fireAt)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -263,8 +263,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTaskCompletedEvent(1, "done!")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -287,8 +287,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTaskCompletedEvent(1)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -312,8 +312,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTaskCompletedEvent(1)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -339,8 +339,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newSubOrchestrationCompletedEvent(1, "42")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); @@ -363,8 +363,8 @@ describe("Orchestration Executor", () => { const ex = new Error("Kah-BOOOOM!!!"); const newEvents = [newSubOrchestrationFailedEvent(1, ex)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message); @@ -387,8 +387,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newSubOrchestrationCompletedEvent(1, "42")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -417,8 +417,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newSubOrchestrationCompletedEvent(1, "42")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -444,17 +444,17 @@ describe("Orchestration Executor", () => { // Execute the orchestration until it is waiting for an external event. // The result should be an empty list of actions because the orchestration didn't schedule any work let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - expect(actions.length).toBe(0); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + expect(result.actions.length).toBe(0); // Now send an external event to the orchestration and execute it again. // This time the orcehstration should complete oldEvents = newEvents; newEvents = [newEventRaisedEvent("my_event", "42")]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); @@ -479,9 +479,9 @@ describe("Orchestration Executor", () => { // Execute the orchestration // It should be in a running state waiting for the timer to fire let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - expect(actions.length).toBe(1); - expect(actions[0].hasCreatetimer()).toBeTruthy(); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasCreatetimer()).toBeTruthy(); // Complete the timer task // The orchestration should move to the waitForExternalEvent step now which should @@ -491,9 +491,9 @@ describe("Orchestration Executor", () => { oldEvents = newEvents; newEvents = [newTimerFiredEvent(1, timerDueTime)]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); @@ -515,16 +515,16 @@ describe("Orchestration Executor", () => { // It should be in a running state because it was suspended prior // to the processing the event raised event let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - expect(actions.length).toBe(0); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + expect(result.actions.length).toBe(0); // Resume the orchestration, it should complete successfully oldEvents.push(...newEvents); newEvents = [newResumeEvent()]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); @@ -545,11 +545,11 @@ describe("Orchestration Executor", () => { // Execute the orchestration // It should be in a running state waiting for an external event let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify("terminated!")); }); @@ -577,9 +577,9 @@ describe("Orchestration Executor", () => { // Execute the orchestration, it should be in a running state waiting for the timer to fire const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual( pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW, ); @@ -626,15 +626,15 @@ describe("Orchestration Executor", () => { ]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); // The result should be 10 "taskScheduled" actions with inputs from 0 to 9 - expect(actions.length).toEqual(10); + expect(result.actions.length).toEqual(10); for (let i = 0; i < 10; i++) { - expect(actions[i].hasScheduletask()); - expect(actions[i].getScheduletask()?.getName()).toEqual(activityName); - expect(actions[i].getScheduletask()?.getInput()?.getValue()).toEqual(`"${i}"`); + expect(result.actions[i].hasScheduletask()); + expect(result.actions[i].getScheduletask()?.getName()).toEqual(activityName); + expect(result.actions[i].getScheduletask()?.getInput()?.getValue()).toEqual(`"${i}"`); } }); @@ -674,15 +674,15 @@ describe("Orchestration Executor", () => { // we expect the orchestrator to be running // it should however return 0 actions, since it is still waiting for the other 5 tasks to complete let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents.slice(0, 4)); - expect(actions.length).toBe(0); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents.slice(0, 4)); + expect(result.actions.length).toBe(0); // Now test with the full set of new events // we expect the orchestration to complete executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("[0,1,2,3,4,5,6,7,8,9]"); }); @@ -727,9 +727,9 @@ describe("Orchestration Executor", () => { // Now test with the full set of new events // We expect the orchestration to complete const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message); @@ -762,11 +762,11 @@ describe("Orchestration Executor", () => { let oldEvents: any[] = []; let newEvents = [newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)]; let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - expect(actions.length).toEqual(2); - expect(actions[0].hasScheduletask()).toBeTruthy(); - expect(actions[1].hasScheduletask()).toBeTruthy(); + expect(result.actions.length).toEqual(2); + expect(result.actions[0].hasScheduletask()).toBeTruthy(); + expect(result.actions[1].hasScheduletask()).toBeTruthy(); // The next tests assume that the orchestration has already await at the task.whenAny oldEvents = [ @@ -781,8 +781,8 @@ describe("Orchestration Executor", () => { let encodedOutput = JSON.stringify(hello(null, "Tokyo")); newEvents = [newTaskCompletedEvent(1, encodedOutput)]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - let completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); @@ -791,8 +791,8 @@ describe("Orchestration Executor", () => { encodedOutput = JSON.stringify(hello(null, "Seattle")); newEvents = [newTaskCompletedEvent(2, encodedOutput)]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); }); @@ -822,12 +822,12 @@ describe("Orchestration Executor", () => { newOrchestratorStartedEvent(), newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), ]; - let actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + let result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); // Assert - Step 1: Should schedule activity - expect(actions.length).toBe(1); - expect(actions[0].hasScheduletask()).toBe(true); - expect(actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasScheduletask()).toBe(true); + expect(result.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); // Act - Step 2: Activity scheduled, then fails const oldEvents = [ @@ -838,11 +838,11 @@ describe("Orchestration Executor", () => { newEvents = [ newTaskFailedEvent(1, new Error("Transient failure on attempt 1")), ]; - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); // Assert - Step 2: Should schedule a retry timer - expect(actions.length).toBe(1); - expect(actions[0].hasCreatetimer()).toBe(true); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasCreatetimer()).toBe(true); }); it("should complete successfully after retry timer fires and activity succeeds", async () => { @@ -869,44 +869,44 @@ describe("Orchestration Executor", () => { newOrchestratorStartedEvent(startTime), newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), ]; - let actions = await executor.execute(TEST_INSTANCE_ID, [], allEvents); - expect(actions.length).toBe(1); - expect(actions[0].hasScheduletask()).toBe(true); + let result = await executor.execute(TEST_INSTANCE_ID, [], allEvents); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasScheduletask()).toBe(true); // Step 2: Activity scheduled, then fails allEvents.push(newTaskScheduledEvent(1, "flakyActivity")); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ newTaskFailedEvent(1, new Error("Transient failure on attempt 1")), ]); - expect(actions.length).toBe(1); - expect(actions[0].hasCreatetimer()).toBe(true); - const timerFireAt = actions[0].getCreatetimer()?.getFireat()?.toDate(); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasCreatetimer()).toBe(true); + const timerFireAt = result.actions[0].getCreatetimer()?.getFireat()?.toDate(); expect(timerFireAt).toBeDefined(); // Step 3: Timer created, then fires allEvents.push(newTaskFailedEvent(1, new Error("Transient failure on attempt 1"))); allEvents.push(newTimerCreatedEvent(2, timerFireAt!)); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ newTimerFiredEvent(2, timerFireAt!), ]); // Should reschedule the activity with a new ID - expect(actions.length).toBe(1); - expect(actions[0].hasScheduletask()).toBe(true); - expect(actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); - expect(actions[0].getId()).toBe(3); // New ID after timer + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasScheduletask()).toBe(true); + expect(result.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + expect(result.actions[0].getId()).toBe(3); // New ID after timer // Step 4: Retried activity scheduled, then completes allEvents.push(newTimerFiredEvent(2, timerFireAt!)); allEvents.push(newTaskScheduledEvent(3, "flakyActivity")); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ newTaskCompletedEvent(3, JSON.stringify(42)), ]); // Assert: Orchestration should complete successfully - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify(42)); }); @@ -935,50 +935,50 @@ describe("Orchestration Executor", () => { newOrchestratorStartedEvent(startTime), newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), ]; - let actions = await executor.execute(TEST_INSTANCE_ID, [], allEvents); - expect(actions.length).toBe(1); - expect(actions[0].hasScheduletask()).toBe(true); + let result = await executor.execute(TEST_INSTANCE_ID, [], allEvents); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasScheduletask()).toBe(true); // Step 2: Activity fails - first attempt allEvents.push(newTaskScheduledEvent(1, "alwaysFailsActivity")); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ newTaskFailedEvent(1, new Error("Failure on attempt 1")), ]); - expect(actions.length).toBe(1); - expect(actions[0].hasCreatetimer()).toBe(true); - const timerFireAt = actions[0].getCreatetimer()?.getFireat()?.toDate(); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasCreatetimer()).toBe(true); + const timerFireAt = result.actions[0].getCreatetimer()?.getFireat()?.toDate(); // Step 3: Timer fires, activity is rescheduled allEvents.push(newTaskFailedEvent(1, new Error("Failure on attempt 1"))); allEvents.push(newTimerCreatedEvent(2, timerFireAt!)); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ newTimerFiredEvent(2, timerFireAt!), ]); - expect(actions.length).toBe(1); - expect(actions[0].hasScheduletask()).toBe(true); + expect(result.actions.length).toBe(1); + expect(result.actions[0].hasScheduletask()).toBe(true); // Step 4: Second activity attempt fails - max attempts reached allEvents.push(newTimerFiredEvent(2, timerFireAt!)); allEvents.push(newTaskScheduledEvent(3, "alwaysFailsActivity")); executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + result = await executor.execute(TEST_INSTANCE_ID, allEvents, [ newTaskFailedEvent(3, new Error("Failure on attempt 2")), ]); // Assert: Orchestration should fail - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); }); }); }); function getAndValidateSingleCompleteOrchestrationAction( - actions: OrchestratorAction[], + result: OrchestrationExecutionResult, ): CompleteOrchestrationAction | undefined { - expect(actions.length).toEqual(1); - const action = actions[0]; + expect(result.actions.length).toEqual(1); + const action = result.actions[0]; expect(action?.constructor?.name).toEqual(CompleteOrchestrationAction.name); const resCompleteOrchestration = action.getCompleteorchestration(); diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts index a831ea6..9e59a73 100644 --- a/test/e2e-azuremanaged/orchestration.spec.ts +++ b/test/e2e-azuremanaged/orchestration.spec.ts @@ -2,12 +2,11 @@ // Licensed under the MIT License. /** - * E2E tests for Durable Task Scheduler (DTS) emulator. + * E2E tests for Durable Task Scheduler (DTS). * - * NOTE: These tests assume the DTS emulator is running. Example command: - * docker run -i -p 8080:8080 -d mcr.microsoft.com/dts/dts-emulator:latest - * - * Environment variables: + * Environment variables (choose one): + * - DTS_CONNECTION_STRING: Full connection string (e.g., "Endpoint=https://...;Authentication=DefaultAzure;TaskHub=...") + * OR * - ENDPOINT: The endpoint for the DTS emulator (default: localhost:8080) * - TASKHUB: The task hub name (default: default) */ @@ -30,22 +29,35 @@ import { } from "@microsoft/durabletask-js-azuremanaged"; // Read environment variables +const connectionString = process.env.DTS_CONNECTION_STRING; const endpoint = process.env.ENDPOINT || "localhost:8080"; const taskHub = process.env.TASKHUB || "default"; +function createClient(): TaskHubGrpcClient { + if (connectionString) { + return new DurableTaskAzureManagedClientBuilder().connectionString(connectionString).build(); + } + return new DurableTaskAzureManagedClientBuilder() + .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) + .build(); +} + +function createWorker(): TaskHubGrpcWorker { + if (connectionString) { + return new DurableTaskAzureManagedWorkerBuilder().connectionString(connectionString).build(); + } + return new DurableTaskAzureManagedWorkerBuilder() + .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) + .build(); +} + describe("Durable Task Scheduler (DTS) E2E Tests", () => { let taskHubClient: TaskHubGrpcClient; let taskHubWorker: TaskHubGrpcWorker; beforeEach(async () => { - // Create client and worker using the Azure-managed builders with taskhub metadata - taskHubClient = new DurableTaskAzureManagedClientBuilder() - .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) - .build(); - - taskHubWorker = new DurableTaskAzureManagedWorkerBuilder() - .endpoint(endpoint, taskHub, null) // null credential for emulator (no auth) - .build(); + taskHubClient = createClient(); + taskHubWorker = createWorker(); }); afterEach(async () => { @@ -493,4 +505,220 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { expect(state?.serializedOutput).toEqual(JSON.stringify("Hello, World!")); expect(invoked).toBe(true); }, 31000); + + // // ==================== newGuid Tests ==================== + + it("should generate deterministic GUIDs with newGuid", async () => { + const orchestrator: TOrchestrator = async (ctx: OrchestrationContext) => { + const guid1 = ctx.newGuid(); + const guid2 = ctx.newGuid(); + const guid3 = ctx.newGuid(); + return { guid1, guid2, guid3 }; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + const output = JSON.parse(state?.serializedOutput ?? "{}"); + + // Verify GUIDs are in valid format (8-4-4-4-12 hex chars) + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-5[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + expect(output.guid1).toMatch(uuidRegex); + expect(output.guid2).toMatch(uuidRegex); + expect(output.guid3).toMatch(uuidRegex); + + // Verify all GUIDs are unique + expect(output.guid1).not.toEqual(output.guid2); + expect(output.guid2).not.toEqual(output.guid3); + expect(output.guid1).not.toEqual(output.guid3); + }, 31000); + + it("should generate consistent GUIDs across replays", async () => { + // This test verifies that newGuid produces the same values across replays + // by running an orchestration that generates GUIDs, waits, and then returns them + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Generate GUIDs before and after a timer + const guid1 = ctx.newGuid(); + const guid2 = ctx.newGuid(); + + yield ctx.createTimer(1); + + // Generate more GUIDs after replay + const guid3 = ctx.newGuid(); + const guid4 = ctx.newGuid(); + + // Return all GUIDs - if deterministic, guid3/guid4 should be different from guid1/guid2 + // but consistent across replays (which we verify by the orchestration completing successfully) + return { guid1, guid2, guid3, guid4 }; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + const output = JSON.parse(state?.serializedOutput ?? "{}"); + + // Verify all 4 GUIDs are unique + const guids = [output.guid1, output.guid2, output.guid3, output.guid4]; + const uniqueGuids = new Set(guids); + expect(uniqueGuids.size).toBe(4); + + // Verify all GUIDs are valid UUID v5 format + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-5[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + guids.forEach((guid) => expect(guid).toMatch(uuidRegex)); + }, 31000); + + // // ==================== setCustomStatus Tests ==================== + + it("should set and retrieve custom status", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.setCustomStatus("Processing"); + yield ctx.createTimer(10); + ctx.setCustomStatus("Completed"); + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + + // Poll to observe the first status + let foundProcessingStatus = false; + const startTime = Date.now(); + while (Date.now() - startTime < 10000) { + const state = await taskHubClient.getOrchestrationState(id); + if (state?.serializedCustomStatus === JSON.stringify("Processing")) { + foundProcessingStatus = true; + break; + } + await new Promise((resolve) => setTimeout(resolve, 200)); + } + + const finalState = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(finalState).toBeDefined(); + expect(finalState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(finalState?.serializedCustomStatus).toEqual(JSON.stringify("Completed")); + expect(foundProcessingStatus).toBe(true); + }, 31000); + + it("should update custom status to empty string when explicitly set", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + ctx.setCustomStatus("Initial status"); + yield ctx.createTimer(1); + + // Update custom status to an empty string; this is a valid value and does not clear it. + ctx.setCustomStatus(""); + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // When set to empty string, the serialized value should be '""' (JSON-encoded empty string) + expect(state?.serializedCustomStatus).toEqual('""'); + }, 31000); + + // ==================== sendEvent Tests ==================== + + it("should send event from one orchestration to another", async () => { + const receiverOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const event = yield ctx.waitForExternalEvent("greeting"); + return event; + }; + + const senderOrchestrator: TOrchestrator = async function* ( + ctx: OrchestrationContext, + targetInstanceId: string, + ): any { + // Wait a bit to ensure receiver is running + yield ctx.createTimer(1); + + // Send event to the receiver + ctx.sendEvent(targetInstanceId, "greeting", { message: "Hello from sender!" }); + + // Must yield after sendEvent to ensure the action is sent before orchestration completes + yield ctx.createTimer(1); + + return "sent"; + }; + + taskHubWorker.addOrchestrator(receiverOrchestrator); + taskHubWorker.addOrchestrator(senderOrchestrator); + await taskHubWorker.start(); + + // Start receiver first + const receiverId = await taskHubClient.scheduleNewOrchestration(receiverOrchestrator); + await taskHubClient.waitForOrchestrationStart(receiverId, undefined, 10); + + // Start sender with receiver's instance ID + const senderId = await taskHubClient.scheduleNewOrchestration(senderOrchestrator, receiverId); + + // Wait for both to complete + const [receiverState, senderState] = await Promise.all([ + taskHubClient.waitForOrchestrationCompletion(receiverId, undefined, 30), + taskHubClient.waitForOrchestrationCompletion(senderId, undefined, 30), + ]); + + expect(senderState).toBeDefined(); + expect(senderState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(senderState?.serializedOutput).toEqual(JSON.stringify("sent")); + + expect(receiverState).toBeDefined(); + expect(receiverState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(receiverState?.serializedOutput).toEqual(JSON.stringify({ message: "Hello from sender!" })); + }, 45000); + + it("should send event without data", async () => { + const receiverOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("signal"); + return "received signal"; + }; + + const senderOrchestrator: TOrchestrator = async function* ( + ctx: OrchestrationContext, + targetInstanceId: string, + ): any { + yield ctx.createTimer(1); + ctx.sendEvent(targetInstanceId, "signal"); + // Must yield after sendEvent to ensure the action is sent + yield ctx.createTimer(1); + return "signaled"; + }; + + taskHubWorker.addOrchestrator(receiverOrchestrator); + taskHubWorker.addOrchestrator(senderOrchestrator); + await taskHubWorker.start(); + + const receiverId = await taskHubClient.scheduleNewOrchestration(receiverOrchestrator); + await taskHubClient.waitForOrchestrationStart(receiverId, undefined, 10); + + const senderId = await taskHubClient.scheduleNewOrchestration(senderOrchestrator, receiverId); + + const [receiverState, senderState] = await Promise.all([ + taskHubClient.waitForOrchestrationCompletion(receiverId, undefined, 30), + taskHubClient.waitForOrchestrationCompletion(senderId, undefined, 30), + ]); + + expect(senderState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(receiverState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(receiverState?.serializedOutput).toEqual(JSON.stringify("received signal")); + }, 45000); }); diff --git a/test/e2e-azuremanaged/rewind.spec.ts b/test/e2e-azuremanaged/rewind.spec.ts index 91f2ac8..f49e1dc 100644 --- a/test/e2e-azuremanaged/rewind.spec.ts +++ b/test/e2e-azuremanaged/rewind.spec.ts @@ -168,7 +168,7 @@ describe("Rewind Instance E2E Tests", () => { // No need to start worker for this test // Will throw either "not found" or "not supported" depending on backend await expect(taskHubClient.rewindInstance(nonExistentId, "Test rewind")).rejects.toThrow(); - }); + }, 60000); it("should throw an error when rewinding a completed orchestration (or if rewind is not supported)", async () => { const instanceId = generateUniqueInstanceId("rewind-completed"); @@ -183,7 +183,7 @@ describe("Rewind Instance E2E Tests", () => { // Try to rewind a completed orchestration - should fail await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow(); - }); + }, 60000); it.skip("should throw an error when rewinding a running orchestration (requires backend support)", async () => { const instanceId = generateUniqueInstanceId("rewind-running");