Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions packages/durabletask-js/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from ".
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
import { FailureDetails } from "../task/failure-details";
import { Logger, ConsoleLogger } from "../types/logger.type";
import { StartOrchestrationOptions } from "../task/options";
import { mapToRecord } from "../utils/tags.util";
import { populateTagsMap } from "../utils/pb-helper.util";

// Re-export MetadataGenerator for backward compatibility
export { MetadataGenerator } from "../utils/grpc-helper.util";
Expand Down Expand Up @@ -133,13 +136,46 @@ export class TaskHubGrpcClient {
input?: TInput,
instanceId?: string,
startAt?: Date,
): Promise<string>;
/**
* Schedules a new orchestrator using the DurableTask client.
*
* @param {TOrchestrator | string} orchestrator - The orchestrator or the name of the orchestrator to be scheduled.
* @param {TInput} input - Optional input for the orchestrator.
* @param {StartOrchestrationOptions} options - Options for instance ID, start time, and tags.
* @return {Promise<string>} A Promise resolving to the unique ID of the scheduled orchestrator instance.
*/
async scheduleNewOrchestration(
orchestrator: TOrchestrator | string,
input?: TInput,
options?: StartOrchestrationOptions,
): Promise<string>;
async scheduleNewOrchestration(
orchestrator: TOrchestrator | string,
input?: TInput,
instanceIdOrOptions?: string | StartOrchestrationOptions,
startAt?: Date,
): Promise<string> {
let name;
if (typeof orchestrator === "string") {
name = orchestrator;
} else {
name = getName(orchestrator);
}

const instanceId =
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? instanceIdOrOptions
: instanceIdOrOptions.instanceId;
const scheduledStartAt =
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? startAt
: instanceIdOrOptions.startAt;
const tags =
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? undefined
: instanceIdOrOptions.tags;

const req = new pb.CreateInstanceRequest();
req.setName(name);
req.setInstanceid(instanceId ?? randomUUID());
Expand All @@ -148,11 +184,13 @@ export class TaskHubGrpcClient {
i.setValue(JSON.stringify(input));

const ts = new Timestamp();
ts.fromDate(new Date(startAt?.getTime() ?? 0));
ts.fromDate(new Date(scheduledStartAt?.getTime() ?? 0));

req.setInput(i);
req.setScheduledstarttimestamp(ts);

populateTagsMap(req.getTagsMap(), tags);

this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);

const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
Expand Down Expand Up @@ -567,14 +605,15 @@ export class TaskHubGrpcClient {
* @example
* ```typescript
* // Iterate over all matching instances
* const logger = new ConsoleLogger();
* const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] });
* for await (const instance of pageable) {
* console.log(instance.instanceId);
* logger.info(instance.instanceId);
* }
*
* // Iterate over pages
* for await (const page of pageable.asPages()) {
* console.log(`Page has ${page.values.length} items`);
* logger.info(`Page has ${page.values.length} items`);
* }
* ```
*
Expand Down Expand Up @@ -779,6 +818,8 @@ export class TaskHubGrpcClient {
);
}

const tags = mapToRecord(protoState.getTagsMap());

return new OrchestrationState(
instanceId,
name ?? "",
Expand All @@ -789,6 +830,7 @@ export class TaskHubGrpcClient {
serializedOutput,
serializedCustomStatus,
failureDetails,
tags,
);
}
}
8 changes: 7 additions & 1 deletion packages/durabletask-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ export { Task } from "./task/task";

// Retry policies and task options
export { RetryPolicy, RetryPolicyOptions } from "./task/retry";
export { TaskOptions, SubOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy } from "./task/options";
export {
TaskOptions,
SubOrchestrationOptions,
StartOrchestrationOptions,
taskOptionsFromRetryPolicy,
subOrchestrationOptionsFromRetryPolicy,
} from "./task/options";

