Skip to content
Merged
37 changes: 37 additions & 0 deletions packages/durabletask-js/src/task/context/orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,41 @@ export abstract class OrchestrationContext {
* @param saveEvents {boolean} A flag indicating whether to add any unprocessed external events in the new orchestration history.
*/
abstract continueAsNew(newInput: any, saveEvents: boolean): void;

/**
* Sets a custom status value for the current orchestration instance.
*
* The custom status value is serialized and stored in orchestration state and will
* be made available to the orchestration status query APIs. The serialized value
* must not exceed 16 KB of UTF-16 encoded text.
*
* @param {any} customStatus A JSON-serializable value to assign as the custom status, or `null`/`undefined` to clear it.
*/
abstract setCustomStatus(customStatus: any): void;

/**
* Sends an event to another orchestration instance.
*
* The target orchestration can handle the sent event using the `waitForExternalEvent()` method.
* If the target orchestration doesn't exist or has completed, the event will be silently dropped.
*
* @param {string} instanceId The ID of the orchestration instance to send the event to.
* @param {string} eventName The name of the event. Event names are case-insensitive.
* @param {any} eventData The JSON-serializable payload of the event.
*/
abstract sendEvent(instanceId: string, eventName: string, eventData?: any): void;

/**
* Creates a new UUID that is safe for replay within an orchestration.
*
* This method generates a deterministic UUID v5 using the algorithm from RFC 4122 §4.3.
* The name input used to generate this value is a combination of the orchestration instance ID,
* the current UTC datetime, and an internally managed sequence counter.
*
* Use this method instead of random UUID generators (like `crypto.randomUUID()`) to ensure
* deterministic execution during orchestration replay.
*
* @returns {string} A new deterministic UUID string.
*/
abstract newGuid(): string;
}
21 changes: 21 additions & 0 deletions packages/durabletask-js/src/utils/pb-helper.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,27 @@ export function newCreateSubOrchestrationAction(
return action;
}

export function newSendEventAction(
id: number,
instanceId: string,
eventName: string,
encodedData?: string,
): pb.OrchestratorAction {
const orchestrationInstance = new pb.OrchestrationInstance();
orchestrationInstance.setInstanceid(instanceId);

const sendEventAction = new pb.SendEventAction();
sendEventAction.setInstance(orchestrationInstance);
sendEventAction.setName(eventName);
sendEventAction.setData(getStringValue(encodedData));

const action = new pb.OrchestratorAction();
action.setId(id);
action.setSendevent(sendEventAction);

return action;
}

export function isEmpty(v?: StringValue | null): boolean {
return v == null || v.getValue() === "";
}
Expand Down
17 changes: 14 additions & 3 deletions packages/durabletask-js/src/worker/orchestration-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import { StopIterationError } from "./exception/stop-iteration-error";
import { Registry } from "./registry";
import { RuntimeOrchestrationContext } from "./runtime-orchestration-context";

/**
* Result of orchestration execution containing actions and optional custom status.
*/
export interface OrchestrationExecutionResult {
actions: pb.OrchestratorAction[];
customStatus?: string;
}

