From 1655fae9b280f144ca7f423141dfe549e1dc477f Mon Sep 17 00:00:00 2001 From: torosent <17064840+torosent@users.noreply.github.com> Date: Tue, 3 Feb 2026 12:48:36 -0800 Subject: [PATCH 1/6] Add support for orchestration tags in TaskHubGrpcClient and related components --- packages/durabletask-js/src/client/client.ts | 61 ++++++++++++++++++- packages/durabletask-js/src/index.ts | 8 ++- .../durabletask-js/src/orchestration/index.ts | 18 ++++++ .../src/orchestration/orchestration-state.ts | 3 + .../durabletask-js/src/task/options/index.ts | 1 + .../src/task/options/task-options.ts | 24 ++++++++ .../src/utils/pb-helper.util.ts | 23 ++++++- .../worker/runtime-orchestration-context.ts | 25 ++++++-- .../test/orchestration_executor.spec.ts | 39 ++++++++++++ .../durabletask-js/test/query-apis.spec.ts | 18 ++++++ test/e2e-azuremanaged/orchestration.spec.ts | 17 ++++++ tests/e2e/orchestration.spec.ts | 1 + 12 files changed, 231 insertions(+), 7 deletions(-) diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts index 572ddcf..d359baf 100644 --- a/packages/durabletask-js/src/client/client.ts +++ b/packages/durabletask-js/src/client/client.ts @@ -22,10 +22,26 @@ import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from ". import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page"; import { FailureDetails } from "../task/failure-details"; import { Logger, ConsoleLogger } from "../types/logger.type"; +import { StartOrchestrationOptions } from "../task/options"; // Re-export MetadataGenerator for backward compatibility export { MetadataGenerator } from "../utils/grpc-helper.util"; +function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }): + | Record + | undefined { + if (!tagsMap) { + return; + } + + const tags: Record = {}; + tagsMap.forEach((value, key) => { + tags[key] = value; + }); + + return Object.keys(tags).length > 0 ? tags : undefined; +} + /** * Options for creating a TaskHubGrpcClient. */ @@ -133,6 +149,25 @@ export class TaskHubGrpcClient { input?: TInput, instanceId?: string, startAt?: Date, + ): Promise; + /** + * Schedules a new orchestrator using the DurableTask client. + * + * @param {TOrchestrator | string} orchestrator - The orchestrator or the name of the orchestrator to be scheduled. + * @param {TInput} input - Optional input for the orchestrator. + * @param {StartOrchestrationOptions} options - Options for instance ID, start time, and tags. + * @return {Promise} A Promise resolving to the unique ID of the scheduled orchestrator instance. + */ + async scheduleNewOrchestration( + orchestrator: TOrchestrator | string, + input?: TInput, + options?: StartOrchestrationOptions, + ): Promise; + async scheduleNewOrchestration( + orchestrator: TOrchestrator | string, + input?: TInput, + instanceIdOrOptions?: string | StartOrchestrationOptions, + startAt?: Date, ): Promise { let name; if (typeof orchestrator === "string") { @@ -140,6 +175,20 @@ export class TaskHubGrpcClient { } else { name = getName(orchestrator); } + + const instanceId = + typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined + ? instanceIdOrOptions + : instanceIdOrOptions.instanceId; + const scheduledStartAt = + typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined + ? startAt + : instanceIdOrOptions.startAt; + const tags = + typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined + ? undefined + : instanceIdOrOptions.tags; + const req = new pb.CreateInstanceRequest(); req.setName(name); req.setInstanceid(instanceId ?? randomUUID()); @@ -148,11 +197,18 @@ export class TaskHubGrpcClient { i.setValue(JSON.stringify(input)); const ts = new Timestamp(); - ts.fromDate(new Date(startAt?.getTime() ?? 0)); + ts.fromDate(new Date(scheduledStartAt?.getTime() ?? 0)); req.setInput(i); req.setScheduledstarttimestamp(ts); + if (tags) { + const tagsMap = req.getTagsMap(); + for (const [key, value] of Object.entries(tags)) { + tagsMap.set(key, value); + } + } + this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`); const res = await callWithMetadata( @@ -779,6 +835,8 @@ export class TaskHubGrpcClient { ); } + const tags = mapToRecord(protoState.getTagsMap()); + return new OrchestrationState( instanceId, name ?? "", @@ -789,6 +847,7 @@ export class TaskHubGrpcClient { serializedOutput, serializedCustomStatus, failureDetails, + tags, ); } } diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index 63337a6..6a604e0 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -27,7 +27,13 @@ export { Task } from "./task/task"; // Retry policies and task options export { RetryPolicy, RetryPolicyOptions } from "./task/retry"; -export { TaskOptions, SubOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy } from "./task/options"; +export { + TaskOptions, + SubOrchestrationOptions, + StartOrchestrationOptions, + taskOptionsFromRetryPolicy, + subOrchestrationOptionsFromRetryPolicy, +} from "./task/options"; // Types export { TOrchestrator } from "./types/orchestrator.type"; diff --git a/packages/durabletask-js/src/orchestration/index.ts b/packages/durabletask-js/src/orchestration/index.ts index 76e104f..4dedb5f 100644 --- a/packages/durabletask-js/src/orchestration/index.ts +++ b/packages/durabletask-js/src/orchestration/index.ts @@ -6,6 +6,21 @@ import { FailureDetails } from "../task/failure-details"; import { fromProtobuf } from "./enum/orchestration-status.enum"; import { OrchestrationState } from "./orchestration-state"; +function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }): + | Record + | undefined { + if (!tagsMap) { + return; + } + + const tags: Record = {}; + tagsMap.forEach((value, key) => { + tags[key] = value; + }); + + return Object.keys(tags).length > 0 ? tags : undefined; +} + export function newOrchestrationState( instanceId: string, res?: pb.GetInstanceResponse, @@ -43,6 +58,8 @@ export function newOrchestrationState( tsUpdatedParsed = new Date(tsUpdated.getSeconds() * 1000 + tsUpdated.getNanos() / 1000000); } + const tags = mapToRecord(state?.getTagsMap()); + return new OrchestrationState( instanceId, state?.getName() ?? "", @@ -53,5 +70,6 @@ export function newOrchestrationState( state?.getOutput()?.toString(), state?.getCustomstatus()?.toString(), failureDetails, + tags, ); } diff --git a/packages/durabletask-js/src/orchestration/orchestration-state.ts b/packages/durabletask-js/src/orchestration/orchestration-state.ts index 74192de..2456932 100644 --- a/packages/durabletask-js/src/orchestration/orchestration-state.ts +++ b/packages/durabletask-js/src/orchestration/orchestration-state.ts @@ -15,6 +15,7 @@ export class OrchestrationState { serializedOutput?: string; serializedCustomStatus?: string; failureDetails?: FailureDetails; + tags?: Record; constructor( instanceId: string, @@ -26,6 +27,7 @@ export class OrchestrationState { serializedOutput?: string, serializedCustomStatus?: string, failureDetails?: FailureDetails, + tags?: Record, ) { this.instanceId = instanceId; this.name = name; @@ -36,6 +38,7 @@ export class OrchestrationState { this.serializedOutput = serializedOutput; this.serializedCustomStatus = serializedCustomStatus; this.failureDetails = failureDetails; + this.tags = tags; } raiseIfFailed(): void { diff --git a/packages/durabletask-js/src/task/options/index.ts b/packages/durabletask-js/src/task/options/index.ts index 73e5f24..bdcc4ab 100644 --- a/packages/durabletask-js/src/task/options/index.ts +++ b/packages/durabletask-js/src/task/options/index.ts @@ -4,6 +4,7 @@ export { TaskOptions, SubOrchestrationOptions, + StartOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy, } from "./task-options"; diff --git a/packages/durabletask-js/src/task/options/task-options.ts b/packages/durabletask-js/src/task/options/task-options.ts index 37a3ed7..0a66dd5 100644 --- a/packages/durabletask-js/src/task/options/task-options.ts +++ b/packages/durabletask-js/src/task/options/task-options.ts @@ -12,6 +12,10 @@ export interface TaskOptions { * Controls how many times a task is retried and the delay between retries. */ retry?: RetryPolicy; + /** + * The tags to associate with the task. + */ + tags?: Record; } /** @@ -26,6 +30,26 @@ export interface SubOrchestrationOptions extends TaskOptions { instanceId?: string; } +/** + * Options for scheduling new orchestrations via the client. + */ +export interface StartOrchestrationOptions { + /** + * The unique ID to use for the orchestration instance. + * If not specified, a random UUID will be generated. + */ + instanceId?: string; + /** + * The time when the orchestration should start executing. + * If not specified or in the past, it will start immediately. + */ + startAt?: Date; + /** + * The tags to associate with the orchestration instance. + */ + tags?: Record; +} + /** * Creates a TaskOptions instance from a RetryPolicy. * diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index 38928c0..1c305b3 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -243,6 +243,19 @@ export function getStringValue(val?: string): StringValue | undefined { return stringValue; } +function setTagsMap( + tagsMap: { set: (key: string, value: string) => void }, + tags?: Record, +): void { + if (!tags) { + return; + } + + for (const [key, value] of Object.entries(tags)) { + tagsMap.set(key, value); + } +} + export function newCompleteOrchestrationAction( id: number, status: pb.OrchestrationStatus, @@ -277,10 +290,16 @@ export function newCreateTimerAction(id: number, fireAt: Date): pb.OrchestratorA return action; } -export function newScheduleTaskAction(id: number, name: string, encodedInput?: string): pb.OrchestratorAction { +export function newScheduleTaskAction( + id: number, + name: string, + encodedInput?: string, + tags?: Record, +): pb.OrchestratorAction { const scheduleTaskAction = new pb.ScheduleTaskAction(); scheduleTaskAction.setName(name); scheduleTaskAction.setInput(getStringValue(encodedInput)); + setTagsMap(scheduleTaskAction.getTagsMap(), tags); const action = new pb.OrchestratorAction(); action.setId(id); @@ -300,11 +319,13 @@ export function newCreateSubOrchestrationAction( name: string, instanceId?: string | null, encodedInput?: string, + tags?: Record, ): pb.OrchestratorAction { const createSubOrchestrationAction = new pb.CreateSubOrchestrationAction(); createSubOrchestrationAction.setName(name); createSubOrchestrationAction.setInstanceid(instanceId || ""); createSubOrchestrationAction.setInput(getStringValue(encodedInput)); + setTagsMap(createSubOrchestrationAction.getTagsMap(), tags); const action = new pb.OrchestratorAction(); action.setId(id); diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index 7af2435..43df358 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -15,6 +15,21 @@ import { TOrchestrator } from "../types/orchestrator.type"; import { Task } from "../task/task"; import { StopIterationError } from "./exception/stop-iteration-error"; +function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }): + | Record + | undefined { + if (!tagsMap) { + return; + } + + const tags: Record = {}; + tagsMap.forEach((value, key) => { + tags[key] = value; + }); + + return Object.keys(tags).length > 0 ? tags : undefined; +} + export class RuntimeOrchestrationContext extends OrchestrationContext { _generator?: Generator, any, any>; _previousTask?: Task; @@ -262,7 +277,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const id = this.nextSequenceNumber(); const name = typeof activity === "string" ? activity : getName(activity); const encodedInput = input ? JSON.stringify(input) : undefined; - const action = ph.newScheduleTaskAction(id, name, encodedInput); + const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags); this._pendingActions[action.getId()] = action; // If a retry policy is provided, create a RetryableTask @@ -306,7 +321,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } const encodedInput = input ? JSON.stringify(input) : undefined; - const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput); + const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags); this._pendingActions[action.getId()] = action; // If a retry policy is provided, create a RetryableTask @@ -520,7 +535,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } const name = scheduleTask.getName(); const input = scheduleTask.getInput()?.getValue(); - newAction = ph.newScheduleTaskAction(newId, name, input); + const tags = mapToRecord(scheduleTask.getTagsMap()); + newAction = ph.newScheduleTaskAction(newId, name, input, tags); } else { // Reschedule a sub-orchestration task const subOrch = originalAction.getCreatesuborchestration(); @@ -530,7 +546,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const name = subOrch.getName(); const instanceId = subOrch.getInstanceid(); const input = subOrch.getInput()?.getValue(); - newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input); + const tags = mapToRecord(subOrch.getTagsMap()); + newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input, tags); } // Register the new action diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index 65ac518..d610af0 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -149,6 +149,25 @@ describe("Orchestration Executor", () => { expect(result.actions[0]?.getId()).toEqual(1); expect(result.actions[0]?.getScheduletask()?.getName()).toEqual("dummyActivity"); }); + + it("should include tags on scheduled activity actions", async () => { + const dummyActivity = async (_: ActivityContext) => { + // do nothing + }; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any) { + yield ctx.callActivity(dummyActivity, undefined, { tags: { env: "test", owner: "durable" } }); + return "done"; + }; + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; + const executor = new OrchestrationExecutor(registry, testLogger); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const scheduleTask = result.actions[0]?.getScheduletask(); + + expect(scheduleTask?.getTagsMap().get("env")).toEqual("test"); + expect(scheduleTask?.getTagsMap().get("owner")).toEqual("durable"); + }); it("should test the successful completion of an activity task", async () => { const dummyActivity = async (_: ActivityContext) => { // do nothing @@ -348,6 +367,26 @@ describe("Orchestration Executor", () => { expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); + + it("should include tags on scheduled sub-orchestration actions", async () => { + const subOrchestrator = async (_: OrchestrationContext) => { + // do nothing + }; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + yield ctx.callSubOrchestrator(subOrchestrator, undefined, { tags: { env: "test" } }); + return "done"; + }; + const registry = new Registry(); + const subOrchestratorName = registry.addOrchestrator(subOrchestrator); + const orchestratorName = registry.addOrchestrator(orchestrator); + const newEvents = [newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID, undefined)]; + const executor = new OrchestrationExecutor(registry, testLogger); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const createSubOrch = result.actions[0]?.getCreatesuborchestration(); + + expect(createSubOrch?.getName()).toEqual(subOrchestratorName); + expect(createSubOrch?.getTagsMap().get("env")).toEqual("test"); + }); it("should test that a sub-orchestration task is completed when the sub-orchestration fails", async () => { const subOrchestrator = async (_: OrchestrationContext) => { // do nothing diff --git a/packages/durabletask-js/test/query-apis.spec.ts b/packages/durabletask-js/test/query-apis.spec.ts index 098b191..afc67f9 100644 --- a/packages/durabletask-js/test/query-apis.spec.ts +++ b/packages/durabletask-js/test/query-apis.spec.ts @@ -367,6 +367,24 @@ describe("OrchestrationState", () => { expect(state.serializedCustomStatus).toBe('{"status": "custom"}'); }); + it("should create state with tags", () => { + const tags = { env: "test", owner: "durable" }; + const state = new OrchestrationState( + "instance-2", + "TestOrchestration", + OrchestrationStatus.COMPLETED, + new Date("2024-01-01T00:00:00Z"), + new Date("2024-01-01T01:00:00Z"), + undefined, + undefined, + undefined, + undefined, + tags, + ); + + expect(state.tags).toEqual(tags); + }); + it("should correctly indicate running status", () => { const state = new OrchestrationState( "instance-3", diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts index 9e59a73..a372903 100644 --- a/test/e2e-azuremanaged/orchestration.spec.ts +++ b/test/e2e-azuremanaged/orchestration.spec.ts @@ -86,6 +86,23 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); }, 31000); + it("should round-trip orchestration tags", async () => { + const orchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "done"; + }; + + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const tags = { env: "test", owner: "durable" }; + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, undefined, { tags }); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.tags).toEqual(tags); + }, 31000); + it("should be able to run an activity sequence", async () => { const plusOne = async (_: ActivityContext, input: number) => { return input + 1; diff --git a/tests/e2e/orchestration.spec.ts b/tests/e2e/orchestration.spec.ts index 56d9bcc..4559db0 100644 --- a/tests/e2e/orchestration.spec.ts +++ b/tests/e2e/orchestration.spec.ts @@ -54,6 +54,7 @@ describe("Durable Functions", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); }); + it("should be able to run an activity sequence", async () => { const plusOne = async (_: ActivityContext, input: number) => { return input + 1; From 52c93c159dc048146acc9a7a8875cf3d75e968e5 Mon Sep 17 00:00:00 2001 From: torosent <17064840+torosent@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:00:33 -0800 Subject: [PATCH 2/6] Refactor tag handling: move mapToRecord function to tags.util.ts and update usages --- packages/durabletask-js/src/client/client.ts | 21 +++------------ .../durabletask-js/src/orchestration/index.ts | 16 +----------- .../src/utils/pb-helper.util.ts | 17 +++++++++--- .../durabletask-js/src/utils/tags.util.ts | 26 +++++++++++++++++++ .../worker/runtime-orchestration-context.ts | 16 +----------- .../test/orchestration_executor.spec.ts | 12 ++++++--- 6 files changed, 54 insertions(+), 54 deletions(-) create mode 100644 packages/durabletask-js/src/utils/tags.util.ts diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts index d359baf..efdc5e7 100644 --- a/packages/durabletask-js/src/client/client.ts +++ b/packages/durabletask-js/src/client/client.ts @@ -23,25 +23,11 @@ import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page" import { FailureDetails } from "../task/failure-details"; import { Logger, ConsoleLogger } from "../types/logger.type"; import { StartOrchestrationOptions } from "../task/options"; +import { mapToRecord } from "../utils/tags.util"; // Re-export MetadataGenerator for backward compatibility export { MetadataGenerator } from "../utils/grpc-helper.util"; -function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }): - | Record - | undefined { - if (!tagsMap) { - return; - } - - const tags: Record = {}; - tagsMap.forEach((value, key) => { - tags[key] = value; - }); - - return Object.keys(tags).length > 0 ? tags : undefined; -} - /** * Options for creating a TaskHubGrpcClient. */ @@ -623,14 +609,15 @@ export class TaskHubGrpcClient { * @example * ```typescript * // Iterate over all matching instances + * const logger = new ConsoleLogger(); * const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] }); * for await (const instance of pageable) { - * console.log(instance.instanceId); + * logger.info(instance.instanceId); * } * * // Iterate over pages * for await (const page of pageable.asPages()) { - * console.log(`Page has ${page.values.length} items`); + * logger.info(`Page has ${page.values.length} items`); * } * ``` * diff --git a/packages/durabletask-js/src/orchestration/index.ts b/packages/durabletask-js/src/orchestration/index.ts index 4dedb5f..571aade 100644 --- a/packages/durabletask-js/src/orchestration/index.ts +++ b/packages/durabletask-js/src/orchestration/index.ts @@ -5,21 +5,7 @@ import * as pb from "../proto/orchestrator_service_pb"; import { FailureDetails } from "../task/failure-details"; import { fromProtobuf } from "./enum/orchestration-status.enum"; import { OrchestrationState } from "./orchestration-state"; - -function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }): - | Record - | undefined { - if (!tagsMap) { - return; - } - - const tags: Record = {}; - tagsMap.forEach((value, key) => { - tags[key] = value; - }); - - return Object.keys(tags).length > 0 ? tags : undefined; -} +import { mapToRecord } from "../utils/tags.util"; export function newOrchestrationState( instanceId: string, diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index 1c305b3..cc8c60f 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -243,7 +243,18 @@ export function getStringValue(val?: string): StringValue | undefined { return stringValue; } -function setTagsMap( +/** + * Populates a tag map with the provided tags. + * + * Copies all key-value pairs from the optional {@link tags} object into the given + * {@link tagsMap} by invoking its `set` method for each entry. If no tags are + * provided, this function is a no-op. + * + * @param tagsMap - A map-like object that exposes a `set(key, value)` method used + * to store tag key-value pairs. + * @param tags - An optional record of tag key-value pairs to add to the map. + */ +function populateTagsMap( tagsMap: { set: (key: string, value: string) => void }, tags?: Record, ): void { @@ -299,7 +310,7 @@ export function newScheduleTaskAction( const scheduleTaskAction = new pb.ScheduleTaskAction(); scheduleTaskAction.setName(name); scheduleTaskAction.setInput(getStringValue(encodedInput)); - setTagsMap(scheduleTaskAction.getTagsMap(), tags); + populateTagsMap(scheduleTaskAction.getTagsMap(), tags); const action = new pb.OrchestratorAction(); action.setId(id); @@ -325,7 +336,7 @@ export function newCreateSubOrchestrationAction( createSubOrchestrationAction.setName(name); createSubOrchestrationAction.setInstanceid(instanceId || ""); createSubOrchestrationAction.setInput(getStringValue(encodedInput)); - setTagsMap(createSubOrchestrationAction.getTagsMap(), tags); + populateTagsMap(createSubOrchestrationAction.getTagsMap(), tags); const action = new pb.OrchestratorAction(); action.setId(id); diff --git a/packages/durabletask-js/src/utils/tags.util.ts b/packages/durabletask-js/src/utils/tags.util.ts new file mode 100644 index 0000000..cf5d33c --- /dev/null +++ b/packages/durabletask-js/src/utils/tags.util.ts @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +type TagsMapLike = { + forEach: (callback: (value: string, key: string) => void) => void; +}; + +/** + * Converts a map-like collection of tag key/value pairs into a plain record object. + * + * @param tagsMap - A map-like object containing tag values keyed by tag name. + * @returns A record containing the tag key/value pairs, or `undefined` if the input + * is `undefined` or if no tags are present. + */ +export function mapToRecord(tagsMap?: TagsMapLike): Record | undefined { + if (!tagsMap) { + return; + } + + const tags: Record = {}; + tagsMap.forEach((value, key) => { + tags[key] = value; + }); + + return Object.keys(tags).length > 0 ? tags : undefined; +} diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index 43df358..0a1e3b9 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -14,21 +14,7 @@ import { TActivity } from "../types/activity.type"; import { TOrchestrator } from "../types/orchestrator.type"; import { Task } from "../task/task"; import { StopIterationError } from "./exception/stop-iteration-error"; - -function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }): - | Record - | undefined { - if (!tagsMap) { - return; - } - - const tags: Record = {}; - tagsMap.forEach((value, key) => { - tags[key] = value; - }); - - return Object.keys(tags).length > 0 ? tags : undefined; -} +import { mapToRecord } from "../utils/tags.util"; export class RuntimeOrchestrationContext extends OrchestrationContext { _generator?: Generator, any, any>; diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index d610af0..2dd43cb 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -165,6 +165,8 @@ describe("Orchestration Executor", () => { const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); const scheduleTask = result.actions[0]?.getScheduletask(); + expect(result.actions.length).toEqual(1); + expect(result.actions[0]?.hasScheduletask()).toBeTruthy(); expect(scheduleTask?.getTagsMap().get("env")).toEqual("test"); expect(scheduleTask?.getTagsMap().get("owner")).toEqual("durable"); }); @@ -188,7 +190,6 @@ describe("Orchestration Executor", () => { const executor = new OrchestrationExecutor(registry, testLogger); const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); - console.log(completeAction?.getFailuredetails()); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); }); @@ -212,7 +213,7 @@ describe("Orchestration Executor", () => { const executor = new OrchestrationExecutor(registry, testLogger); const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); const completeAction = getAndValidateSingleCompleteOrchestrationAction(result); - console.log(completeAction?.getFailuredetails()); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); }); @@ -242,7 +243,6 @@ describe("Orchestration Executor", () => { // TODO: In javascript this becomes an Anonymous function call (e.g., Object.) // can we do traceback in it? // Make sure the line of code where the exception was raised is included in the stack trace - // console.log(completeAction?.getFailuredetails()?.getStacktrace()?.getValue()); // const userCodeStatement = "ctx.callActivity(dummyActivity, orchestratorInput)"; // expect(completeAction?.getFailuredetails()?.getStacktrace()?.getValue()).toContain(userCodeStatement); }); @@ -898,7 +898,10 @@ describe("Orchestration Executor", () => { firstRetryIntervalInMilliseconds: 1000, backoffCoefficient: 1.0, }); - const result = yield ctx.callActivity("flakyActivity", input, { retry: retryPolicy }); + const result = yield ctx.callActivity("flakyActivity", input, { + retry: retryPolicy, + tags: { env: "test" }, + }); return result; }; @@ -938,6 +941,7 @@ describe("Orchestration Executor", () => { expect(result.actions.length).toBe(1); expect(result.actions[0].hasScheduletask()).toBe(true); expect(result.actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + expect(result.actions[0].getScheduletask()?.getTagsMap().get("env")).toBe("test"); expect(result.actions[0].getId()).toBe(3); // New ID after timer // Step 4: Retried activity scheduled, then completes From 8f65e1b2993494e8ddc1cb8d3d402c0b705e4e9e Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal <17064840+torosent@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:17:08 -0800 Subject: [PATCH 3/6] Update tests/e2e/orchestration.spec.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/e2e/orchestration.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/e2e/orchestration.spec.ts b/tests/e2e/orchestration.spec.ts index 4559db0..56d9bcc 100644 --- a/tests/e2e/orchestration.spec.ts +++ b/tests/e2e/orchestration.spec.ts @@ -54,7 +54,6 @@ describe("Durable Functions", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); }); - it("should be able to run an activity sequence", async () => { const plusOne = async (_: ActivityContext, input: number) => { return input + 1; From bb242dc3fee4529861c4537dabd2ab735f749662 Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal <17064840+torosent@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:17:16 -0800 Subject: [PATCH 4/6] Update packages/durabletask-js/src/index.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/durabletask-js/src/index.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index 6a604e0..a56c496 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -28,11 +28,11 @@ export { Task } from "./task/task"; // Retry policies and task options export { RetryPolicy, RetryPolicyOptions } from "./task/retry"; export { - TaskOptions, - SubOrchestrationOptions, - StartOrchestrationOptions, - taskOptionsFromRetryPolicy, - subOrchestrationOptionsFromRetryPolicy, + TaskOptions, + SubOrchestrationOptions, + StartOrchestrationOptions, + taskOptionsFromRetryPolicy, + subOrchestrationOptionsFromRetryPolicy, } from "./task/options"; // Types From a158186f170964b8a426875abf59a83d58690eb4 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:24:23 -0800 Subject: [PATCH 5/6] Fix JSDoc indentation in getAllInstances example (#90) --- packages/durabletask-js/src/client/client.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts index efdc5e7..eba520c 100644 --- a/packages/durabletask-js/src/client/client.ts +++ b/packages/durabletask-js/src/client/client.ts @@ -609,15 +609,15 @@ export class TaskHubGrpcClient { * @example * ```typescript * // Iterate over all matching instances - * const logger = new ConsoleLogger(); + * const logger = new ConsoleLogger(); * const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] }); * for await (const instance of pageable) { - * logger.info(instance.instanceId); + * logger.info(instance.instanceId); * } * * // Iterate over pages * for await (const page of pageable.asPages()) { - * logger.info(`Page has ${page.values.length} items`); + * logger.info(`Page has ${page.values.length} items`); * } * ``` * From 64e87dd46718944490f7f4e17ad3a1352c240fb0 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Feb 2026 14:25:06 -0800 Subject: [PATCH 6/6] Refactor: Use populateTagsMap helper for tag population in client.ts (#91) --- packages/durabletask-js/src/client/client.ts | 8 ++------ packages/durabletask-js/src/utils/pb-helper.util.ts | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts index eba520c..bd3c5be 100644 --- a/packages/durabletask-js/src/client/client.ts +++ b/packages/durabletask-js/src/client/client.ts @@ -24,6 +24,7 @@ import { FailureDetails } from "../task/failure-details"; import { Logger, ConsoleLogger } from "../types/logger.type"; import { StartOrchestrationOptions } from "../task/options"; import { mapToRecord } from "../utils/tags.util"; +import { populateTagsMap } from "../utils/pb-helper.util"; // Re-export MetadataGenerator for backward compatibility export { MetadataGenerator } from "../utils/grpc-helper.util"; @@ -188,12 +189,7 @@ export class TaskHubGrpcClient { req.setInput(i); req.setScheduledstarttimestamp(ts); - if (tags) { - const tagsMap = req.getTagsMap(); - for (const [key, value] of Object.entries(tags)) { - tagsMap.set(key, value); - } - } + populateTagsMap(req.getTagsMap(), tags); this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`); diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index cc8c60f..ddae925 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -254,7 +254,7 @@ export function getStringValue(val?: string): StringValue | undefined { * to store tag key-value pairs. * @param tags - An optional record of tag key-value pairs to add to the map. */ -function populateTagsMap( +export function populateTagsMap( tagsMap: { set: (key: string, value: string) => void }, tags?: Record, ): void {