Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions packages/durabletask-js/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,64 @@ export class TaskHubGrpcClient {
);
}

/**
* Rewinds a failed orchestration instance to a previous state to allow it to retry from the point of failure.
*
* This method is used to "rewind" a failed orchestration back to its last known good state, allowing it
* to be replayed from that point. This is particularly useful for recovering from transient failures
* or for debugging purposes.
*
* Only orchestration instances in the `Failed` state can be rewound.
Comment on lines +319 to +325
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JSDoc describes rewinding to a “previous state” / “last known good state” and replaying “from that point”, which may not match actual backend semantics (many “rewind” implementations restart execution and replay from the beginning/history). Since this is a public API doc, please align the description with the actual server behavior (what state changes, whether execution restarts from the beginning, whether history is preserved, etc.) to avoid misleading users.

Suggested change
* Rewinds a failed orchestration instance to a previous state to allow it to retry from the point of failure.
*
* This method is used to "rewind" a failed orchestration back to its last known good state, allowing it
* to be replayed from that point. This is particularly useful for recovering from transient failures
* or for debugging purposes.
*
* Only orchestration instances in the `Failed` state can be rewound.
* Restarts a failed orchestration instance so that it can be re-executed.
*
* This method marks a failed orchestration instance as replayable and asks the backend
* to start a new execution pass using the existing event history for that instance.
* The orchestration logic will run again from the beginning, reconstructing its state
* from the recorded history. Existing history is not deleted or rolled back, and any
* external side effects produced by the previous run are not undone.
*
* Only orchestration instances in the `Failed` state can be rewound. Backends that do
* not support this operation may reject the request.

Copilot uses AI. Check for mistakes.
*
* @param instanceId - The unique identifier of the orchestration instance to rewind.
* @param reason - A reason string describing why the orchestration is being rewound.
* @throws {Error} If the orchestration instance is not found.
* @throws {Error} If the orchestration instance is in a state that does not allow rewinding.
* @throws {Error} If the rewind operation is not supported by the backend.
*/
async rewindInstance(instanceId: string, reason: string): Promise<void> {
Comment on lines +328 to +333
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reason is typed as required (reason: string) but the implementation treats it as optional (if (reason) { ... }). This is confusing for API consumers. Consider changing the signature to reason?: string (and updating the JSDoc accordingly), or enforce a required non-empty reason (and validate it similarly to instanceId).

Suggested change
* @param reason - A reason string describing why the orchestration is being rewound.
* @throws {Error} If the orchestration instance is not found.
* @throws {Error} If the orchestration instance is in a state that does not allow rewinding.
* @throws {Error} If the rewind operation is not supported by the backend.
*/
async rewindInstance(instanceId: string, reason: string): Promise<void> {
* @param reason - An optional reason string describing why the orchestration is being rewound.
* @throws {Error} If the orchestration instance is not found.
* @throws {Error} If the orchestration instance is in a state that does not allow rewinding.
* @throws {Error} If the rewind operation is not supported by the backend.
*/
async rewindInstance(instanceId: string, reason?: string): Promise<void> {

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new error-mapping logic (NOT_FOUND / FAILED_PRECONDITION / UNIMPLEMENTED / CANCELLED) is core behavior of this API but isn’t reliably covered by the added E2E tests (positive tests are skipped, and negative tests mostly assert generic toThrow). Add unit tests that mock the gRPC stub to throw errors with these status codes and assert the resulting messages/behavior.

Copilot uses AI. Check for mistakes.
if (!instanceId) {
throw new Error("instanceId is required");
}

const req = new pb.RewindInstanceRequest();
req.setInstanceid(instanceId);

if (reason) {
const reasonValue = new StringValue();
reasonValue.setValue(reason);
req.setReason(reasonValue);
}
Comment on lines +341 to +345
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reason is typed as required (reason: string) but the implementation treats it as optional (if (reason) { ... }). This is confusing for API consumers. Consider changing the signature to reason?: string (and updating the JSDoc accordingly), or enforce a required non-empty reason (and validate it similarly to instanceId).

Copilot uses AI. Check for mistakes.

console.log(`Rewinding '${instanceId}' with reason: ${reason}`);
Comment on lines +333 to +347
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Library code shouldn’t console.log() (it’s noisy, hard to suppress in production, and can leak identifiers/reasons into logs). Consider removing this line or using the project’s logging abstraction (e.g., injected logger / debug-level logger) so callers can control logging.

Copilot uses AI. Check for mistakes.

try {
await callWithMetadata<pb.RewindInstanceRequest, pb.RewindInstanceResponse>(
this._stub.rewindInstance.bind(this._stub),
req,
this._metadataGenerator,
);
} catch (e) {
// Handle gRPC errors and convert them to appropriate errors
if (e && typeof e === "object" && "code" in e) {
const grpcError = e as { code: number; details?: string };
if (grpcError.code === grpc.status.NOT_FOUND) {
throw new Error(`An orchestration with the instanceId '${instanceId}' was not found.`);
}
if (grpcError.code === grpc.status.FAILED_PRECONDITION) {
throw new Error(grpcError.details || `Cannot rewind orchestration '${instanceId}': it is in a state that does not allow rewinding.`);
}
if (grpcError.code === grpc.status.UNIMPLEMENTED) {
throw new Error(grpcError.details || `The rewind operation is not supported by the backend.`);
}
if (grpcError.code === grpc.status.CANCELLED) {
throw new Error(`The rewind operation for '${instanceId}' was cancelled.`);
}
}
throw e;
}
}
Comment on lines +349 to +374
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new error-mapping logic (NOT_FOUND / FAILED_PRECONDITION / UNIMPLEMENTED / CANCELLED) is core behavior of this API but isn’t reliably covered by the added E2E tests (positive tests are skipped, and negative tests mostly assert generic toThrow). Add unit tests that mock the gRPC stub to throw errors with these status codes and assert the resulting messages/behavior.

Copilot uses AI. Check for mistakes.

/**
* Purges orchestration instance metadata from the durable store.
*
Expand Down
226 changes: 226 additions & 0 deletions test/e2e-azuremanaged/rewind.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* E2E tests for rewindInstance API.
*
* NOTE: These tests can run against either:
* 1. DTS emulator - set ENDPOINT and TASKHUB environment variables
* 2. Real Azure DTS - set AZURE_DTS_CONNECTION_STRING environment variable
*
* Example for emulator:
* docker run -i -p 8080:8080 -d mcr.microsoft.com/dts/dts-emulator:latest
* ENDPOINT=localhost:8080 TASKHUB=default npx jest rewind.spec.ts
*
* Example for real DTS:
* AZURE_DTS_CONNECTION_STRING="Endpoint=https://...;Authentication=DefaultAzure;TaskHub=th3" npx jest rewind.spec.ts
*/

import {
TaskHubGrpcClient,
TaskHubGrpcWorker,
OrchestrationStatus,
OrchestrationContext,
TOrchestrator,
} from "@microsoft/durabletask-js";
import {
DurableTaskAzureManagedClientBuilder,
DurableTaskAzureManagedWorkerBuilder,
} from "@microsoft/durabletask-js-azuremanaged";

// Read environment variables - support both connection string and endpoint/taskhub
const connectionString = process.env.AZURE_DTS_CONNECTION_STRING;
const endpoint = process.env.ENDPOINT || "localhost:8080";
const taskHub = process.env.TASKHUB || "default";

function createClient(): TaskHubGrpcClient {
const builder = new DurableTaskAzureManagedClientBuilder();
if (connectionString) {
return builder.connectionString(connectionString).build();
}
return builder.endpoint(endpoint, taskHub, null).build();
}

function createWorker(): TaskHubGrpcWorker {
const builder = new DurableTaskAzureManagedWorkerBuilder();
if (connectionString) {
return builder.connectionString(connectionString).build();
}
return builder.endpoint(endpoint, taskHub, null).build();
}

describe("Rewind Instance E2E Tests", () => {
let taskHubClient: TaskHubGrpcClient;
let taskHubWorker: TaskHubGrpcWorker;
let workerStarted = false;

// Helper to start worker and track state
const startWorker = async () => {
await taskHubWorker.start();
workerStarted = true;
};

beforeEach(async () => {
workerStarted = false;
taskHubClient = createClient();
taskHubWorker = createWorker();
});

afterEach(async () => {
if (workerStarted) {
await taskHubWorker.stop();
}
await taskHubClient.stop();
});

describe("rewindInstance - positive cases", () => {
// Track execution attempt count for retry logic
let attemptCount = 0;

// An orchestrator that fails on first attempt, succeeds on subsequent attempts
const failOnceOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext, _input: number) => {
// Use input as a key to track attempts per instance
// After rewind, the orchestrator replays from the beginning
attemptCount++;
if (attemptCount === 1) {
throw new Error("First attempt failed!");
}
return `Success on attempt ${attemptCount}`;
};

beforeEach(() => {
attemptCount = 0;
Comment on lines +77 to +92
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says “Use input as a key to track attempts per instance”, but the implementation uses a single attemptCount shared across all runs in the describe block and does not use _input. Either adjust the comment to describe the current behavior, or actually key attempt tracking by instance/input (e.g., a Map) to match the comment and avoid confusion later.

Suggested change
// Track execution attempt count for retry logic
let attemptCount = 0;
// An orchestrator that fails on first attempt, succeeds on subsequent attempts
const failOnceOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext, _input: number) => {
// Use input as a key to track attempts per instance
// After rewind, the orchestrator replays from the beginning
attemptCount++;
if (attemptCount === 1) {
throw new Error("First attempt failed!");
}
return `Success on attempt ${attemptCount}`;
};
beforeEach(() => {
attemptCount = 0;
// Track execution attempt count for retry logic, keyed by input/instance
const attemptCountByInput = new Map<number, number>();
// An orchestrator that fails on first attempt per input, succeeds on subsequent attempts
const failOnceOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext, _input: number) => {
// Use input as a key to track attempts per instance
// After rewind, the orchestrator replays from the beginning
const previousAttemptCount = attemptCountByInput.get(_input) ?? 0;
const currentAttemptCount = previousAttemptCount + 1;
attemptCountByInput.set(_input, currentAttemptCount);
if (currentAttemptCount === 1) {
throw new Error("First attempt failed!");
}
return `Success on attempt ${currentAttemptCount}`;
};
beforeEach(() => {
attemptCountByInput.clear();

Copilot uses AI. Check for mistakes.
});

// Skip these tests if the backend doesn't support rewind (emulator returns UNIMPLEMENTED)
it.skip("should rewind a failed orchestration instance (requires backend support)", async () => {
const instanceId = `rewind-test-${Date.now()}`;

taskHubWorker.addOrchestrator(failOnceOrchestrator);
await startWorker();

// Schedule the orchestration - it will fail on first attempt
await taskHubClient.scheduleNewOrchestration(failOnceOrchestrator, 1, instanceId);

// Wait for the orchestration to fail
const failedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
expect(failedState).toBeDefined();
expect(failedState?.runtimeStatus).toBe(OrchestrationStatus.FAILED);
expect(failedState?.failureDetails?.message).toContain("First attempt failed!");

// Now rewind the failed orchestration
await taskHubClient.rewindInstance(instanceId, "Testing rewind functionality");

// The orchestration should now be running again
// Wait for it to complete (successfully this time)
const rewindedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
expect(rewindedState).toBeDefined();
expect(rewindedState?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
expect(rewindedState?.serializedOutput).toContain("Success on attempt");
});

it.skip("should rewind a failed orchestration with a descriptive reason (requires backend support)", async () => {
const instanceId = `rewind-reason-${Date.now()}`;
const rewindReason = "Rewinding due to transient network failure";

taskHubWorker.addOrchestrator(failOnceOrchestrator);
await startWorker();

// Schedule and wait for failure
await taskHubClient.scheduleNewOrchestration(failOnceOrchestrator, 1, instanceId);
const failedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
expect(failedState?.runtimeStatus).toBe(OrchestrationStatus.FAILED);

// Rewind with a specific reason
await taskHubClient.rewindInstance(instanceId, rewindReason);

// Verify it can complete after rewind
const rewindedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
expect(rewindedState?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
});
});

describe("rewindInstance - negative cases", () => {
// A simple orchestrator that completes successfully
const simpleOrchestrator: TOrchestrator = async (ctx: OrchestrationContext, input: number) => {
return input * 2;
};

// An orchestrator that waits for an event (stays in Running state)
const waitingOrchestrator: TOrchestrator = async (ctx: OrchestrationContext) => {
const approval = await ctx.waitForExternalEvent("approval");
return `Approved: ${approval}`;
};

it("should throw an error when rewinding a non-existent instance (or if rewind is not supported)", async () => {
const nonExistentId = `non-existent-${Date.now()}-${Math.random().toString(36).substring(7)}`;

// No need to start worker for this test
// Will throw either "not found" or "not supported" depending on backend
await expect(taskHubClient.rewindInstance(nonExistentId, "Test rewind")).rejects.toThrow();
});

it("should throw an error when rewinding a completed orchestration (or if rewind is not supported)", async () => {
const instanceId = `rewind-completed-${Date.now()}`;

taskHubWorker.addOrchestrator(simpleOrchestrator);
await startWorker();

// Schedule and wait for completion
await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, 5, instanceId);
const state = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
expect(state?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);

// Try to rewind a completed orchestration - should fail
await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow();
});

it.skip("should throw an error when rewinding a running orchestration (requires backend support)", async () => {
const instanceId = `rewind-running-${Date.now()}`;

taskHubWorker.addOrchestrator(waitingOrchestrator);
await startWorker();

// Schedule the orchestration (will be waiting for event)
await taskHubClient.scheduleNewOrchestration(waitingOrchestrator, undefined, instanceId);

// Wait for it to start running
await taskHubClient.waitForOrchestrationStart(instanceId, false, 30);

// Try to rewind a running orchestration - should fail
try {
await taskHubClient.rewindInstance(instanceId, "Test rewind");
// If we get here, the operation didn't throw - which might be expected on some backends
} catch (e) {
expect((e as Error).message).toMatch(/not allow|precondition|running/i);
} finally {
// Clean up: terminate the waiting orchestration
await taskHubClient.terminateOrchestration(instanceId, "Test cleanup");
await taskHubClient.waitForOrchestrationCompletion(instanceId, false, 30);
}
});

it.skip("should throw an error when rewinding a terminated orchestration (requires backend support)", async () => {
const instanceId = `rewind-terminated-${Date.now()}`;

taskHubWorker.addOrchestrator(waitingOrchestrator);
await startWorker();

// Schedule the orchestration
await taskHubClient.scheduleNewOrchestration(waitingOrchestrator, undefined, instanceId);
await taskHubClient.waitForOrchestrationStart(instanceId, false, 30);

// Terminate it
await taskHubClient.terminateOrchestration(instanceId, "Terminating for test");
const terminatedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, false, 30);
expect(terminatedState?.runtimeStatus).toBe(OrchestrationStatus.TERMINATED);

// Try to rewind a terminated orchestration - should fail
await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow();
});

it("should throw an error when instanceId is empty", async () => {
await expect(taskHubClient.rewindInstance("", "Test rewind")).rejects.toThrow("instanceId is required");
});
});
});
Loading