Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 241 additions & 1 deletion packages/durabletask-js/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ import { randomUUID } from "crypto";
import { newOrchestrationState } from "../orchestration";
import { OrchestrationState } from "../orchestration/orchestration-state";
import { GrpcClient } from "./client-grpc";
import { OrchestrationStatus, toProtobuf } from "../orchestration/enum/orchestration-status.enum";
import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/enum/orchestration-status.enum";
import { TimeoutError } from "../exception/timeout-error";
import { PurgeResult } from "../orchestration/orchestration-purge-result";
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
import { FailureDetails } from "../task/failure-details";

// Re-export MetadataGenerator for backward compatibility
export { MetadataGenerator } from "../utils/grpc-helper.util";
Expand Down Expand Up @@ -384,4 +387,241 @@ export class TaskHubGrpcClient {
}
return new PurgeResult(res.getDeletedinstancecount());
}

/**
* Queries orchestration instances and returns an async iterable of results.
*
* This method supports querying orchestration instances by various filter criteria including
* creation time range, runtime status, instance ID prefix, and task hub names.
*
* The results are returned as an AsyncPageable that supports both iteration over individual
* items and iteration over pages.
*
* @example
* ```typescript
* // Iterate over all matching instances
* const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] });
* for await (const instance of pageable) {
* console.log(instance.instanceId);
* }
*
* // Iterate over pages
* for await (const page of pageable.asPages()) {
* console.log(`Page has ${page.values.length} items`);
* }
* ```
*
* @param filter - Optional filter criteria for the query.
* @returns An AsyncPageable of OrchestrationState objects.
*/
getAllInstances(filter?: OrchestrationQuery): AsyncPageable<OrchestrationState> {
return createAsyncPageable<OrchestrationState>(
async (continuationToken?: string, pageSize?: number): Promise<Page<OrchestrationState>> => {
const req = new pb.QueryInstancesRequest();
const query = new pb.InstanceQuery();

// Set created time range
if (filter?.createdFrom) {
const timestamp = new Timestamp();
timestamp.fromDate(filter.createdFrom);
query.setCreatedtimefrom(timestamp);
}

if (filter?.createdTo) {
const timestamp = new Timestamp();
timestamp.fromDate(filter.createdTo);
query.setCreatedtimeto(timestamp);
}

// Set runtime statuses
if (filter?.statuses) {
for (const status of filter.statuses) {
query.addRuntimestatus(toProtobuf(status));
}
}

// Set task hub names
if (filter?.taskHubNames) {
for (const name of filter.taskHubNames) {
const stringValue = new StringValue();
stringValue.setValue(name);
query.addTaskhubnames(stringValue);
}
}

// Set instance ID prefix
if (filter?.instanceIdPrefix) {
const prefixValue = new StringValue();
prefixValue.setValue(filter.instanceIdPrefix);
query.setInstanceidprefix(prefixValue);
}

// Set page size
const effectivePageSize = pageSize ?? filter?.pageSize ?? DEFAULT_PAGE_SIZE;
query.setMaxinstancecount(effectivePageSize);

// Set continuation token (use provided or from filter)
const token = continuationToken ?? filter?.continuationToken;
if (token) {
const tokenValue = new StringValue();
tokenValue.setValue(token);
query.setContinuationtoken(tokenValue);
}

// Set fetch inputs and outputs
query.setFetchinputsandoutputs(filter?.fetchInputsAndOutputs ?? false);

req.setQuery(query);

const response = await callWithMetadata<pb.QueryInstancesRequest, pb.QueryInstancesResponse>(
this._stub.queryInstances.bind(this._stub),
req,
this._metadataGenerator,
);

const states: OrchestrationState[] = [];
const orchestrationStateList = response.getOrchestrationstateList();
for (const state of orchestrationStateList) {
const orchestrationState = this._createOrchestrationStateFromProto(state, filter?.fetchInputsAndOutputs ?? false);
if (orchestrationState) {
states.push(orchestrationState);
}
}

const responseContinuationToken = response.getContinuationtoken()?.getValue();
return new Page(states, responseContinuationToken);
},
);
}

