diff --git a/package-lock.json b/package-lock.json
index f223c5a..a42012e 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -226,6 +226,7 @@
       "integrity": "sha512-H3mcG6ZDLTlYfaSNi0iOKkigqMFvkTKlGUYlD8GW7nNOYRrevuA46iTypPyv+06V3fEmvvazfntkBU34L0azAw==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "@babel/code-frame": "^7.28.6",
         "@babel/generator": "^7.28.6",
@@ -897,6 +898,7 @@
       "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.14.3.tgz",
       "integrity": "sha512-Iq8QQQ/7X3Sac15oB6p0FmUg/klxQvXLeileoqrTRGJYLV+/9tubbr9ipz0GKHjmXVsgFPo/+W+2cA8eNcR+XA==",
       "license": "Apache-2.0",
+      "peer": true,
       "dependencies": {
         "@grpc/proto-loader": "^0.8.0",
         "@js-sdsl/ordered-map": "^4.4.2"
@@ -1598,6 +1600,7 @@
       "dev": true,
       "hasInstallScript": true,
       "license": "Apache-2.0",
+      "peer": true,
       "dependencies": {
         "@swc/counter": "^0.1.3",
         "@swc/types": "^0.1.25"
@@ -1813,6 +1816,7 @@
       "integrity": "sha512-TXTnIcNJQEKwThMMqBXsZ4VGAza6bvN4pa41Rkqoio6QBKMvo+5lexeTMScGCIxtzgQJzElcvIltani+adC5PQ==",
       "dev": true,
       "license": "Apache-2.0",
+      "peer": true,
       "dependencies": {
         "tslib": "^2.8.0"
       }
@@ -1974,6 +1978,7 @@
       "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.130.tgz",
       "integrity": "sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg==",
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "undici-types": "~5.26.4"
       }
@@ -2047,6 +2052,7 @@
       "integrity": "sha512-BtE0k6cjwjLZoZixN0t5AKP0kSzlGu7FctRXYuPAm//aaiZhmfq1JwdYpYr1brzEspYyFeF+8XF5j2VK6oalrA==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "@typescript-eslint/scope-manager": "8.54.0",
         "@typescript-eslint/types": "8.54.0",
@@ -2288,6 +2294,7 @@
       "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "bin": {
         "acorn": "bin/acorn"
       },
@@ -2612,6 +2619,7 @@
         }
       ],
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "baseline-browser-mapping": "^2.9.0",
         "caniuse-lite": "^1.0.30001759",
@@ -3259,6 +3267,7 @@
       "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "@eslint-community/eslint-utils": "^4.8.0",
         "@eslint-community/regexpp": "^4.12.1",
@@ -4174,6 +4183,7 @@
       "integrity": "sha512-NIy3oAFp9shda19hy4HK0HRTWKtPJmGdnvywu01nOqNC2vZg+Z+fvJDxpMQA88eb2I9EcafcdjYgsDthnYTvGw==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "@jest/core": "^29.7.0",
         "@jest/types": "^29.6.3",
@@ -5920,6 +5930,7 @@
       "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "bin": {
         "prettier": "bin/prettier.cjs"
       },
@@ -6644,6 +6655,7 @@
       "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "engines": {
         "node": ">=12"
       },
@@ -6795,6 +6807,7 @@
       "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "@cspotcode/source-map-support": "^0.8.0",
         "@tsconfig/node10": "^1.0.7",
@@ -6881,6 +6894,7 @@
       "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
       "dev": true,
       "license": "Apache-2.0",
+      "peer": true,
       "bin": {
         "tsc": "bin/tsc",
         "tsserver": "bin/tsserver"
diff --git a/packages/durabletask-js/src/client/client.ts b/packages/durabletask-js/src/client/client.ts
index d7265cd..5e48f11 100644
--- a/packages/durabletask-js/src/client/client.ts
+++ b/packages/durabletask-js/src/client/client.ts
@@ -305,6 +305,69 @@ export class TaskHubGrpcClient {
     );
   }
 