export class OrchestrationExecutor {
_generator?: TOrchestrator;
_registry: Registry;
Expand All @@ -38,7 +46,7 @@ export class OrchestrationExecutor {
instanceId: string,
oldEvents: pb.HistoryEvent[],
newEvents: pb.HistoryEvent[],
): Promise<pb.OrchestratorAction[]> {
): Promise<OrchestrationExecutionResult> {
if (!newEvents?.length) {
throw new OrchestrationStateError("The new history event list must have at least one event in it");
}
Expand Down Expand Up @@ -81,7 +89,10 @@ export class OrchestrationExecutor {
const actions = ctx.getActions();
console.log(`${instanceId}: Returning ${actions.length} action(s)`);

return actions;
return {
actions,
customStatus: ctx.getCustomStatus(),
};
}

private async processEvent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
Expand All @@ -101,7 +112,7 @@ export class OrchestrationExecutor {
try {
switch (eventType) {
case pb.HistoryEvent.EventtypeCase.ORCHESTRATORSTARTED:
ctx._currentUtcDatetime = event.getTimestamp()?.toDate();
ctx._currentUtcDatetime = event.getTimestamp()?.toDate() ?? ctx._currentUtcDatetime;
break;
case pb.HistoryEvent.EventtypeCase.EXECUTIONSTARTED:
{
Expand Down
126 changes: 121 additions & 5 deletions packages/durabletask-js/src/worker/runtime-orchestration-context.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { createHash } from "crypto";
import { getName } from "../task";
import { OrchestrationContext } from "../task/context/orchestration-context";
import * as pb from "../proto/orchestrator_service_pb";
Expand All @@ -22,14 +23,16 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
_result: any;
_pendingActions: Record<number, pb.OrchestratorAction>;
_pendingTasks: Record<number, CompletableTask<any>>;
_sequenceNumber: any;
_currentUtcDatetime: any;
_sequenceNumber: number;
_newGuidCounter: number;
_currentUtcDatetime: Date;
_instanceId: string;
_completionStatus?: pb.OrchestrationStatus;
_receivedEvents: Record<string, any[]>;
_pendingEvents: Record<string, CompletableTask<any>[]>;
_newInput?: any;
_saveEvents: any;
_saveEvents: boolean;
_customStatus?: any;

constructor(instanceId: string) {
super();
Expand All @@ -41,13 +44,15 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
this._pendingActions = {};
this._pendingTasks = {};
this._sequenceNumber = 0;
this._newGuidCounter = 0;
this._currentUtcDatetime = new Date(1000, 0, 1);
this._instanceId = instanceId;
this._completionStatus = undefined;
this._receivedEvents = {};
this._pendingEvents = {};
this._newInput = undefined;
this._saveEvents = false;
this._customStatus = undefined;
}

get instanceId(): string {
Expand Down Expand Up @@ -141,7 +146,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {

this._isComplete = true;
this._completionStatus = status;
this._pendingActions = {}; // Clear any pending actions
// Note: Do NOT clear pending actions here - fire-and-forget actions like sendEvent
// must be preserved and returned alongside the complete action

this._result = result;

Expand All @@ -163,7 +169,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {

this._isComplete = true;
this._completionStatus = pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED;
this._pendingActions = {}; // Cancel any pending actions
// Note: Do NOT clear pending actions here - fire-and-forget actions like sendEvent
// must be preserved and returned alongside the complete action

const action = ph.newCompleteOrchestrationAction(
this.nextSequenceNumber(),
Expand Down Expand Up @@ -362,6 +369,115 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
this.setContinuedAsNew(newInput, saveEvents);
}

/**
* Sets a custom status value for the current orchestration instance.
*/
setCustomStatus(customStatus: any): void {
this._customStatus = customStatus;
}

/**
* Gets the encoded custom status value for the current orchestration instance.
* This is used internally when building the orchestrator response.
*/
getCustomStatus(): string | undefined {
if (this._customStatus === undefined || this._customStatus === null) {
return undefined;
}
return JSON.stringify(this._customStatus);
}

/**
* Sends an event to another orchestration instance.
*/
sendEvent(instanceId: string, eventName: string, eventData?: any): void {
const id = this.nextSequenceNumber();
const encodedData = eventData !== undefined ? JSON.stringify(eventData) : undefined;
const action = ph.newSendEventAction(id, instanceId, eventName, encodedData);
this._pendingActions[action.getId()] = action;
}

/**
* Creates a new deterministic UUID that is safe for replay within an orchestration.
*
* Uses UUID v5 (name-based with SHA-1) per RFC 4122 §4.3.
* The generated GUID is deterministic based on instanceId, currentUtcDateTime, and a counter,
* ensuring the same value is produced during replay.
*/
newGuid(): string {
const NAMESPACE_UUID = "9e952958-5e33-4daf-827f-2fa12937b875";

// Build the name string: instanceId_datetime_counter
const guidNameValue = `${this._instanceId}_${this._currentUtcDatetime.toISOString()}_${this._newGuidCounter}`;
this._newGuidCounter++;

return this.generateDeterministicGuid(NAMESPACE_UUID, guidNameValue);
}

/**
* Generates a deterministic GUID using UUID v5 algorithm.
* The output format is compatible with other Durable Task SDKs.
*/
private generateDeterministicGuid(namespace: string, name: string): string {
// Parse namespace UUID string to bytes (big-endian/network order)
const namespaceBytes = this.parseUuidToBytes(namespace);

// Convert name to UTF-8 bytes
const nameBytes = Buffer.from(name, "utf-8");

// Compute SHA-1 hash of namespace + name
const hash = createHash("sha1");
hash.update(namespaceBytes);
hash.update(nameBytes);
const hashBytes = hash.digest();

// Take first 16 bytes of hash
const guidBytes = Buffer.alloc(16);
hashBytes.copy(guidBytes, 0, 0, 16);

// Set version to 5 (UUID v5)
guidBytes[6] = (guidBytes[6] & 0x0f) | 0x50;

// Set variant to RFC 4122
guidBytes[8] = (guidBytes[8] & 0x3f) | 0x80;

// Convert to GUID byte order for formatting
this.swapGuidBytes(guidBytes);

return this.formatGuidBytes(guidBytes);
}

/**
* Swaps bytes to convert between UUID (big-endian) and GUID (mixed-endian) byte order.
* GUIDs store the first 3 components (Data1, Data2, Data3) in little-endian format.
*/
private swapGuidBytes(bytes: Buffer): void {
[bytes[0], bytes[3]] = [bytes[3], bytes[0]];
[bytes[1], bytes[2]] = [bytes[2], bytes[1]];
[bytes[4], bytes[5]] = [bytes[5], bytes[4]];
[bytes[6], bytes[7]] = [bytes[7], bytes[6]];
}

/**
* Parses a UUID string to a byte buffer in big-endian (network) order.
*/
private parseUuidToBytes(uuid: string): Buffer {
const hex = uuid.replace(/-/g, "");
return Buffer.from(hex, "hex");
}

/**
* Formats a GUID byte buffer as a string in standard GUID format (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
*/
private formatGuidBytes(bytes: Buffer): string {
const data1 = bytes.slice(0, 4).reverse().toString("hex");
const data2 = bytes.slice(4, 6).reverse().toString("hex");
const data3 = bytes.slice(6, 8).reverse().toString("hex");
const data4 = bytes.slice(8, 10).toString("hex");
const data5 = bytes.slice(10, 16).toString("hex");

return `${data1}-${data2}-${data3}-${data4}-${data5}`;
}
/**
* Creates a retry timer for a retryable task.
* The timer will be associated with the retryable task so that when it fires,
Expand Down
7 changes: 5 additions & 2 deletions packages/durabletask-js/src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,15 @@ export class TaskHubGrpcWorker {

try {
const executor = new OrchestrationExecutor(this._registry);
const actions = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList());
const result = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList());

res = new pb.OrchestratorResponse();
res.setInstanceid(req.getInstanceid());
res.setCompletiontoken(completionToken);
res.setActionsList(actions);
res.setActionsList(result.actions);
if (result.customStatus !== undefined) {
res.setCustomstatus(pbh.getStringValue(result.customStatus));
}
} catch (e: any) {
console.error(e);
console.log(`An error occurred while trying to execute instance '${req.getInstanceid()}': ${e.message}`);
Expand Down
Loading
Loading