diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts index 473a52a..8695119 100644 --- a/packages/durabletask-js/src/client/client.ts +++ b/packages/durabletask-js/src/client/client.ts @@ -13,11 +13,14 @@ import { randomUUID } from "crypto"; import { newOrchestrationState } from "../orchestration"; import { OrchestrationState } from "../orchestration/orchestration-state"; import { GrpcClient } from "./client-grpc"; -import { OrchestrationStatus, toProtobuf } from "../orchestration/enum/orchestration-status.enum"; +import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/enum/orchestration-status.enum"; import { TimeoutError } from "../exception/timeout-error"; import { PurgeResult } from "../orchestration/orchestration-purge-result"; import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria"; import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util"; +import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query"; +import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page"; +import { FailureDetails } from "../task/failure-details"; // Re-export MetadataGenerator for backward compatibility export { MetadataGenerator } from "../utils/grpc-helper.util"; @@ -384,4 +387,241 @@ export class TaskHubGrpcClient { } return new PurgeResult(res.getDeletedinstancecount()); } + + /** + * Queries orchestration instances and returns an async iterable of results. + * + * This method supports querying orchestration instances by various filter criteria including + * creation time range, runtime status, instance ID prefix, and task hub names. + * + * The results are returned as an AsyncPageable that supports both iteration over individual + * items and iteration over pages. + * + * @example + * ```typescript + * // Iterate over all matching instances + * const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] }); + * for await (const instance of pageable) { + * console.log(instance.instanceId); + * } + * + * // Iterate over pages + * for await (const page of pageable.asPages()) { + * console.log(`Page has ${page.values.length} items`); + * } + * ``` + * + * @param filter - Optional filter criteria for the query. + * @returns An AsyncPageable of OrchestrationState objects. + */ + getAllInstances(filter?: OrchestrationQuery): AsyncPageable { + return createAsyncPageable( + async (continuationToken?: string, pageSize?: number): Promise> => { + const req = new pb.QueryInstancesRequest(); + const query = new pb.InstanceQuery(); + + // Set created time range + if (filter?.createdFrom) { + const timestamp = new Timestamp(); + timestamp.fromDate(filter.createdFrom); + query.setCreatedtimefrom(timestamp); + } + + if (filter?.createdTo) { + const timestamp = new Timestamp(); + timestamp.fromDate(filter.createdTo); + query.setCreatedtimeto(timestamp); + } + + // Set runtime statuses + if (filter?.statuses) { + for (const status of filter.statuses) { + query.addRuntimestatus(toProtobuf(status)); + } + } + + // Set task hub names + if (filter?.taskHubNames) { + for (const name of filter.taskHubNames) { + const stringValue = new StringValue(); + stringValue.setValue(name); + query.addTaskhubnames(stringValue); + } + } + + // Set instance ID prefix + if (filter?.instanceIdPrefix) { + const prefixValue = new StringValue(); + prefixValue.setValue(filter.instanceIdPrefix); + query.setInstanceidprefix(prefixValue); + } + + // Set page size + const effectivePageSize = pageSize ?? filter?.pageSize ?? DEFAULT_PAGE_SIZE; + query.setMaxinstancecount(effectivePageSize); + + // Set continuation token (use provided or from filter) + const token = continuationToken ?? filter?.continuationToken; + if (token) { + const tokenValue = new StringValue(); + tokenValue.setValue(token); + query.setContinuationtoken(tokenValue); + } + + // Set fetch inputs and outputs + query.setFetchinputsandoutputs(filter?.fetchInputsAndOutputs ?? false); + + req.setQuery(query); + + const response = await callWithMetadata( + this._stub.queryInstances.bind(this._stub), + req, + this._metadataGenerator, + ); + + const states: OrchestrationState[] = []; + const orchestrationStateList = response.getOrchestrationstateList(); + for (const state of orchestrationStateList) { + const orchestrationState = this._createOrchestrationStateFromProto(state, filter?.fetchInputsAndOutputs ?? false); + if (orchestrationState) { + states.push(orchestrationState); + } + } + + const responseContinuationToken = response.getContinuationtoken()?.getValue(); + return new Page(states, responseContinuationToken); + }, + ); + } + + /** + * Lists orchestration instance IDs that match the specified runtime status + * and completed time range, using key-based pagination. + * + * This method is optimized for listing instance IDs without fetching full instance metadata, + * making it more efficient when only instance IDs are needed. + * + * @example + * ```typescript + * // Get first page of completed instances + * const page = await client.listInstanceIds({ + * runtimeStatus: [OrchestrationStatus.COMPLETED], + * pageSize: 50 + * }); + * + * // Get next page using the continuation key + * if (page.hasMoreResults) { + * const nextPage = await client.listInstanceIds({ + * runtimeStatus: [OrchestrationStatus.COMPLETED], + * pageSize: 50, + * lastInstanceKey: page.continuationToken + * }); + * } + * ``` + * + * @param options - Optional filter criteria and pagination options. + * @returns A Promise that resolves to a Page of instance IDs. + */ + async listInstanceIds(options?: ListInstanceIdsOptions): Promise> { + const req = new pb.ListInstanceIdsRequest(); + + // Set page size + const pageSize = options?.pageSize ?? DEFAULT_PAGE_SIZE; + req.setPagesize(pageSize); + + // Set last instance key (continuation token) + if (options?.lastInstanceKey) { + const keyValue = new StringValue(); + keyValue.setValue(options.lastInstanceKey); + req.setLastinstancekey(keyValue); + } + + // Set runtime statuses + if (options?.runtimeStatus) { + for (const status of options.runtimeStatus) { + req.addRuntimestatus(toProtobuf(status)); + } + } + + // Set completed time range + if (options?.completedTimeFrom) { + const timestamp = new Timestamp(); + timestamp.fromDate(options.completedTimeFrom); + req.setCompletedtimefrom(timestamp); + } + + if (options?.completedTimeTo) { + const timestamp = new Timestamp(); + timestamp.fromDate(options.completedTimeTo); + req.setCompletedtimeto(timestamp); + } + + const response = await callWithMetadata( + this._stub.listInstanceIds.bind(this._stub), + req, + this._metadataGenerator, + ); + + const instanceIds = response.getInstanceidsList(); + const lastInstanceKey = response.getLastinstancekey()?.getValue(); + + return new Page(instanceIds, lastInstanceKey); + } + + /** + * Helper method to create an OrchestrationState from a protobuf OrchestrationState. + */ + private _createOrchestrationStateFromProto( + protoState: pb.OrchestrationState, + fetchPayloads: boolean, + ): OrchestrationState | undefined { + const instanceId = protoState.getInstanceid(); + const name = protoState.getName(); + const runtimeStatus = protoState.getOrchestrationstatus(); + const createdTimestamp = protoState.getCreatedtimestamp(); + const lastUpdatedTimestamp = protoState.getLastupdatedtimestamp(); + + if (!instanceId) { + return undefined; + } + + const createdAt = createdTimestamp ? createdTimestamp.toDate() : new Date(0); + const lastUpdatedAt = lastUpdatedTimestamp ? lastUpdatedTimestamp.toDate() : new Date(0); + + // Map proto status to our status enum using the existing conversion function + const status = fromProtobuf(runtimeStatus); + + let serializedInput: string | undefined; + let serializedOutput: string | undefined; + let serializedCustomStatus: string | undefined; + + if (fetchPayloads) { + serializedInput = protoState.getInput()?.getValue(); + serializedOutput = protoState.getOutput()?.getValue(); + serializedCustomStatus = protoState.getCustomstatus()?.getValue(); + } + + // Extract failure details if present + let failureDetails; + const protoFailureDetails = protoState.getFailuredetails(); + if (protoFailureDetails) { + failureDetails = new FailureDetails( + protoFailureDetails.getErrormessage(), + protoFailureDetails.getErrortype(), + protoFailureDetails.getStacktrace()?.getValue(), + ); + } + + return new OrchestrationState( + instanceId, + name ?? "", + status, + createdAt, + lastUpdatedAt, + serializedInput, + serializedOutput, + serializedCustomStatus, + failureDetails, + ); + } } diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index eb1b345..026c4e9 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -12,6 +12,11 @@ export { ActivityContext } from "./task/context/activity-context"; // Orchestration types and utilities export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria"; export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum"; +export { OrchestrationState } from "./orchestration/orchestration-state"; + +// Query types +export { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "./orchestration/orchestration-query"; +export { Page, AsyncPageable, createAsyncPageable } from "./orchestration/page"; // Proto types (for advanced usage) export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchestrator_service_pb"; diff --git a/packages/durabletask-js/src/orchestration/orchestration-query.ts b/packages/durabletask-js/src/orchestration/orchestration-query.ts new file mode 100644 index 0000000..afa5f4d --- /dev/null +++ b/packages/durabletask-js/src/orchestration/orchestration-query.ts @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { OrchestrationStatus } from "./enum/orchestration-status.enum"; + +/** + * Default page size when not supplied. + */ +export const DEFAULT_PAGE_SIZE = 100; + +/** + * A filter for querying orchestration instances. + */ +export interface OrchestrationQuery { + /** + * Filter by orchestrations created on or after this date. + */ + createdFrom?: Date; + + /** + * Filter by orchestrations created on or before this date. + */ + createdTo?: Date; + + /** + * Filter by orchestration runtime statuses. + */ + statuses?: OrchestrationStatus[]; + + /** + * Names of task hubs to query across. + */ + taskHubNames?: string[]; + + /** + * Prefix of instance IDs to include. + */ + instanceIdPrefix?: string; + + /** + * Maximum number of items to include per page. + * @default 100 + */ + pageSize?: number; + + /** + * Whether to include instance inputs and outputs in the query results. + * @default false + */ + fetchInputsAndOutputs?: boolean; + + /** + * The continuation token to continue a paged query. + */ + continuationToken?: string; +} + +/** + * Options for listing instance IDs with key-based pagination. + */ +export interface ListInstanceIdsOptions { + /** + * Optional set of runtime statuses to filter by. If undefined, all statuses are included. + */ + runtimeStatus?: OrchestrationStatus[]; + + /** + * Inclusive lower bound of the orchestration completed time filter. + */ + completedTimeFrom?: Date; + + /** + * Inclusive upper bound of the orchestration completed time filter. + */ + completedTimeTo?: Date; + + /** + * Maximum number of instance IDs to return in a single page. + * @default 100 + */ + pageSize?: number; + + /** + * Continuation key from the previous page. If undefined, listing starts from the beginning. + */ + lastInstanceKey?: string; +} diff --git a/packages/durabletask-js/src/orchestration/page.ts b/packages/durabletask-js/src/orchestration/page.ts new file mode 100644 index 0000000..dcc1232 --- /dev/null +++ b/packages/durabletask-js/src/orchestration/page.ts @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Represents a single page of results from a paginated query. + * @template T The type of values held by the page. + */ +export class Page { + /** + * The values contained in this page. + */ + readonly values: readonly T[]; + + /** + * The continuation token for fetching the next page, or undefined if there are no more items. + */ + readonly continuationToken?: string; + + /** + * Creates a new Page instance. + * @param values The values this page holds. + * @param continuationToken The continuation token for the next page, or undefined if no more pages. + */ + constructor(values: readonly T[], continuationToken?: string) { + this.values = values; + this.continuationToken = continuationToken; + } + + /** + * Returns true if there are more pages available. + */ + get hasMoreResults(): boolean { + return this.continuationToken !== undefined && this.continuationToken !== ""; + } +} + +/** + * Represents an async iterator over pages of results. + * @template T The type of values held by each page. + */ +export interface AsyncPageable extends AsyncIterable { + /** + * Returns an async iterator over pages of results. + * @param continuationToken Optional continuation token to start from. + * @param pageSize Optional page size hint. + */ + asPages(continuationToken?: string, pageSize?: number): AsyncIterable>; +} + +/** + * Creates an AsyncPageable from a page fetching function. + * @template T The type of values held by each page. + * @param pageFunc A function that fetches pages given a continuation token and page size. + * @returns An AsyncPageable that can be iterated over. + */ +export function createAsyncPageable( + pageFunc: (continuationToken?: string, pageSize?: number) => Promise>, +): AsyncPageable { + return { + async *[Symbol.asyncIterator](): AsyncIterableIterator { + let continuationToken: string | undefined = undefined; + do { + const page = await pageFunc(continuationToken, undefined); + for (const item of page.values) { + yield item; + } + continuationToken = page.continuationToken; + } while (continuationToken !== undefined && continuationToken !== ""); + }, + + asPages( + startContinuationToken?: string, + pageSizeHint?: number, + ): AsyncIterable> { + return { + async *[Symbol.asyncIterator](): AsyncIterableIterator> { + let continuationToken: string | undefined = startContinuationToken; + do { + const page = await pageFunc(continuationToken, pageSizeHint); + yield page; + continuationToken = page.continuationToken; + } while (continuationToken !== undefined && continuationToken !== ""); + }, + }; + }, + }; +} diff --git a/packages/durabletask-js/test/query-apis.spec.ts b/packages/durabletask-js/test/query-apis.spec.ts new file mode 100644 index 0000000..098b191 --- /dev/null +++ b/packages/durabletask-js/test/query-apis.spec.ts @@ -0,0 +1,402 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { Page, createAsyncPageable } from "../src/orchestration/page"; +import { FailureDetails } from "../src/task/failure-details"; +import { + OrchestrationQuery, + ListInstanceIdsOptions, + DEFAULT_PAGE_SIZE, +} from "../src/orchestration/orchestration-query"; +import { OrchestrationStatus } from "../src/orchestration/enum/orchestration-status.enum"; +import { OrchestrationState } from "../src/orchestration/orchestration-state"; + +describe("Page", () => { + describe("constructor", () => { + it("should create a page with values and no continuation token", () => { + const values = ["item1", "item2", "item3"]; + const page = new Page(values); + + expect(page.values).toEqual(values); + expect(page.continuationToken).toBeUndefined(); + expect(page.hasMoreResults).toBe(false); + }); + + it("should create a page with values and a continuation token", () => { + const values = ["item1", "item2"]; + const continuationToken = "token123"; + const page = new Page(values, continuationToken); + + expect(page.values).toEqual(values); + expect(page.continuationToken).toBe(continuationToken); + expect(page.hasMoreResults).toBe(true); + }); + + it("should handle empty values array", () => { + const page = new Page([]); + + expect(page.values).toEqual([]); + expect(page.hasMoreResults).toBe(false); + }); + + it("should treat empty string continuation token as no more results", () => { + const page = new Page(["item1"], ""); + + expect(page.hasMoreResults).toBe(false); + }); + }); + + describe("hasMoreResults", () => { + it("should return true when continuation token is present and non-empty", () => { + const page = new Page(["item1"], "nextPage"); + expect(page.hasMoreResults).toBe(true); + }); + + it("should return false when continuation token is undefined", () => { + const page = new Page(["item1"]); + expect(page.hasMoreResults).toBe(false); + }); + + it("should return false when continuation token is empty string", () => { + const page = new Page(["item1"], ""); + expect(page.hasMoreResults).toBe(false); + }); + }); +}); + +describe("createAsyncPageable", () => { + it("should iterate over all items across multiple pages", async () => { + let callCount = 0; + const pageFunc = async (continuationToken?: string): Promise> => { + callCount++; + if (continuationToken === undefined) { + return new Page(["item1", "item2"], "page2"); + } else if (continuationToken === "page2") { + return new Page(["item3", "item4"], "page3"); + } else { + return new Page(["item5"], undefined); + } + }; + + const pageable = createAsyncPageable(pageFunc); + const items: string[] = []; + + for await (const item of pageable) { + items.push(item); + } + + expect(items).toEqual(["item1", "item2", "item3", "item4", "item5"]); + expect(callCount).toBe(3); + }); + + it("should iterate over pages using asPages()", async () => { + let callCount = 0; + const pageFunc = async (): Promise> => { + callCount++; + if (callCount === 1) { + return new Page([1, 2, 3], "page2"); + } else { + return new Page([4, 5], undefined); + } + }; + + const pageable = createAsyncPageable(pageFunc); + const pages: Page[] = []; + + for await (const page of pageable.asPages()) { + pages.push(page); + } + + expect(pages.length).toBe(2); + expect(pages[0].values).toEqual([1, 2, 3]); + expect(pages[0].continuationToken).toBe("page2"); + expect(pages[1].values).toEqual([4, 5]); + expect(pages[1].continuationToken).toBeUndefined(); + }); + + it("should handle single page with no continuation token", async () => { + const pageFunc = async (): Promise> => { + return new Page(["only-item"], undefined); + }; + + const pageable = createAsyncPageable(pageFunc); + const items: string[] = []; + + for await (const item of pageable) { + items.push(item); + } + + expect(items).toEqual(["only-item"]); + }); + + it("should handle empty first page", async () => { + const pageFunc = async (): Promise> => { + return new Page([], undefined); + }; + + const pageable = createAsyncPageable(pageFunc); + const items: string[] = []; + + for await (const item of pageable) { + items.push(item); + } + + expect(items).toEqual([]); + }); + + it("should pass continuation token to page function", async () => { + const receivedTokens: (string | undefined)[] = []; + const pageFunc = async (continuationToken?: string): Promise> => { + receivedTokens.push(continuationToken); + if (continuationToken === undefined) { + return new Page(["a"], "token1"); + } else if (continuationToken === "token1") { + return new Page(["b"], "token2"); + } else { + return new Page(["c"], undefined); + } + }; + + const pageable = createAsyncPageable(pageFunc); + + for await (const _ of pageable) { + // consume iterator + } + + expect(receivedTokens).toEqual([undefined, "token1", "token2"]); + }); + + it("should allow starting from a specific continuation token using asPages", async () => { + const receivedTokens: (string | undefined)[] = []; + const pageFunc = async (continuationToken?: string): Promise> => { + receivedTokens.push(continuationToken); + if (continuationToken === "startHere") { + return new Page(["fromMiddle"], "nextToken"); + } else { + return new Page(["end"], undefined); + } + }; + + const pageable = createAsyncPageable(pageFunc); + const pages: Page[] = []; + + for await (const page of pageable.asPages("startHere")) { + pages.push(page); + } + + expect(receivedTokens).toEqual(["startHere", "nextToken"]); + expect(pages[0].values).toEqual(["fromMiddle"]); + expect(pages[1].values).toEqual(["end"]); + }); + + it("should pass page size hint to page function", async () => { + let receivedPageSize: number | undefined; + const pageFunc = async (_: string | undefined, pageSize?: number): Promise> => { + receivedPageSize = pageSize; + return new Page(["item"], undefined); + }; + + const pageable = createAsyncPageable(pageFunc); + + for await (const _ of pageable.asPages(undefined, 50)) { + // consume iterator + } + + expect(receivedPageSize).toBe(50); + }); +}); + +describe("OrchestrationQuery", () => { + it("should have correct default page size", () => { + expect(DEFAULT_PAGE_SIZE).toBe(100); + }); + + it("should allow creating query with all optional fields undefined", () => { + const query: OrchestrationQuery = {}; + + expect(query.createdFrom).toBeUndefined(); + expect(query.createdTo).toBeUndefined(); + expect(query.statuses).toBeUndefined(); + expect(query.taskHubNames).toBeUndefined(); + expect(query.instanceIdPrefix).toBeUndefined(); + expect(query.pageSize).toBeUndefined(); + expect(query.fetchInputsAndOutputs).toBeUndefined(); + expect(query.continuationToken).toBeUndefined(); + }); + + it("should allow creating query with all fields populated", () => { + const createdFrom = new Date("2024-01-01"); + const createdTo = new Date("2024-12-31"); + const query: OrchestrationQuery = { + createdFrom, + createdTo, + statuses: [OrchestrationStatus.COMPLETED, OrchestrationStatus.RUNNING], + taskHubNames: ["hub1", "hub2"], + instanceIdPrefix: "prefix-", + pageSize: 50, + fetchInputsAndOutputs: true, + continuationToken: "token123", + }; + + expect(query.createdFrom).toBe(createdFrom); + expect(query.createdTo).toBe(createdTo); + expect(query.statuses).toEqual([OrchestrationStatus.COMPLETED, OrchestrationStatus.RUNNING]); + expect(query.taskHubNames).toEqual(["hub1", "hub2"]); + expect(query.instanceIdPrefix).toBe("prefix-"); + expect(query.pageSize).toBe(50); + expect(query.fetchInputsAndOutputs).toBe(true); + expect(query.continuationToken).toBe("token123"); + }); + + it("should support filtering by single status", () => { + const query: OrchestrationQuery = { + statuses: [OrchestrationStatus.FAILED], + }; + + expect(query.statuses?.length).toBe(1); + expect(query.statuses?.[0]).toBe(OrchestrationStatus.FAILED); + }); + + it("should support all orchestration statuses", () => { + const allStatuses = [ + OrchestrationStatus.RUNNING, + OrchestrationStatus.COMPLETED, + OrchestrationStatus.FAILED, + OrchestrationStatus.TERMINATED, + OrchestrationStatus.CONTINUED_AS_NEW, + OrchestrationStatus.PENDING, + OrchestrationStatus.SUSPENDED, + ]; + + const query: OrchestrationQuery = { + statuses: allStatuses, + }; + + expect(query.statuses).toEqual(allStatuses); + }); +}); + +describe("ListInstanceIdsOptions", () => { + it("should allow creating options with all fields undefined", () => { + const options: ListInstanceIdsOptions = {}; + + expect(options.runtimeStatus).toBeUndefined(); + expect(options.completedTimeFrom).toBeUndefined(); + expect(options.completedTimeTo).toBeUndefined(); + expect(options.pageSize).toBeUndefined(); + expect(options.lastInstanceKey).toBeUndefined(); + }); + + it("should allow creating options with all fields populated", () => { + const completedTimeFrom = new Date("2024-01-01"); + const completedTimeTo = new Date("2024-12-31"); + const options: ListInstanceIdsOptions = { + runtimeStatus: [OrchestrationStatus.COMPLETED, OrchestrationStatus.FAILED], + completedTimeFrom, + completedTimeTo, + pageSize: 25, + lastInstanceKey: "key123", + }; + + expect(options.runtimeStatus).toEqual([OrchestrationStatus.COMPLETED, OrchestrationStatus.FAILED]); + expect(options.completedTimeFrom).toBe(completedTimeFrom); + expect(options.completedTimeTo).toBe(completedTimeTo); + expect(options.pageSize).toBe(25); + expect(options.lastInstanceKey).toBe("key123"); + }); + + it("should support filtering by terminal statuses", () => { + const terminalStatuses = [ + OrchestrationStatus.COMPLETED, + OrchestrationStatus.FAILED, + OrchestrationStatus.TERMINATED, + ]; + + const options: ListInstanceIdsOptions = { + runtimeStatus: terminalStatuses, + }; + + expect(options.runtimeStatus).toEqual(terminalStatuses); + }); + + it("should allow pagination with just lastInstanceKey", () => { + const options: ListInstanceIdsOptions = { + lastInstanceKey: "someKey", + }; + + expect(options.lastInstanceKey).toBe("someKey"); + expect(options.pageSize).toBeUndefined(); + }); +}); + +describe("OrchestrationState", () => { + it("should create state with all required fields", () => { + const state = new OrchestrationState( + "instance-1", + "TestOrchestration", + OrchestrationStatus.COMPLETED, + new Date("2024-01-01T00:00:00Z"), + new Date("2024-01-01T01:00:00Z"), + ); + + expect(state.instanceId).toBe("instance-1"); + expect(state.name).toBe("TestOrchestration"); + expect(state.runtimeStatus).toBe(OrchestrationStatus.COMPLETED); + expect(state.createdAt).toEqual(new Date("2024-01-01T00:00:00Z")); + expect(state.lastUpdatedAt).toEqual(new Date("2024-01-01T01:00:00Z")); + expect(state.serializedInput).toBeUndefined(); + expect(state.serializedOutput).toBeUndefined(); + expect(state.serializedCustomStatus).toBeUndefined(); + expect(state.failureDetails).toBeUndefined(); + }); + + it("should create state with optional fields", () => { + const state = new OrchestrationState( + "instance-2", + "TestOrchestration", + OrchestrationStatus.COMPLETED, + new Date("2024-01-01T00:00:00Z"), + new Date("2024-01-01T01:00:00Z"), + '{"input": "test"}', + '{"output": "result"}', + '{"status": "custom"}', + ); + + expect(state.serializedInput).toBe('{"input": "test"}'); + expect(state.serializedOutput).toBe('{"output": "result"}'); + expect(state.serializedCustomStatus).toBe('{"status": "custom"}'); + }); + + it("should correctly indicate running status", () => { + const state = new OrchestrationState( + "instance-3", + "TestOrchestration", + OrchestrationStatus.RUNNING, + new Date(), + new Date(), + ); + + expect(state.runtimeStatus).toBe(OrchestrationStatus.RUNNING); + }); + + it("should correctly indicate failed status with failure details", () => { + const failureDetails = new FailureDetails("Error", "Something went wrong", "stack trace"); + + const state = new OrchestrationState( + "instance-4", + "TestOrchestration", + OrchestrationStatus.FAILED, + new Date(), + new Date(), + undefined, + undefined, + undefined, + failureDetails, + ); + + expect(state.runtimeStatus).toBe(OrchestrationStatus.FAILED); + expect(state.failureDetails).toBeDefined(); + expect(state.failureDetails?.message).toBe("Error"); + expect(state.failureDetails?.errorType).toBe("Something went wrong"); + }); +}); diff --git a/test/e2e-azuremanaged/query-apis.spec.ts b/test/e2e-azuremanaged/query-apis.spec.ts new file mode 100644 index 0000000..0630329 --- /dev/null +++ b/test/e2e-azuremanaged/query-apis.spec.ts @@ -0,0 +1,527 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * E2E tests for Query APIs (getAllInstances, listInstanceIds). + * + * NOTE: These tests can run against either: + * 1. DTS emulator - set ENDPOINT and TASKHUB environment variables + * 2. Real Azure DTS - set AZURE_DTS_CONNECTION_STRING environment variable + * + * Example for emulator: + * docker run -i -p 8080:8080 -d mcr.microsoft.com/dts/dts-emulator:latest + * ENDPOINT=localhost:8080 TASKHUB=default npx jest query-apis.spec.ts + * + * Example for real DTS: + * AZURE_DTS_CONNECTION_STRING="Endpoint=https://...;Authentication=DefaultAzure;TaskHub=th3" npx jest query-apis.spec.ts + */ + +import { + TaskHubGrpcClient, + TaskHubGrpcWorker, + OrchestrationStatus, + OrchestrationContext, + TOrchestrator, + OrchestrationQuery, + Page, + OrchestrationState, +} from "@microsoft/durabletask-js"; +import { + DurableTaskAzureManagedClientBuilder, + DurableTaskAzureManagedWorkerBuilder, +} from "@microsoft/durabletask-js-azuremanaged"; + +// Read environment variables - support both connection string and endpoint/taskhub +const connectionString = process.env.AZURE_DTS_CONNECTION_STRING; +const endpoint = process.env.ENDPOINT || "localhost:8080"; +const taskHub = process.env.TASKHUB || "default"; + +function createClient(): TaskHubGrpcClient { + const builder = new DurableTaskAzureManagedClientBuilder(); + if (connectionString) { + return builder.connectionString(connectionString).build(); + } + return builder.endpoint(endpoint, taskHub, null).build(); +} + +function createWorker(): TaskHubGrpcWorker { + const builder = new DurableTaskAzureManagedWorkerBuilder(); + if (connectionString) { + return builder.connectionString(connectionString).build(); + } + return builder.endpoint(endpoint, taskHub, null).build(); +} + +describe("Query APIs E2E Tests", () => { + let taskHubClient: TaskHubGrpcClient; + let taskHubWorker: TaskHubGrpcWorker; + + beforeEach(async () => { + taskHubClient = createClient(); + taskHubWorker = createWorker(); + }); + + afterEach(async () => { + await taskHubWorker.stop(); + await taskHubClient.stop(); + }); + + describe("getAllInstances", () => { + it("should query all orchestration instances without filters", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule multiple orchestrations + const ids: string[] = []; + for (let i = 0; i < 3; i++) { + const id = await taskHubClient.scheduleNewOrchestration(simpleOrchestrator); + ids.push(id); + await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + } + + // Query all instances + const pageable = taskHubClient.getAllInstances(); + const foundInstances: OrchestrationState[] = []; + + for await (const instance of pageable) { + if (ids.includes(instance.instanceId)) { + foundInstances.push(instance); + } + } + + // Verify we found all the instances we created + expect(foundInstances.length).toBe(3); + for (const id of ids) { + expect(foundInstances.some((i) => i.instanceId === id)).toBe(true); + } + }, 60000); + + it("should query instances filtered by status", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule and complete an orchestration + const id = await taskHubClient.scheduleNewOrchestration(simpleOrchestrator); + await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + // Query only completed instances + const query: OrchestrationQuery = { + statuses: [OrchestrationStatus.COMPLETED], + }; + + const pageable = taskHubClient.getAllInstances(query); + let foundInstance: OrchestrationState | undefined; + + for await (const instance of pageable) { + if (instance.instanceId === id) { + foundInstance = instance; + break; + } + } + + expect(foundInstance).toBeDefined(); + expect(foundInstance?.instanceId).toBe(id); + expect(foundInstance?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED); + }, 60000); + + it("should query instances with instanceIdPrefix filter", async () => { + const prefix = `test-prefix-${Date.now()}-`; + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule orchestrations with specific prefix + const prefixedIds: string[] = []; + for (let i = 0; i < 2; i++) { + const instanceId = `${prefix}${i}`; + await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, undefined, instanceId); + prefixedIds.push(instanceId); + await taskHubClient.waitForOrchestrationCompletion(instanceId, undefined, 30); + } + + // Also schedule one without the prefix + const nonPrefixedId = await taskHubClient.scheduleNewOrchestration(simpleOrchestrator); + await taskHubClient.waitForOrchestrationCompletion(nonPrefixedId, undefined, 30); + + // Query only instances with the prefix + const query: OrchestrationQuery = { + instanceIdPrefix: prefix, + }; + + const pageable = taskHubClient.getAllInstances(query); + const foundInstances: OrchestrationState[] = []; + + for await (const instance of pageable) { + foundInstances.push(instance); + } + + // Should find exactly the prefixed instances + expect(foundInstances.length).toBe(2); + for (const id of prefixedIds) { + expect(foundInstances.some((i) => i.instanceId === id)).toBe(true); + } + expect(foundInstances.some((i) => i.instanceId === nonPrefixedId)).toBe(false); + }, 60000); + + it("should respect pageSize when iterating", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule multiple orchestrations + const ids: string[] = []; + const prefix = `page-test-${Date.now()}-`; + for (let i = 0; i < 5; i++) { + const instanceId = `${prefix}${i}`; + await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, undefined, instanceId); + ids.push(instanceId); + await taskHubClient.waitForOrchestrationCompletion(instanceId, undefined, 30); + } + + // Query with small page size + const query: OrchestrationQuery = { + instanceIdPrefix: prefix, + pageSize: 2, + }; + + const pageable = taskHubClient.getAllInstances(query); + const pages: Page[] = []; + + for await (const page of pageable.asPages()) { + pages.push(page); + } + + // Should have multiple pages + expect(pages.length).toBeGreaterThan(1); + + // Each page should have at most 2 items (the page size) + for (const page of pages) { + expect(page.values.length).toBeLessThanOrEqual(2); + } + + // Total items should be 5 + const totalItems = pages.reduce((sum, page) => sum + page.values.length, 0); + expect(totalItems).toBe(5); + }, 120000); + + it("should fetch inputs and outputs when requested", async () => { + const echoOrchestrator: TOrchestrator = async (_: OrchestrationContext, input: any) => { + return { received: input }; + }; + + taskHubWorker.addOrchestrator(echoOrchestrator); + await taskHubWorker.start(); + + const input = { message: "hello" }; + const id = await taskHubClient.scheduleNewOrchestration(echoOrchestrator, input); + await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + // Query with fetchInputsAndOutputs = true + const query: OrchestrationQuery = { + fetchInputsAndOutputs: true, + }; + + const pageable = taskHubClient.getAllInstances(query); + let foundInstance: OrchestrationState | undefined; + + for await (const instance of pageable) { + if (instance.instanceId === id) { + foundInstance = instance; + break; + } + } + + expect(foundInstance).toBeDefined(); + // Note: Input should always be available when fetchInputsAndOutputs is true + // Output may or may not be available depending on orchestration state + expect(foundInstance?.serializedInput).toBeDefined(); + }, 60000); + + it("should filter by createdFrom and createdTo", async () => { + const prefix = `time-filter-${Date.now()}-`; + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + const beforeTime = new Date(Date.now() - 60000); // 1 minute ago + + const instanceId = `${prefix}test`; + await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, undefined, instanceId); + await taskHubClient.waitForOrchestrationCompletion(instanceId, undefined, 30); + + const afterTime = new Date(Date.now() + 60000); // 1 minute from now + + // Query with time range that includes the orchestration + const query: OrchestrationQuery = { + createdFrom: beforeTime, + createdTo: afterTime, + instanceIdPrefix: prefix, + }; + + const pageable = taskHubClient.getAllInstances(query); + let foundInstance: OrchestrationState | undefined; + + for await (const instance of pageable) { + if (instance.instanceId === instanceId) { + foundInstance = instance; + break; + } + } + + expect(foundInstance).toBeDefined(); + expect(foundInstance?.instanceId).toBe(instanceId); + }, 60000); + }); + + describe("listInstanceIds", () => { + it("should list all instance IDs", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule multiple orchestrations with unique prefix + const prefix = `list-ids-${Date.now()}-`; + const ids: string[] = []; + for (let i = 0; i < 3; i++) { + const instanceId = `${prefix}${i}`; + await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, undefined, instanceId); + ids.push(instanceId); + await taskHubClient.waitForOrchestrationCompletion(instanceId, undefined, 30); + } + + // List instance IDs and paginate through all results + const allInstanceIds: string[] = []; + let lastInstanceKey: string | undefined = undefined; + + do { + const page = await taskHubClient.listInstanceIds({ + runtimeStatus: [OrchestrationStatus.COMPLETED], + lastInstanceKey, + }); + allInstanceIds.push(...page.values); + lastInstanceKey = page.continuationToken; + } while (lastInstanceKey && lastInstanceKey !== ""); + + // Verify we can find the created instance IDs + for (const id of ids) { + expect(allInstanceIds.includes(id)).toBe(true); + } + }, 60000); + + it("should filter by runtime status", async () => { + const prefix = `filter-status-${Date.now()}-`; + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule and complete an orchestration with unique ID + const instanceId = `${prefix}test`; + await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, undefined, instanceId); + await taskHubClient.waitForOrchestrationCompletion(instanceId, undefined, 30); + + // List completed instances and paginate through all results + const allInstanceIds: string[] = []; + let lastInstanceKey: string | undefined = undefined; + + do { + const page = await taskHubClient.listInstanceIds({ + runtimeStatus: [OrchestrationStatus.COMPLETED], + lastInstanceKey, + }); + allInstanceIds.push(...page.values); + lastInstanceKey = page.continuationToken; + } while (lastInstanceKey && lastInstanceKey !== ""); + + expect(allInstanceIds.includes(instanceId)).toBe(true); + }, 60000); + + it("should support pagination with lastInstanceKey", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule multiple orchestrations + for (let i = 0; i < 5; i++) { + const id = await taskHubClient.scheduleNewOrchestration(simpleOrchestrator); + await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + } + + // Get first page with small page size + const firstPage = await taskHubClient.listInstanceIds({ + pageSize: 2, + }); + + expect(firstPage.values.length).toBeLessThanOrEqual(2); + + // If there are more results, get the next page + if (firstPage.hasMoreResults && firstPage.continuationToken) { + const secondPage = await taskHubClient.listInstanceIds({ + pageSize: 2, + lastInstanceKey: firstPage.continuationToken, + }); + + expect(secondPage.values.length).toBeGreaterThan(0); + + // First and second page should have different instance IDs + const firstPageIds = new Set(firstPage.values); + for (const id of secondPage.values) { + expect(firstPageIds.has(id)).toBe(false); + } + } + }, 120000); + + it("should respect pageSize parameter", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + // Schedule multiple orchestrations + for (let i = 0; i < 5; i++) { + const id = await taskHubClient.scheduleNewOrchestration(simpleOrchestrator); + await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + } + + // List with specific page size + const page = await taskHubClient.listInstanceIds({ + pageSize: 3, + }); + + expect(page.values.length).toBeLessThanOrEqual(3); + }, 60000); + + it("should filter by completed time range", async () => { + const simpleOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "completed"; + }; + + taskHubWorker.addOrchestrator(simpleOrchestrator); + await taskHubWorker.start(); + + const beforeTime = new Date(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + const id = await taskHubClient.scheduleNewOrchestration(simpleOrchestrator); + await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + await new Promise((resolve) => setTimeout(resolve, 100)); + const afterTime = new Date(); + + // List with completed time filter + const page = await taskHubClient.listInstanceIds({ + runtimeStatus: [OrchestrationStatus.COMPLETED], + completedTimeFrom: beforeTime, + completedTimeTo: afterTime, + }); + + // The instance should be in the results + expect(page.values.includes(id)).toBe(true); + }, 60000); + }); + + describe("Query API Integration", () => { + it("should correctly differentiate between running and completed instances", async () => { + const longRunningOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Wait for an event that won't come (simulates long-running) + yield ctx.waitForExternalEvent("never-sent"); + return "completed"; + }; + + const quickOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "quick"; + }; + + taskHubWorker.addOrchestrator(longRunningOrchestrator); + taskHubWorker.addOrchestrator(quickOrchestrator); + await taskHubWorker.start(); + + // Schedule a long-running orchestration (will stay in RUNNING state) + const runningId = await taskHubClient.scheduleNewOrchestration(longRunningOrchestrator); + await taskHubClient.waitForOrchestrationStart(runningId, false, 10); + + // Schedule a quick orchestration (will complete) + const completedId = await taskHubClient.scheduleNewOrchestration(quickOrchestrator); + await taskHubClient.waitForOrchestrationCompletion(completedId, undefined, 30); + + // Query for running instances + const runningQuery: OrchestrationQuery = { + statuses: [OrchestrationStatus.RUNNING], + }; + const runningPageable = taskHubClient.getAllInstances(runningQuery); + const runningInstances: string[] = []; + for await (const instance of runningPageable) { + runningInstances.push(instance.instanceId); + } + + // Query for completed instances + const completedQuery: OrchestrationQuery = { + statuses: [OrchestrationStatus.COMPLETED], + }; + const completedPageable = taskHubClient.getAllInstances(completedQuery); + const completedInstances: string[] = []; + for await (const instance of completedPageable) { + completedInstances.push(instance.instanceId); + } + + expect(runningInstances).toContain(runningId); + expect(completedInstances).toContain(completedId); + expect(runningInstances).not.toContain(completedId); + expect(completedInstances).not.toContain(runningId); + + // Terminate the long-running orchestration for cleanup + await taskHubClient.terminateOrchestration(runningId); + }, 60000); + + it("should handle empty query results gracefully", async () => { + // Start the worker (required by afterEach) + const dummyOrchestrator: TOrchestrator = async (_: OrchestrationContext) => { + return "dummy"; + }; + taskHubWorker.addOrchestrator(dummyOrchestrator); + await taskHubWorker.start(); + + // Query with a very specific prefix that shouldn't match anything + const nonExistentPrefix = `non-existent-${Date.now()}-${Math.random()}`; + const query: OrchestrationQuery = { + instanceIdPrefix: nonExistentPrefix, + }; + + const pageable = taskHubClient.getAllInstances(query); + const instances: OrchestrationState[] = []; + + for await (const instance of pageable) { + instances.push(instance); + } + + expect(instances.length).toBe(0); + }, 30000); + }); +});