/**
* Lists orchestration instance IDs that match the specified runtime status
* and completed time range, using key-based pagination.
*
* This method is optimized for listing instance IDs without fetching full instance metadata,
* making it more efficient when only instance IDs are needed.
*
* @example
* ```typescript
* // Get first page of completed instances
* const page = await client.listInstanceIds({
* runtimeStatus: [OrchestrationStatus.COMPLETED],
* pageSize: 50
* });
*
* // Get next page using the continuation key
* if (page.hasMoreResults) {
* const nextPage = await client.listInstanceIds({
* runtimeStatus: [OrchestrationStatus.COMPLETED],
* pageSize: 50,
* lastInstanceKey: page.continuationToken
* });
* }
* ```
*
* @param options - Optional filter criteria and pagination options.
* @returns A Promise that resolves to a Page of instance IDs.
*/
async listInstanceIds(options?: ListInstanceIdsOptions): Promise<Page<string>> {
const req = new pb.ListInstanceIdsRequest();

// Set page size
const pageSize = options?.pageSize ?? DEFAULT_PAGE_SIZE;
req.setPagesize(pageSize);

// Set last instance key (continuation token)
if (options?.lastInstanceKey) {
const keyValue = new StringValue();
keyValue.setValue(options.lastInstanceKey);
req.setLastinstancekey(keyValue);
}

// Set runtime statuses
if (options?.runtimeStatus) {
for (const status of options.runtimeStatus) {
req.addRuntimestatus(toProtobuf(status));
}
}

// Set completed time range
if (options?.completedTimeFrom) {
const timestamp = new Timestamp();
timestamp.fromDate(options.completedTimeFrom);
req.setCompletedtimefrom(timestamp);
}

if (options?.completedTimeTo) {
const timestamp = new Timestamp();
timestamp.fromDate(options.completedTimeTo);
req.setCompletedtimeto(timestamp);
}

const response = await callWithMetadata<pb.ListInstanceIdsRequest, pb.ListInstanceIdsResponse>(
this._stub.listInstanceIds.bind(this._stub),
req,
this._metadataGenerator,
);

const instanceIds = response.getInstanceidsList();
const lastInstanceKey = response.getLastinstancekey()?.getValue();

return new Page(instanceIds, lastInstanceKey);
}

/**
* Helper method to create an OrchestrationState from a protobuf OrchestrationState.
*/
private _createOrchestrationStateFromProto(
protoState: pb.OrchestrationState,
fetchPayloads: boolean,
): OrchestrationState | undefined {
const instanceId = protoState.getInstanceid();
const name = protoState.getName();
const runtimeStatus = protoState.getOrchestrationstatus();
const createdTimestamp = protoState.getCreatedtimestamp();
const lastUpdatedTimestamp = protoState.getLastupdatedtimestamp();

if (!instanceId) {
return undefined;
}

const createdAt = createdTimestamp ? createdTimestamp.toDate() : new Date(0);
const lastUpdatedAt = lastUpdatedTimestamp ? lastUpdatedTimestamp.toDate() : new Date(0);

// Map proto status to our status enum using the existing conversion function
const status = fromProtobuf(runtimeStatus);

let serializedInput: string | undefined;
let serializedOutput: string | undefined;
let serializedCustomStatus: string | undefined;

if (fetchPayloads) {
serializedInput = protoState.getInput()?.getValue();
serializedOutput = protoState.getOutput()?.getValue();
serializedCustomStatus = protoState.getCustomstatus()?.getValue();
}

// Extract failure details if present
let failureDetails;
const protoFailureDetails = protoState.getFailuredetails();
if (protoFailureDetails) {
failureDetails = new FailureDetails(
protoFailureDetails.getErrormessage(),
protoFailureDetails.getErrortype(),
protoFailureDetails.getStacktrace()?.getValue(),
);
}

return new OrchestrationState(
instanceId,
name ?? "",
status,
createdAt,
lastUpdatedAt,
serializedInput,
serializedOutput,
serializedCustomStatus,
failureDetails,
);
}
}
5 changes: 5 additions & 0 deletions packages/durabletask-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ export { ActivityContext } from "./task/context/activity-context";
// Orchestration types and utilities
export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria";
export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
export { OrchestrationState } from "./orchestration/orchestration-state";

// Query types
export { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "./orchestration/orchestration-query";
export { Page, AsyncPageable, createAsyncPageable } from "./orchestration/page";

// Proto types (for advanced usage)
export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchestrator_service_pb";
Expand Down
87 changes: 87 additions & 0 deletions packages/durabletask-js/src/orchestration/orchestration-query.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { OrchestrationStatus } from "./enum/orchestration-status.enum";

/**
* Default page size when not supplied.
*/
export const DEFAULT_PAGE_SIZE = 100;

/**
* A filter for querying orchestration instances.
*/
export interface OrchestrationQuery {
/**
* Filter by orchestrations created on or after this date.
*/
createdFrom?: Date;

/**
* Filter by orchestrations created on or before this date.
*/
createdTo?: Date;

/**
* Filter by orchestration runtime statuses.
*/
statuses?: OrchestrationStatus[];

/**
* Names of task hubs to query across.
*/
taskHubNames?: string[];

/**
* Prefix of instance IDs to include.
*/
instanceIdPrefix?: string;

/**
* Maximum number of items to include per page.
* @default 100
*/
pageSize?: number;

/**
* Whether to include instance inputs and outputs in the query results.
* @default false
*/
fetchInputsAndOutputs?: boolean;

/**
* The continuation token to continue a paged query.
*/
continuationToken?: string;
}

/**
* Options for listing instance IDs with key-based pagination.
*/
export interface ListInstanceIdsOptions {
/**
* Optional set of runtime statuses to filter by. If undefined, all statuses are included.
*/
runtimeStatus?: OrchestrationStatus[];

/**
* Inclusive lower bound of the orchestration completed time filter.
*/
completedTimeFrom?: Date;

/**
* Inclusive upper bound of the orchestration completed time filter.
*/
completedTimeTo?: Date;

/**
* Maximum number of instance IDs to return in a single page.
* @default 100
*/
pageSize?: number;

/**
* Continuation key from the previous page. If undefined, listing starts from the beginning.
*/
lastInstanceKey?: string;
}
Loading
Loading