// Types
export { TOrchestrator } from "./types/orchestrator.type";
Expand Down
4 changes: 4 additions & 0 deletions packages/durabletask-js/src/orchestration/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as pb from "../proto/orchestrator_service_pb";
import { FailureDetails } from "../task/failure-details";
import { fromProtobuf } from "./enum/orchestration-status.enum";
import { OrchestrationState } from "./orchestration-state";
import { mapToRecord } from "../utils/tags.util";

export function newOrchestrationState(
instanceId: string,
Expand Down Expand Up @@ -43,6 +44,8 @@ export function newOrchestrationState(
tsUpdatedParsed = new Date(tsUpdated.getSeconds() * 1000 + tsUpdated.getNanos() / 1000000);
}

const tags = mapToRecord(state?.getTagsMap());

return new OrchestrationState(
instanceId,
state?.getName() ?? "",
Expand All @@ -53,5 +56,6 @@ export function newOrchestrationState(
state?.getOutput()?.toString(),
state?.getCustomstatus()?.toString(),
failureDetails,
tags,
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class OrchestrationState {
serializedOutput?: string;
serializedCustomStatus?: string;
failureDetails?: FailureDetails;
tags?: Record<string, string>;

constructor(
instanceId: string,
Expand All @@ -26,6 +27,7 @@ export class OrchestrationState {
serializedOutput?: string,
serializedCustomStatus?: string,
failureDetails?: FailureDetails,
tags?: Record<string, string>,
) {
this.instanceId = instanceId;
this.name = name;
Expand All @@ -36,6 +38,7 @@ export class OrchestrationState {
this.serializedOutput = serializedOutput;
this.serializedCustomStatus = serializedCustomStatus;
this.failureDetails = failureDetails;
this.tags = tags;
}

raiseIfFailed(): void {
Expand Down
1 change: 1 addition & 0 deletions packages/durabletask-js/src/task/options/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
export {
TaskOptions,
SubOrchestrationOptions,
StartOrchestrationOptions,
taskOptionsFromRetryPolicy,
subOrchestrationOptionsFromRetryPolicy,
} from "./task-options";
24 changes: 24 additions & 0 deletions packages/durabletask-js/src/task/options/task-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export interface TaskOptions {
* Controls how many times a task is retried and the delay between retries.
*/
retry?: RetryPolicy;
/**
* The tags to associate with the task.
*/
tags?: Record<string, string>;
}

/**
Expand All @@ -26,6 +30,26 @@ export interface SubOrchestrationOptions extends TaskOptions {
instanceId?: string;
}

/**
* Options for scheduling new orchestrations via the client.
*/
export interface StartOrchestrationOptions {
/**
* The unique ID to use for the orchestration instance.
* If not specified, a random UUID will be generated.
*/
instanceId?: string;
/**
* The time when the orchestration should start executing.
* If not specified or in the past, it will start immediately.
*/
startAt?: Date;
/**
* The tags to associate with the orchestration instance.
*/
tags?: Record<string, string>;
}

/**
* Creates a TaskOptions instance from a RetryPolicy.
*
Expand Down
34 changes: 33 additions & 1 deletion packages/durabletask-js/src/utils/pb-helper.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,30 @@ export function getStringValue(val?: string): StringValue | undefined {
return stringValue;
}

/**
* Populates a tag map with the provided tags.
*
* Copies all key-value pairs from the optional {@link tags} object into the given
* {@link tagsMap} by invoking its `set` method for each entry. If no tags are
* provided, this function is a no-op.
*
* @param tagsMap - A map-like object that exposes a `set(key, value)` method used
* to store tag key-value pairs.
* @param tags - An optional record of tag key-value pairs to add to the map.
*/
export function populateTagsMap(
tagsMap: { set: (key: string, value: string) => void },
tags?: Record<string, string>,
): void {
if (!tags) {
return;
}

for (const [key, value] of Object.entries(tags)) {
tagsMap.set(key, value);
}
}

export function newCompleteOrchestrationAction(
id: number,
status: pb.OrchestrationStatus,
Expand Down Expand Up @@ -277,10 +301,16 @@ export function newCreateTimerAction(id: number, fireAt: Date): pb.OrchestratorA
return action;
}

export function newScheduleTaskAction(id: number, name: string, encodedInput?: string): pb.OrchestratorAction {
export function newScheduleTaskAction(
id: number,
name: string,
encodedInput?: string,
tags?: Record<string, string>,
): pb.OrchestratorAction {
const scheduleTaskAction = new pb.ScheduleTaskAction();
scheduleTaskAction.setName(name);
scheduleTaskAction.setInput(getStringValue(encodedInput));
populateTagsMap(scheduleTaskAction.getTagsMap(), tags);

const action = new pb.OrchestratorAction();
action.setId(id);
Expand All @@ -300,11 +330,13 @@ export function newCreateSubOrchestrationAction(
name: string,
instanceId?: string | null,
encodedInput?: string,
tags?: Record<string, string>,
): pb.OrchestratorAction {
const createSubOrchestrationAction = new pb.CreateSubOrchestrationAction();
createSubOrchestrationAction.setName(name);
createSubOrchestrationAction.setInstanceid(instanceId || "");
createSubOrchestrationAction.setInput(getStringValue(encodedInput));
populateTagsMap(createSubOrchestrationAction.getTagsMap(), tags);

const action = new pb.OrchestratorAction();
action.setId(id);
Expand Down
26 changes: 26 additions & 0 deletions packages/durabletask-js/src/utils/tags.util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

type TagsMapLike = {
forEach: (callback: (value: string, key: string) => void) => void;
};

/**
* Converts a map-like collection of tag key/value pairs into a plain record object.
*
* @param tagsMap - A map-like object containing tag values keyed by tag name.
* @returns A record containing the tag key/value pairs, or `undefined` if the input
* is `undefined` or if no tags are present.
*/
export function mapToRecord(tagsMap?: TagsMapLike): Record<string, string> | undefined {
if (!tagsMap) {
return;
}

const tags: Record<string, string> = {};
tagsMap.forEach((value, key) => {
tags[key] = value;
});

return Object.keys(tags).length > 0 ? tags : undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { TActivity } from "../types/activity.type";
import { TOrchestrator } from "../types/orchestrator.type";
import { Task } from "../task/task";
import { StopIterationError } from "./exception/stop-iteration-error";
import { mapToRecord } from "../utils/tags.util";

export class RuntimeOrchestrationContext extends OrchestrationContext {
_generator?: Generator<Task<any>, any, any>;
Expand Down Expand Up @@ -262,7 +263,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
const id = this.nextSequenceNumber();
const name = typeof activity === "string" ? activity : getName(activity);
const encodedInput = input ? JSON.stringify(input) : undefined;
const action = ph.newScheduleTaskAction(id, name, encodedInput);
const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags);
this._pendingActions[action.getId()] = action;

// If a retry policy is provided, create a RetryableTask
Expand Down Expand Up @@ -306,7 +307,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
}

const encodedInput = input ? JSON.stringify(input) : undefined;
const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput);
const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags);
this._pendingActions[action.getId()] = action;

// If a retry policy is provided, create a RetryableTask
Expand Down Expand Up @@ -520,7 +521,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
}
const name = scheduleTask.getName();
const input = scheduleTask.getInput()?.getValue();
newAction = ph.newScheduleTaskAction(newId, name, input);
const tags = mapToRecord(scheduleTask.getTagsMap());
newAction = ph.newScheduleTaskAction(newId, name, input, tags);
} else {
// Reschedule a sub-orchestration task
const subOrch = originalAction.getCreatesuborchestration();
Expand All @@ -530,7 +532,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
const name = subOrch.getName();
const instanceId = subOrch.getInstanceid();
const input = subOrch.getInput()?.getValue();
newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input);
const tags = mapToRecord(subOrch.getTagsMap());
newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input, tags);
}

// Register the new action
Expand Down
Loading
Loading