+  /**
+   * Rewinds a failed orchestration instance to a previous state to allow it to retry from the point of failure.
+   *
+   * This method is used to "rewind" a failed orchestration back to its last known good state, allowing it
+   * to be replayed from that point. This is particularly useful for recovering from transient failures
+   * or for debugging purposes.
+   *
+   * Only orchestration instances in the `Failed` state can be rewound.
+   *
+   * @param instanceId - The unique identifier of the orchestration instance to rewind.
+   * @param reason - A reason string describing why the orchestration is being rewound.
+   * @returns A promise that resolves once the rewind request has completed without error.
+   * @throws {Error} If `instanceId` is empty.
+   * @throws {Error} If the orchestration instance is not found.
+   * @throws {Error} If the orchestration instance is in a state that does not allow rewinding.
+   * @throws {Error} If the rewind operation is not supported by the backend.
+   * @throws {Error} If the rewind operation is cancelled.
+   */
+  async rewindInstance(instanceId: string, reason: string): Promise<void> {
+    if (!instanceId) {
+      throw new Error("instanceId is required");
+    }
+
+    const req = new pb.RewindInstanceRequest();
+    req.setInstanceid(instanceId);
+
+    // Only attach a reason when one was provided; an empty string leaves the field unset.
+    if (reason) {
+      const reasonValue = new StringValue();
+      reasonValue.setValue(reason);
+      req.setReason(reasonValue);
+    }
+
+    console.log(`Rewinding '${instanceId}' with reason: ${reason}`);
+
+    try {
+      await callWithMetadata(
+        this._stub.rewindInstance.bind(this._stub),
+        req,
+        this._metadataGenerator,
+      );
+    } catch (e) {
+      // Handle gRPC errors and convert them to appropriate errors
+      if (e && typeof e === "object" && "code" in e) {
+        const grpcError = e as { code: number; details?: string };
+        if (grpcError.code === grpc.status.NOT_FOUND) {
+          throw new Error(`An orchestration with the instanceId '${instanceId}' was not found.`);
+        }
+        if (grpcError.code === grpc.status.FAILED_PRECONDITION) {
+          throw new Error(grpcError.details || `Cannot rewind orchestration '${instanceId}': it is in a state that does not allow rewinding.`);
+        }
+        if (grpcError.code === grpc.status.UNIMPLEMENTED) {
+          throw new Error(grpcError.details || `The rewind operation is not supported by the backend.`);
+        }
+        if (grpcError.code === grpc.status.CANCELLED) {
+          throw new Error(`The rewind operation for '${instanceId}' was cancelled.`);
+        }
+      }
+      // Errors without a status code, and status codes not mapped above, are rethrown unchanged.
+      throw e;
+    }
+  }
+
   /**
    * Restarts an existing orchestration instance with its original input.
    *
diff --git a/test/e2e-azuremanaged/rewind.spec.ts b/test/e2e-azuremanaged/rewind.spec.ts
new file mode 100644
index 0000000..91f2ac8
--- /dev/null
+++ b/test/e2e-azuremanaged/rewind.spec.ts
@@ -0,0 +1,236 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+/**
+ * E2E tests for rewindInstance API.
+ *
+ * NOTE: These tests can run against either:
+ * 1. DTS emulator - set ENDPOINT and TASKHUB environment variables
+ * 2. Real Azure DTS - set AZURE_DTS_CONNECTION_STRING environment variable
+ *
+ * Example for emulator:
+ *   docker run -i -p 8080:8080 -d mcr.microsoft.com/dts/dts-emulator:latest
+ *   ENDPOINT=localhost:8080 TASKHUB=default npx jest rewind.spec.ts
+ *
+ * Example for real DTS:
+ *   AZURE_DTS_CONNECTION_STRING="Endpoint=https://...;Authentication=DefaultAzure;TaskHub=th3" npx jest rewind.spec.ts
+ */
+
+import {
+  TaskHubGrpcClient,
+  TaskHubGrpcWorker,
+  OrchestrationStatus,
+  OrchestrationContext,
+  TOrchestrator,
+} from "@microsoft/durabletask-js";
+import {
+  DurableTaskAzureManagedClientBuilder,
+  DurableTaskAzureManagedWorkerBuilder,
+} from "@microsoft/durabletask-js-azuremanaged";
+
+// Read environment variables - support both connection string and endpoint/taskhub
+const connectionString = process.env.AZURE_DTS_CONNECTION_STRING;
+const endpoint = process.env.ENDPOINT || "localhost:8080";
+const taskHub = process.env.TASKHUB || "default";
+
+/**
+ * Generates a unique instance ID with the given prefix.
+ * @param prefix - The prefix for the instance ID
+ * @returns A unique instance ID in the format: {prefix}-{timestamp}-{random}
+ */
+function generateUniqueInstanceId(prefix: string): string {
+  return `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
+}
+
+function createClient(): TaskHubGrpcClient {
+  const builder = new DurableTaskAzureManagedClientBuilder();
+  if (connectionString) {
+    return builder.connectionString(connectionString).build();
+  }
+  return builder.endpoint(endpoint, taskHub, null).build();
+}
+
+function createWorker(): TaskHubGrpcWorker {
+  const builder = new DurableTaskAzureManagedWorkerBuilder();
+  if (connectionString) {
+    return builder.connectionString(connectionString).build();
+  }
+  return builder.endpoint(endpoint, taskHub, null).build();
+}
+
+describe("Rewind Instance E2E Tests", () => {
+  let taskHubClient: TaskHubGrpcClient;
+  let taskHubWorker: TaskHubGrpcWorker;
+  let workerStarted = false;
+
+  // Starts the worker and records that it is running, so afterEach only stops a started worker
+  const startWorker = async () => {
+    await taskHubWorker.start();
+    workerStarted = true;
+  };
+
+  beforeEach(async () => {
+    workerStarted = false;
+    taskHubClient = createClient();
+    taskHubWorker = createWorker();
+  });
+
+  afterEach(async () => {
+    if (workerStarted) {
+      await taskHubWorker.stop();
+    }
+    await taskHubClient.stop();
+  });
+
+  describe("rewindInstance - positive cases", () => {
+    // Track execution attempt count per instance ID for isolation across orchestrations
+    const attemptCountByInstance = new Map<string, number>();
+
+    // An orchestrator that fails on first attempt, succeeds on subsequent attempts
+    const failOnceOrchestrator: TOrchestrator = async (ctx: OrchestrationContext, _input: number) => {
+      const instanceId = ctx.instanceId;
+      const currentAttempt = (attemptCountByInstance.get(instanceId) ?? 0) + 1;
+      attemptCountByInstance.set(instanceId, currentAttempt);
+
+      if (currentAttempt === 1) {
+        throw new Error("First attempt failed!");
+      }
+      return `Success on attempt ${currentAttempt}`;
+    };
+
+    beforeEach(() => {
+      attemptCountByInstance.clear();
+    });
+
+    // Skip these tests if the backend doesn't support rewind (emulator returns UNIMPLEMENTED)
+    it.skip("should rewind a failed orchestration instance (requires backend support)", async () => {
+      const instanceId = generateUniqueInstanceId("rewind-test");
+
+      taskHubWorker.addOrchestrator(failOnceOrchestrator);
+      await startWorker();
+
+      // Schedule the orchestration - it will fail on first attempt
+      await taskHubClient.scheduleNewOrchestration(failOnceOrchestrator, 1, instanceId);
+
+      // Wait for the orchestration to fail
+      const failedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
+      expect(failedState).toBeDefined();
+      expect(failedState?.runtimeStatus).toBe(OrchestrationStatus.FAILED);
+      expect(failedState?.failureDetails?.message).toContain("First attempt failed!");
+
+      // Now rewind the failed orchestration
+      await taskHubClient.rewindInstance(instanceId, "Testing rewind functionality");
+
+      // The orchestration should now be running again
+      // Wait for it to complete (successfully this time)
+      const rewindedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
+      expect(rewindedState).toBeDefined();
+      expect(rewindedState?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
+      expect(rewindedState?.serializedOutput).toContain("Success on attempt");
+    });
+
+    it.skip("should rewind a failed orchestration with a descriptive reason (requires backend support)", async () => {
+      const instanceId = generateUniqueInstanceId("rewind-reason");
+      const rewindReason = "Rewinding due to transient network failure";
+
+      taskHubWorker.addOrchestrator(failOnceOrchestrator);
+      await startWorker();
+
+      // Schedule and wait for failure
+      await taskHubClient.scheduleNewOrchestration(failOnceOrchestrator, 1, instanceId);
+      const failedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
+      expect(failedState?.runtimeStatus).toBe(OrchestrationStatus.FAILED);
+
+      // Rewind with a specific reason
+      await taskHubClient.rewindInstance(instanceId, rewindReason);
+
+      // Verify it can complete after rewind
+      const rewindedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
+      expect(rewindedState?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
+    });
+  });
+
+  describe("rewindInstance - negative cases", () => {
+    // A simple orchestrator that completes successfully
+    const simpleOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext, input: number) => {
+      return input * 2;
+    };
+
+    // An orchestrator that waits for an event (stays in Running state)
+    const waitingOrchestrator: TOrchestrator = async (ctx: OrchestrationContext) => {
+      const approval = await ctx.waitForExternalEvent("approval");
+      return `Approved: ${approval}`;
+    };
+
+    it("should throw an error when rewinding a non-existent instance (or if rewind is not supported)", async () => {
+      const nonExistentId = generateUniqueInstanceId("non-existent");
+
+      // No need to start worker for this test
+      // Will throw either "not found" or "not supported" depending on backend
+      await expect(taskHubClient.rewindInstance(nonExistentId, "Test rewind")).rejects.toThrow();
+    });
+
+    it("should throw an error when rewinding a completed orchestration (or if rewind is not supported)", async () => {
+      const instanceId = generateUniqueInstanceId("rewind-completed");
+
+      taskHubWorker.addOrchestrator(simpleOrchestrator);
+      await startWorker();
+
+      // Schedule and wait for completion
+      await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, 5, instanceId);
+      const state = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
+      expect(state?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
+
+      // Try to rewind a completed orchestration - should fail
+      await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow();
+    });
+
+    it.skip("should throw an error when rewinding a running orchestration (requires backend support)", async () => {
+      const instanceId = generateUniqueInstanceId("rewind-running");
+
+      taskHubWorker.addOrchestrator(waitingOrchestrator);
+      await startWorker();
+
+      // Schedule the orchestration (will be waiting for event)
+      await taskHubClient.scheduleNewOrchestration(waitingOrchestrator, undefined, instanceId);
+
+      // Wait for it to start running
+      await taskHubClient.waitForOrchestrationStart(instanceId, false, 30);
+
+      // Try to rewind a running orchestration - should fail
+      try {
+        await taskHubClient.rewindInstance(instanceId, "Test rewind");
+        // If we get here, the operation didn't throw - which might be expected on some backends
+      } catch (e) {
+        expect((e as Error).message).toMatch(/not allow|precondition|running/i);
+      } finally {
+        // Clean up: terminate the waiting orchestration
+        await taskHubClient.terminateOrchestration(instanceId, "Test cleanup");
+        await taskHubClient.waitForOrchestrationCompletion(instanceId, false, 30);
+      }
+    });
+
+    it.skip("should throw an error when rewinding a terminated orchestration (requires backend support)", async () => {
+      const instanceId = generateUniqueInstanceId("rewind-terminated");
+
+      taskHubWorker.addOrchestrator(waitingOrchestrator);
+      await startWorker();
+
+      // Schedule the orchestration
+      await taskHubClient.scheduleNewOrchestration(waitingOrchestrator, undefined, instanceId);
+      await taskHubClient.waitForOrchestrationStart(instanceId, false, 30);
+
+      // Terminate it
+      await taskHubClient.terminateOrchestration(instanceId, "Terminating for test");
+      const terminatedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, false, 30);
+      expect(terminatedState?.runtimeStatus).toBe(OrchestrationStatus.TERMINATED);
+
+      // Try to rewind a terminated orchestration - should fail
+      await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow();
+    });
+
+    it("should throw an error when instanceId is empty", async () => {
+      await expect(taskHubClient.rewindInstance("", "Test rewind")).rejects.toThrow("instanceId is required");
+    });
+  });
+});