From a1e209880b6698cd8a232415965044064c315929 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Wed, 21 Jan 2026 14:29:56 -0800 Subject: [PATCH 01/21] initial commit --- .../main/java/com/functions/RewindTest.java | 89 ++++++++++++ .../io/durabletask/samples/RewindPattern.java | 131 ++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 endtoendtests/src/main/java/com/functions/RewindTest.java create mode 100644 samples/src/main/java/io/durabletask/samples/RewindPattern.java diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java new file mode 100644 index 00000000..71844027 --- /dev/null +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -0,0 +1,89 @@ +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Sample functions to test the rewind functionality. + * Rewind allows a failed orchestration to be replayed from its last known good state. + */ +public class RewindTest { + + // Flag to control whether the activity should fail (first call fails, subsequent calls succeed) + private static final AtomicBoolean shouldFail = new AtomicBoolean(true); + + /** + * HTTP trigger to start the rewindable orchestration. + */ + @FunctionName("StartRewindableOrchestration") + public HttpResponseMessage startRewindableOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Starting rewindable orchestration."); + + // Reset the failure flag so the first activity call will fail + shouldFail.set(true); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * Orchestration that calls an activity which will fail on the first attempt. + * After rewinding, the orchestration will replay and the activity will succeed. + */ + @FunctionName("RewindableOrchestration") + public String rewindableOrchestration( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + // Call the activity that may fail + String result = ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await(); + return result; + } + + /** + * Activity that fails on the first call but succeeds on subsequent calls. + * This simulates a transient failure that can be recovered by rewinding. + */ + @FunctionName("FailOnceActivity") + public String failOnceActivity( + @DurableActivityTrigger(name = "input") String input, + final ExecutionContext context) { + if (shouldFail.compareAndSet(true, false)) { + context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input); + throw new RuntimeException("Simulated transient failure - rewind to retry"); + } + context.getLogger().info("FailOnceActivity: Success for input: " + input); + return input + "-rewound-success"; + } + + /** + * HTTP trigger to reset the failure flag (useful for testing). + */ + @FunctionName("ResetRewindFailureFlag") + public HttpResponseMessage resetRewindFailureFlag( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + final ExecutionContext context) { + shouldFail.set(true); + context.getLogger().info("Reset failure flag to true."); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK) + .body("Failure flag reset to true") + .build(); + } +} diff --git a/samples/src/main/java/io/durabletask/samples/RewindPattern.java b/samples/src/main/java/io/durabletask/samples/RewindPattern.java new file mode 100644 index 00000000..7b2b9775 --- /dev/null +++ b/samples/src/main/java/io/durabletask/samples/RewindPattern.java @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package io.durabletask.samples; + +import com.microsoft.durabletask.*; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Sample demonstrating the rewind functionality. + * + * Rewind allows a failed orchestration to be replayed from its last known good state. + * This is useful for recovering from transient failures without losing progress. + * + * This sample: + * 1. Starts an orchestration that calls an activity which fails on the first attempt + * 2. Waits for the orchestration to fail + * 3. Rewinds the orchestration, which replays it from the failure point + * 4. The activity succeeds on retry, and the orchestration completes + */ +final class RewindPattern { + + // Flag to simulate a transient failure (fails first time, succeeds after) + private static final AtomicBoolean shouldFail = new AtomicBoolean(true); + + public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { + // Create and start the worker + final DurableTaskGrpcWorker worker = createTaskHubWorker(); + worker.start(); + + // Create the client + final DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + + try { + // Reset the failure flag + shouldFail.set(true); + + // Start the orchestration - it will fail on the first activity call + String instanceId = client.scheduleNewOrchestrationInstance( + "RewindableOrchestration", + new NewOrchestrationInstanceOptions().setInput("TestInput")); + System.out.printf("Started orchestration instance: %s%n", instanceId); + + // Wait for the orchestration to fail + System.out.println("Waiting for orchestration to fail..."); + OrchestrationMetadata failedInstance = client.waitForInstanceCompletion( + instanceId, + Duration.ofSeconds(30), + true); + + System.out.printf("Orchestration status: %s%n", failedInstance.getRuntimeStatus()); + + if (failedInstance.getRuntimeStatus() == OrchestrationRuntimeStatus.FAILED) { + System.out.println("Orchestration failed as expected. Now rewinding..."); + + // Rewind the failed orchestration + client.rewindInstance(instanceId, "Rewinding after transient failure"); + System.out.println("Rewind request sent."); + + // Wait for the orchestration to complete after rewind + System.out.println("Waiting for orchestration to complete after rewind..."); + OrchestrationMetadata completedInstance = client.waitForInstanceCompletion( + instanceId, + Duration.ofSeconds(30), + true); + + System.out.printf("Orchestration completed: %s%n", completedInstance.getRuntimeStatus()); + System.out.printf("Output: %s%n", completedInstance.readOutputAs(String.class)); + } else { + System.out.println("Unexpected status: " + failedInstance.getRuntimeStatus()); + } + + } finally { + // Shutdown the worker + worker.stop(); + } + } + + private static DurableTaskGrpcWorker createTaskHubWorker() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + + // Register the orchestration + builder.addOrchestration(new TaskOrchestrationFactory() { + @Override + public String getName() { + return "RewindableOrchestration"; + } + + @Override + public TaskOrchestration create() { + return ctx -> { + String input = ctx.getInput(String.class); + + // Call an activity that may fail + String result = ctx.callActivity("FailOnceActivity", input, String.class).await(); + + ctx.complete(result); + }; + } + }); + + // Register the activity that fails on first call + builder.addActivity(new TaskActivityFactory() { + @Override + public String getName() { + return "FailOnceActivity"; + } + + @Override + public TaskActivity create() { + return ctx -> { + String input = ctx.getInput(String.class); + + // Fail on the first call, succeed on subsequent calls + if (shouldFail.compareAndSet(true, false)) { + System.out.println("FailOnceActivity: Simulating transient failure..."); + throw new RuntimeException("Simulated transient failure - rewind to retry"); + } + + System.out.println("FailOnceActivity: Succeeded after rewind!"); + return input + "-rewound-success"; + }; + } + }); + + return builder.build(); + } +} From cab7522f0f4d8e8abfe5a32743d0ea2ecb38e1a8 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Sun, 25 Jan 2026 21:26:20 -0800 Subject: [PATCH 02/21] added api --- .../azurefunctions/HttpManagementPayload.java | 11 ++ .../durabletask/DurableTaskClient.java | 25 ++++ .../durabletask/DurableTaskGrpcClient.java | 10 ++ .../PROTO_SOURCE_COMMIT_HASH | 2 +- .../protos/orchestrator_service.proto | 133 ++++++++++++++++-- 5 files changed, 171 insertions(+), 10 deletions(-) diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java index f78c00f9..dace60b7 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java @@ -13,6 +13,7 @@ public class HttpManagementPayload { private final String id; private final String purgeHistoryDeleteUri; private final String restartPostUri; + private final String rewindPostUri; private final String sendEventPostUri; private final String statusQueryGetUri; private final String terminatePostUri; @@ -33,6 +34,7 @@ public HttpManagementPayload( this.id = instanceId; this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters; this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters; + this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters; this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters; this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters; this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters; @@ -94,4 +96,13 @@ public String getRestartPostUri() { return restartPostUri; } + /** + * Gets the HTTP POST instance rewind endpoint. + * + * @return The HTTP URL for posting instance rewind commands. + */ + public String getRewindPostUri() { + return rewindPostUri; + } + } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 4590277f..3b2ca4be 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -292,6 +292,31 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( */ public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId); + /** + * Rewinds a failed orchestration instance to the last known good state and replays from there. + *

+ * This method can only be used on orchestration instances that are in a Failed state. + * When rewound, the orchestration instance will restart from the point of failure as if the failure + * never occurred. + * + * @param instanceId the ID of the orchestration instance to rewind + */ + public void rewindInstance(String instanceId) { + this.rewindInstance(instanceId, null); + } + + /** + * Rewinds a failed orchestration instance to the last known good state and replays from there. + *

+ * This method can only be used on orchestration instances that are in a Failed state. + * When rewound, the orchestration instance will restart from the point of failure as if the failure + * never occurred. + * + * @param instanceId the ID of the orchestration instance to rewind + * @param reason the reason for rewinding the orchestration instance + */ + public abstract void rewindInstance(String instanceId, @Nullable String reason); + /** * Suspends a running orchestration instance. * @param instanceId the ID of the orchestration instance to suspend diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 52d072b8..c9d23c55 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -331,6 +331,16 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t } } + @Override + public void rewindInstance(String instanceId, @Nullable String reason) { + RewindInstanceRequest.Builder rewindRequestBuilder = RewindInstanceRequest.newBuilder(); + rewindRequestBuilder.setInstanceId(instanceId); + if (reason != null) { + rewindRequestBuilder.setReason(StringValue.of(reason)); + } + this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); + } + @Override public void suspendInstance(String instanceId, @Nullable String reason) { SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder(); diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index 3d3f9e98..fdb90d6a 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -fbe5bb20835678099fc51a44993ed9b045dee5a6 \ No newline at end of file +026329c53fe6363985655857b9ca848ec7238bd2 \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 88928c3b..8ef46a4a 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -41,6 +41,7 @@ message TaskFailureDetails { google.protobuf.StringValue stackTrace = 3; TaskFailureDetails innerFailure = 4; bool isNonRetriable = 5; + map properties = 6; } enum OrchestrationStatus { @@ -95,6 +96,7 @@ message TaskScheduledEvent { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; TraceContext parentTraceContext = 4; + map tags = 5; } message TaskCompletedEvent { @@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; TraceContext parentTraceContext = 5; + map tags = 6; } message SubOrchestrationInstanceCompletedEvent { @@ -192,7 +195,7 @@ message EntityOperationCalledEvent { } message EntityLockRequestedEvent { - string criticalSectionId = 1; + string criticalSectionId = 1; repeated string lockSet = 2; int32 position = 3; google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories @@ -217,7 +220,19 @@ message EntityUnlockSentEvent { message EntityLockGrantedEvent { string criticalSectionId = 1; } - + +message ExecutionRewoundEvent { + google.protobuf.StringValue reason = 1; + google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise + TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue name = 5; // used by DTS backend only + google.protobuf.StringValue version = 6; // used by DTS backend only + google.protobuf.StringValue input = 7; // used by DTS backend only + ParentInstanceInfo parentInstance = 8; // used by DTS backend only + map tags = 9; // used by DTS backend only +} + message HistoryEvent { int32 eventId = 1; google.protobuf.Timestamp timestamp = 2; @@ -244,11 +259,12 @@ message HistoryEvent { ExecutionResumedEvent executionResumed = 22; EntityOperationSignaledEvent entityOperationSignaled = 23; EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; EntityLockRequestedEvent entityLockRequested = 27; EntityLockGrantedEvent entityLockGranted = 28; EntityUnlockSentEvent entityUnlockSent = 29; + ExecutionRewoundEvent executionRewound = 30; } } @@ -256,6 +272,8 @@ message ScheduleTaskAction { string name = 1; google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; + map tags = 4; + TraceContext parentTraceContext = 5; } message CreateSubOrchestrationAction { @@ -263,6 +281,8 @@ message CreateSubOrchestrationAction { string name = 2; google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; + map tags = 6; } message CreateTimerAction { @@ -282,6 +302,7 @@ message CompleteOrchestrationAction { google.protobuf.StringValue newVersion = 4; repeated HistoryEvent carryoverEvents = 5; TaskFailureDetails failureDetails = 6; + map tags = 7; } message TerminateOrchestrationAction { @@ -312,6 +333,11 @@ message OrchestratorAction { } } +message OrchestrationTraceContext { + google.protobuf.StringValue spanID = 1; + google.protobuf.Timestamp spanStartTime = 2; +} + message OrchestratorRequest { string instanceId = 1; google.protobuf.StringValue executionId = 2; @@ -320,6 +346,8 @@ message OrchestratorRequest { OrchestratorEntityParameters entityParameters = 5; bool requiresHistoryStreaming = 6; map properties = 7; + + OrchestrationTraceContext orchestrationTraceContext = 8; } message OrchestratorResponse { @@ -331,6 +359,17 @@ message OrchestratorResponse { // The number of work item events that were processed by the orchestrator. // This field is optional. If not set, the service should assume that the orchestrator processed all events. google.protobuf.Int32Value numEventsProcessed = 5; + OrchestrationTraceContext orchestrationTraceContext = 6; + + // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. + bool requiresHistory = 7; + + // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). + bool isPartial = 8; + + // Zero-based position of the current chunk within a chunked completion sequence. + // This field is omitted for non-chunked completions. + google.protobuf.Int32Value chunkIndex = 9; } message CreateInstanceRequest { @@ -343,6 +382,7 @@ message CreateInstanceRequest { google.protobuf.StringValue executionId = 7; map tags = 8; TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; } message OrchestrationIdReusePolicy { @@ -449,12 +489,28 @@ message QueryInstancesResponse { google.protobuf.StringValue continuationToken = 2; } +message ListInstanceIdsRequest { + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp completedTimeFrom = 2; + google.protobuf.Timestamp completedTimeTo = 3; + int32 pageSize = 4; + google.protobuf.StringValue lastInstanceKey = 5; +} + +message ListInstanceIdsResponse { + repeated string instanceIds = 1; + google.protobuf.StringValue lastInstanceKey = 2; +} + message PurgeInstancesRequest { oneof request { string instanceId = 1; PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; } bool recursive = 3; + // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity) + bool isOrchestration = 5; } message PurgeInstanceFilter { @@ -468,6 +524,15 @@ message PurgeInstancesResponse { google.protobuf.BoolValue isComplete = 2; } +message RestartInstanceRequest { + string instanceId = 1; + bool restartWithNewInstanceId = 2; +} + +message RestartInstanceResponse { + string instanceId = 1; +} + message CreateTaskHubRequest { bool recreateIfExists = 1; } @@ -490,10 +555,12 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; } message SignalEntityResponse { - // no payload + // no payload } message GetEntityRequest { @@ -553,6 +620,7 @@ message EntityBatchRequest { string instanceId = 1; google.protobuf.StringValue entityState = 2; repeated OperationRequest operations = 3; + map properties = 4; } message EntityBatchResult { @@ -562,6 +630,8 @@ message EntityBatchResult { TaskFailureDetails failureDetails = 4; string completionToken = 5; repeated OperationInfo operationInfos = 6; // used only with DTS + // Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided. + bool requiresState = 7; } message EntityRequest { @@ -575,6 +645,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; } message OperationResult { @@ -591,10 +662,14 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationAction { @@ -610,6 +685,8 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -618,6 +695,8 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; } message AbandonActivityTaskRequest { @@ -644,6 +723,17 @@ message AbandonEntityTaskResponse { // Empty. } +message SkipGracefulOrchestrationTerminationsRequest { + InstanceBatch instanceBatch = 1; + google.protobuf.StringValue reason = 2; +} + +message SkipGracefulOrchestrationTerminationsResponse { + // Those instances which could not be terminated because they had locked entities at the time of this termination call, + // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) + repeated string unterminatedInstanceIds = 1; +} + service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -657,18 +747,21 @@ service TaskHubSidecarService { // Rewinds an orchestration instance to last known good state and replays from there. rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + // Restarts an orchestration instance. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + // Waits for an orchestration instance to reach a running or completion state. rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); - + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); // Raises an event to a running orchestration instance. rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); - + // Terminates a running orchestration instance. rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); - + // Suspends a running orchestration instance. rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); @@ -678,6 +771,9 @@ service TaskHubSidecarService { // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + + rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); @@ -714,6 +810,10 @@ service TaskHubSidecarService { // Abandon an entity work item rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. + rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); } message GetWorkItemsRequest { @@ -732,6 +832,16 @@ enum WorkerCapability { // When set, the service may return work items without any history events as an optimization. // It is strongly recommended that all SDKs support this capability. WORKER_CAPABILITY_HISTORY_STREAMING = 1; + + // Indicates that the worker supports scheduled tasks. + // The service may send schedule-triggered orchestration work items, + // and the worker must handle them, including the scheduledTime field. + WORKER_CAPABILITY_SCHEDULED_TASKS = 2; + + // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage). + // Work items may contain URI references instead of inline data, and the worker must fetch them. + // This avoids message size limits and reduces network overhead. + WORKER_CAPABILITY_LARGE_PAYLOADS = 3; } message WorkItem { @@ -750,7 +860,7 @@ message CompleteTaskResponse { } message HealthPing { - // No payload + // No payload } message StreamInstanceHistoryRequest { @@ -764,3 +874,8 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; } + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; +} From 5a804bc0f62605e54896059bbdf648bad2575c28 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Mon, 26 Jan 2026 09:31:54 -0800 Subject: [PATCH 03/21] updated EndToEndTests.java --- .../java/com/functions/EndToEndTests.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index c2d0be02..de824e36 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -4,6 +4,7 @@ import io.restassured.http.ContentType; import io.restassured.path.json.JsonPath; import io.restassured.response.Response; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -26,6 +27,14 @@ @Tag("e2e") public class EndToEndTests { + @BeforeAll + public static void setup() { + RestAssured.baseURI = "http://localhost"; + // Use port 8080 for Docker, 7071 for local func start + String port = System.getenv("FUNCTIONS_PORT"); + RestAssured.port = port != null ? Integer.parseInt(port) : 8080; + } + @Order(1) @Test public void setupHost() { @@ -216,6 +225,40 @@ public void suspendResume() throws InterruptedException { assertTrue(completed); } + @Test + public void rewindFailedOrchestration() throws InterruptedException { + // Reset the failure flag before starting + post("/api/ResetRewindFailureFlag"); + + // Start the orchestration - it will fail on the first activity call + String startOrchestrationPath = "/api/StartRewindableOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + + // Wait for the orchestration to fail + boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + assertTrue(failed, "Orchestration should have failed"); + + // Get the rewind URI and rewind the orchestration + String rewindPostUri = jsonPath.get("rewindPostUri"); + rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality"); + Response rewindResponse = post(rewindPostUri); + assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); + + // Wait for the orchestration to complete after rewind + Set continueStates = new HashSet<>(); + continueStates.add("Pending"); + continueStates.add("Running"); + boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); + assertTrue(completed, "Orchestration should complete after rewind"); + + // Verify the output contains the expected result + Response statusResponse = get(statusQueryGetUri); + String output = statusResponse.jsonPath().get("output"); + assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output); + } + @Test public void externalEventDeserializeFail() throws InterruptedException { String startOrchestrationPath = "api/ExternalEventHttp"; From 132532d49adba427a7721b919d0d5fcc4f1916e7 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Mon, 26 Jan 2026 09:38:00 -0800 Subject: [PATCH 04/21] updated CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87977e6f..e14c2bcf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## Unreleased +* Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253)) + ## v1.6.2 * Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249)) From f73c058ece6b2efab10f50715a11f4f3e9a5348c Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Tue, 27 Jan 2026 17:42:46 -0800 Subject: [PATCH 05/21] updated build-validation.yml --- .github/workflows/build-validation.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 8fd6d57c..04937c9e 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -88,7 +88,11 @@ jobs: done echo "Durable Task Emulator is ready on port 4001" # Give additional time for gRPC service to fully initialize after port is open - sleep 5 + echo "Waiting for gRPC service to fully initialize..." + sleep 15 + echo "Verifying emulator container is still healthy..." + docker ps | grep durabletask-emulator + echo "Emulator initialization complete" - name: Integration Tests with Gradle run: ./gradlew integrationTest || echo "TEST_FAILED=true" >> $GITHUB_ENV From 46cc8641efa23ed0aaaeb92aab2970a1446cfaca Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Thu, 29 Jan 2026 14:16:26 -0800 Subject: [PATCH 06/21] reverted build-validation.yml changes --- .github/workflows/build-validation.yml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index fe04d984..9aad2d10 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -89,11 +89,7 @@ jobs: done echo "Durable Task Emulator is ready on port 4001" # Give additional time for gRPC service to fully initialize after port is open - echo "Waiting for gRPC service to fully initialize..." - sleep 15 - echo "Verifying emulator container is still healthy..." - docker ps | grep durabletask-emulator - echo "Emulator initialization complete" + sleep 5 - name: Integration Tests with Gradle run: ./gradlew integrationTest || echo "TEST_FAILED=true" >> $GITHUB_ENV From 2389289626ad7555bb946b332668e5040e89c2f0 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Wed, 4 Feb 2026 15:50:08 -0800 Subject: [PATCH 07/21] updated test report paths --- .github/workflows/build-validation.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 9aad2d10..7e0c3ef5 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -156,8 +156,8 @@ jobs: - name: Archive test report uses: actions/upload-artifact@v4 with: - name: Integration test report - path: client/build/reports/tests/endToEndTest + name: E2E test report + path: endtoendtests/build/reports/tests/endToEndTest functions-sample-tests: @@ -195,5 +195,5 @@ jobs: - name: Archive test report uses: actions/upload-artifact@v4 with: - name: Integration test report - path: client/build/reports/tests/endToEndTest \ No newline at end of file + name: Functions Sample test report + path: samples-azure-functions/build/reports/tests/sampleTest \ No newline at end of file From 8e5c920409b7d1d6c8156d0ecb52d4f1a4f75e44 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Thu, 5 Feb 2026 09:27:03 -0800 Subject: [PATCH 08/21] added tests --- .../durabletask/IntegrationTests.java | 108 ++++++++++++++++++ .../main/java/com/functions/RewindTest.java | 89 +++++++++++++++ .../java/com/functions/EndToEndTests.java | 34 ++++++ 3 files changed, 231 insertions(+) create mode 100644 samples-azure-functions/src/main/java/com/functions/RewindTest.java diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 59db0b7f..2b29e958 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -605,6 +605,114 @@ void terminateSuspendOrchestration() throws TimeoutException, InterruptedExcepti } } + @Test + void rewindFailedOrchestration() throws TimeoutException { + final String orchestratorName = "RewindOrchestration"; + final String activityName = "FailOnceActivity"; + final AtomicBoolean shouldFail = new AtomicBoolean(true); + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String result = ctx.callActivity(activityName, null, String.class).await(); + ctx.complete(result); + }) + .addActivity(activityName, ctx -> { + if (shouldFail.compareAndSet(true, false)) { + throw new RuntimeException("Simulated transient failure"); + } + return "Success after rewind"; + }) + .buildAndStart(); + + DurableTaskClient client = this.createClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + + // Wait for the orchestration to fail + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + + // Rewind the failed orchestration with a reason + String rewindReason = "Rewinding after transient failure"; + client.rewindInstance(instanceId, rewindReason); + + // Wait for the orchestration to complete after rewind + instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals("Success after rewind", instance.readOutputAs(String.class)); + } + } + + @Test + void rewindFailedOrchestrationWithoutReason() throws TimeoutException { + final String orchestratorName = "RewindOrchestrationNoReason"; + final String activityName = "FailOnceActivityNoReason"; + final AtomicBoolean shouldFail = new AtomicBoolean(true); + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String result = ctx.callActivity(activityName, null, String.class).await(); + ctx.complete(result); + }) + .addActivity(activityName, ctx -> { + if (shouldFail.compareAndSet(true, false)) { + throw new RuntimeException("Simulated transient failure"); + } + return "Success after rewind without reason"; + }) + .buildAndStart(); + + DurableTaskClient client = this.createClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + + // Wait for the orchestration to fail + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + + // Rewind the failed orchestration without providing a reason + client.rewindInstance(instanceId); + + // Wait for the orchestration to complete after rewind + instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals("Success after rewind without reason", instance.readOutputAs(String.class)); + } + } + + @Test + void rewindCompletedOrchestrationThrowsException() throws TimeoutException { + final String orchestratorName = "RewindCompletedOrchestration"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + ctx.complete("Completed successfully"); + }) + .buildAndStart(); + + DurableTaskClient client = this.createClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + + // Wait for the orchestration to complete + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Attempt to rewind a completed orchestration - should throw or be a no-op + // Based on API behavior, rewind is only valid for FAILED orchestrations + assertThrows( + Exception.class, + () -> client.rewindInstance(instanceId, "Attempting to rewind completed orchestration"), + "Rewinding a completed orchestration should throw an exception" + ); + } + } + @Test void activityFanOut() throws IOException, TimeoutException { final String orchestratorName = "ActivityFanOut"; diff --git a/samples-azure-functions/src/main/java/com/functions/RewindTest.java b/samples-azure-functions/src/main/java/com/functions/RewindTest.java new file mode 100644 index 00000000..8c998fa4 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/RewindTest.java @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpStatus; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Sample functions to test the rewind functionality. + * Rewind allows a failed orchestration to be replayed from its last known good state. + */ +public class RewindTest { + + // Flag to control whether the activity should fail (first call fails, subsequent calls succeed) + private static final AtomicBoolean shouldFail = new AtomicBoolean(true); + + /** + * HTTP trigger to start the rewindable orchestration. + */ + @FunctionName("StartRewindableOrchestration") + public HttpResponseMessage startRewindableOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Starting rewindable orchestration."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * Orchestration that calls an activity which will fail on the first attempt. + * After rewinding, the orchestration will replay and the activity will succeed. + */ + @FunctionName("RewindableOrchestration") + public String rewindableOrchestration( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + // Call the activity that may fail + String result = ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await(); + return result; + } + + /** + * Activity that fails on the first call but succeeds on subsequent calls. + * This simulates a transient failure that can be recovered by rewinding. + */ + @FunctionName("FailOnceActivity") + public String failOnceActivity( + @DurableActivityTrigger(name = "input") String input, + final ExecutionContext context) { + if (shouldFail.compareAndSet(true, false)) { + context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input); + throw new RuntimeException("Simulated transient failure - rewind to retry"); + } + context.getLogger().info("FailOnceActivity: Success for input: " + input); + return input + "-rewound-success"; + } + + /** + * HTTP trigger to reset the failure flag (useful for testing). + */ + @FunctionName("ResetRewindFailureFlag") + public HttpResponseMessage resetRewindFailureFlag( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + final ExecutionContext context) { + shouldFail.set(true); + context.getLogger().info("Reset failure flag to true."); + return request.createResponseBuilder(HttpStatus.OK) + .body("Failure flag reset to true") + .build(); + } +} diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index a756b5db..371fc9f9 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -229,6 +229,40 @@ public void orchestrationPOJO() throws InterruptedException { assertEquals("\"TESTNAME\"", outputName); } + @Test + public void rewindFailedOrchestration() throws InterruptedException { + // Reset the failure flag before starting + post("/api/ResetRewindFailureFlag"); + + // Start the orchestration - it will fail on the first activity call + String startOrchestrationPath = "/api/StartRewindableOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + + // Wait for the orchestration to fail + boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + assertTrue(failed, "Orchestration should have failed"); + + // Get the rewind URI and rewind the orchestration + String rewindPostUri = jsonPath.get("rewindPostUri"); + rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality"); + Response rewindResponse = post(rewindPostUri); + assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); + + // Wait for the orchestration to complete after rewind + Set continueStates = new HashSet<>(); + continueStates.add("Pending"); + continueStates.add("Running"); + boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); + assertTrue(completed, "Orchestration should complete after rewind"); + + // Verify the output contains the expected result + Response statusResponse = get(statusQueryGetUri); + String output = statusResponse.jsonPath().get("output"); + assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output); + } + private boolean pollingCheck(String statusQueryGetUri, String expectedState, Set continueStates, From 7cec5d1eb249ca36ec58abf3487935d974b7523a Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Tue, 17 Feb 2026 15:25:37 -0800 Subject: [PATCH 09/21] addressed PR feedback --- .../durabletask/DurableTaskClient.java | 3 +- .../durabletask/IntegrationTests.java | 115 +++++++++++------- .../java/com/functions/EndToEndTests.java | 1 - 3 files changed, 73 insertions(+), 46 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 3b2ca4be..fe8b6804 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -297,7 +297,8 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( *

* This method can only be used on orchestration instances that are in a Failed state. * When rewound, the orchestration instance will restart from the point of failure as if the failure - * never occurred. + * never occurred. It rewinds the orchestration by replaying any + * Failed Activities and Failed suborchestrations that themselves have Failed Activities * * @param instanceId the ID of the orchestration instance to rewind */ diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 2b29e958..637252c0 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -413,6 +413,40 @@ void subOrchestration() throws TimeoutException { } } + @Test + void subOrchestrationWithFailedActivity() throws TimeoutException { + final String parentOrchestratorName = "ParentOrchestration"; + final String childOrchestratorName = "ChildOrchestration"; + final String activityName = "FailingActivity"; + final String failureMessage = "Simulated activity failure in sub-orchestration"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(parentOrchestratorName, ctx -> { + String result = ctx.callSubOrchestrator(childOrchestratorName, null, String.class).await(); + ctx.complete(result); + }) + .addOrchestrator(childOrchestratorName, ctx -> { + String result = ctx.callActivity(activityName, null, String.class).await(); + ctx.complete(result); + }) + .addActivity(activityName, ctx -> { + throw new RuntimeException(failureMessage); + }) + .buildAndStart(); + + DurableTaskClient client = this.createClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + + FailureDetails details = instance.getFailureDetails(); + assertNotNull(details); + assertTrue(details.getErrorMessage().contains(failureMessage)); + } + } + @Test void continueAsNew() throws TimeoutException { final String orchestratorName = "continueAsNew"; @@ -605,10 +639,10 @@ void terminateSuspendOrchestration() throws TimeoutException, InterruptedExcepti } } - @Test - void rewindFailedOrchestration() throws TimeoutException { - final String orchestratorName = "RewindOrchestration"; - final String activityName = "FailOnceActivity"; + private void rewindFailedOrchestrationHelper(String rewindReason) throws TimeoutException { + final String orchestratorName = "RewindOrchestration" + (rewindReason == null ? "NoReason" : ""); + final String activityName = "FailOnceActivity" + (rewindReason == null ? "NoReason" : ""); + final String expectedOutput = "Success after rewind" + (rewindReason == null ? " without reason" : ""); final AtomicBoolean shouldFail = new AtomicBoolean(true); DurableTaskGrpcWorker worker = this.createWorkerBuilder() @@ -620,7 +654,7 @@ void rewindFailedOrchestration() throws TimeoutException { if (shouldFail.compareAndSet(true, false)) { throw new RuntimeException("Simulated transient failure"); } - return "Success after rewind"; + return expectedOutput; }) .buildAndStart(); @@ -633,55 +667,29 @@ void rewindFailedOrchestration() throws TimeoutException { assertNotNull(instance); assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); - // Rewind the failed orchestration with a reason - String rewindReason = "Rewinding after transient failure"; - client.rewindInstance(instanceId, rewindReason); + // Rewind the failed orchestration + if (rewindReason != null) { + client.rewindInstance(instanceId, rewindReason); + } else { + client.rewindInstance(instanceId); + } // Wait for the orchestration to complete after rewind instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); assertNotNull(instance); assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals("Success after rewind", instance.readOutputAs(String.class)); + assertEquals(expectedOutput, instance.readOutputAs(String.class)); } } @Test - void rewindFailedOrchestrationWithoutReason() throws TimeoutException { - final String orchestratorName = "RewindOrchestrationNoReason"; - final String activityName = "FailOnceActivityNoReason"; - final AtomicBoolean shouldFail = new AtomicBoolean(true); - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> { - String result = ctx.callActivity(activityName, null, String.class).await(); - ctx.complete(result); - }) - .addActivity(activityName, ctx -> { - if (shouldFail.compareAndSet(true, false)) { - throw new RuntimeException("Simulated transient failure"); - } - return "Success after rewind without reason"; - }) - .buildAndStart(); - - DurableTaskClient client = this.createClientBuilder().build(); - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - - // Wait for the orchestration to fail - OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); - - // Rewind the failed orchestration without providing a reason - client.rewindInstance(instanceId); + void rewindFailedOrchestration() throws TimeoutException { + rewindFailedOrchestrationHelper("Rewinding after transient failure"); + } - // Wait for the orchestration to complete after rewind - instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals("Success after rewind without reason", instance.readOutputAs(String.class)); - } + @Test + void rewindFailedOrchestrationWithoutReason() throws TimeoutException { + rewindFailedOrchestrationHelper(null); } @Test @@ -713,6 +721,25 @@ void rewindCompletedOrchestrationThrowsException() throws TimeoutException { } } + @Test + void rewindNonExistentOrchestrationThrowsException() { + final String orchestratorName = "RewindNonExistent"; + final String nonExistentId = "non-existent-instance-id"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> ctx.complete("done")) + .buildAndStart(); + + DurableTaskClient client = this.createClientBuilder().build(); + try (worker; client) { + assertThrows( + Exception.class, + () -> client.rewindInstance(nonExistentId, "Attempting to rewind non-existent orchestration"), + "Rewinding a non-existent orchestration should throw an exception" + ); + } + } + @Test void activityFanOut() throws IOException, TimeoutException { final String orchestratorName = "ActivityFanOut"; diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index de824e36..98c84a96 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -248,7 +248,6 @@ public void rewindFailedOrchestration() throws InterruptedException { // Wait for the orchestration to complete after rewind Set continueStates = new HashSet<>(); - continueStates.add("Pending"); continueStates.add("Running"); boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); assertTrue(completed, "Orchestration should complete after rewind"); From c3c5e450dce54bf86ff789a2ded9d84904f2b59d Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Tue, 17 Feb 2026 19:20:58 -0800 Subject: [PATCH 10/21] addressed copilot comments --- .../microsoft/durabletask/DurableTaskGrpcClient.java | 1 + .../src/test/java/com/functions/EndToEndTests.java | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 3275a04e..1827afb8 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -345,6 +345,7 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t @Override public void rewindInstance(String instanceId, @Nullable String reason) { + Helpers.throwIfArgumentNull(instanceId, "instanceId"); RewindInstanceRequest.Builder rewindRequestBuilder = RewindInstanceRequest.newBuilder(); rewindRequestBuilder.setInstanceId(instanceId); if (reason != null) { diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index 98c84a96..53a0e454 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -32,7 +32,16 @@ public static void setup() { RestAssured.baseURI = "http://localhost"; // Use port 8080 for Docker, 7071 for local func start String port = System.getenv("FUNCTIONS_PORT"); - RestAssured.port = port != null ? Integer.parseInt(port) : 8080; + if (port != null) { + try { + RestAssured.port = Integer.parseInt(port); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "FUNCTIONS_PORT environment variable must be a valid integer, but was: '" + port + "'", e); + } + } else { + RestAssured.port = 8080; + } } @Order(1) From cdefa979a98c3ac1c5392ee2340c045c8f0d1b3e Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Wed, 18 Feb 2026 22:58:45 -0800 Subject: [PATCH 11/21] addressed PR feedback --- .../durabletask/DurableTaskClient.java | 3 ++- .../durabletask/IntegrationTests.java | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index fe8b6804..809d7e2c 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -311,7 +311,8 @@ public void rewindInstance(String instanceId) { *

* This method can only be used on orchestration instances that are in a Failed state. * When rewound, the orchestration instance will restart from the point of failure as if the failure - * never occurred. + * never occurred. It rewinds the orchestration by replaying any + * Failed Activities and Failed suborchestrations that themselves have Failed Activities * * @param instanceId the ID of the orchestration instance to rewind * @param reason the reason for rewinding the orchestration instance diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 637252c0..5ff63705 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -419,6 +419,8 @@ void subOrchestrationWithFailedActivity() throws TimeoutException { final String childOrchestratorName = "ChildOrchestration"; final String activityName = "FailingActivity"; final String failureMessage = "Simulated activity failure in sub-orchestration"; + final String expectedOutput = "Success after rewind"; + final AtomicBoolean shouldFail = new AtomicBoolean(true); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(parentOrchestratorName, ctx -> { @@ -430,13 +432,18 @@ void subOrchestrationWithFailedActivity() throws TimeoutException { ctx.complete(result); }) .addActivity(activityName, ctx -> { - throw new RuntimeException(failureMessage); + if (shouldFail.compareAndSet(true, false)) { + throw new RuntimeException(failureMessage); + } + return expectedOutput; }) .buildAndStart(); DurableTaskClient client = this.createClientBuilder().build(); try (worker; client) { String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName); + + // Wait for the orchestration to fail due to the activity failure in the sub-orchestration OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); assertNotNull(instance); assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); @@ -444,6 +451,15 @@ void subOrchestrationWithFailedActivity() throws TimeoutException { FailureDetails details = instance.getFailureDetails(); assertNotNull(details); assertTrue(details.getErrorMessage().contains(failureMessage)); + + // Rewind the failed orchestration + client.rewindInstance(instanceId, "Rewinding sub-orchestration with failed activity"); + + // Wait for the orchestration to complete after rewind + instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(expectedOutput, instance.readOutputAs(String.class)); } } From f17d006c9ffb25eb78b981cb00c328adfb434af6 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Mon, 23 Feb 2026 21:00:52 -0800 Subject: [PATCH 12/21] Remove rewind tests from standalone SDK and keep Functions e2e tests --- CHANGELOG.md | 2 +- .../durabletask/DurableTaskClient.java | 8 + .../durabletask/DurableTaskGrpcClient.java | 22 +-- .../durabletask/IntegrationTests.java | 151 ------------------ .../io/durabletask/samples/RewindPattern.java | 131 --------------- 5 files changed, 20 insertions(+), 294 deletions(-) delete mode 100644 samples/src/main/java/io/durabletask/samples/RewindPattern.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cf49999..9751d3ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## Unreleased -* Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253)) +* Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253)). Note: orchestration processing for rewind is supported with Azure Functions but not with the standalone `GrpcDurableTaskWorker`. ## v1.7.0 * Add descriptive error when orchestration type is not registered ([#261](https://github.com/microsoft/durabletask-java/pull/261)) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 809d7e2c..1e1b3cb0 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -299,6 +299,10 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( * When rewound, the orchestration instance will restart from the point of failure as if the failure * never occurred. It rewinds the orchestration by replaying any * Failed Activities and Failed suborchestrations that themselves have Failed Activities + *

+ * Note: Rewind requires a backend that supports it. When using Azure Functions with the + * Durable Task extension, rewind is fully supported. The standalone {@code GrpcDurableTaskWorker} + * does not currently support orchestration processing for rewind. * * @param instanceId the ID of the orchestration instance to rewind */ @@ -313,6 +317,10 @@ public void rewindInstance(String instanceId) { * When rewound, the orchestration instance will restart from the point of failure as if the failure * never occurred. It rewinds the orchestration by replaying any * Failed Activities and Failed suborchestrations that themselves have Failed Activities + *

+ * Note: Rewind requires a backend that supports it. When using Azure Functions with the + * Durable Task extension, rewind is fully supported. The standalone {@code GrpcDurableTaskWorker} + * does not currently support orchestration processing for rewind. * * @param instanceId the ID of the orchestration instance to rewind * @param reason the reason for rewinding the orchestration instance diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 1827afb8..f87388d9 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -343,17 +343,6 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t } } - @Override - public void rewindInstance(String instanceId, @Nullable String reason) { - Helpers.throwIfArgumentNull(instanceId, "instanceId"); - RewindInstanceRequest.Builder rewindRequestBuilder = RewindInstanceRequest.newBuilder(); - rewindRequestBuilder.setInstanceId(instanceId); - if (reason != null) { - rewindRequestBuilder.setReason(StringValue.of(reason)); - } - this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); - } - @Override public void suspendInstance(String instanceId, @Nullable String reason) { SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder(); @@ -374,6 +363,17 @@ public void resumeInstance(String instanceId, @Nullable String reason) { this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); } + @Override + public void rewindInstance(String instanceId, @Nullable String reason) { + Helpers.throwIfArgumentNull(instanceId, "instanceId"); + RewindInstanceRequest.Builder rewindRequestBuilder = RewindInstanceRequest.newBuilder(); + rewindRequestBuilder.setInstanceId(instanceId); + if (reason != null) { + rewindRequestBuilder.setReason(StringValue.of(reason)); + } + this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); + } + @Override public String restartInstance(String instanceId, boolean restartWithNewInstanceId) { OrchestrationMetadata metadata = this.getInstanceMetadata(instanceId, true); diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 5ff63705..59db0b7f 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -413,56 +413,6 @@ void subOrchestration() throws TimeoutException { } } - @Test - void subOrchestrationWithFailedActivity() throws TimeoutException { - final String parentOrchestratorName = "ParentOrchestration"; - final String childOrchestratorName = "ChildOrchestration"; - final String activityName = "FailingActivity"; - final String failureMessage = "Simulated activity failure in sub-orchestration"; - final String expectedOutput = "Success after rewind"; - final AtomicBoolean shouldFail = new AtomicBoolean(true); - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(parentOrchestratorName, ctx -> { - String result = ctx.callSubOrchestrator(childOrchestratorName, null, String.class).await(); - ctx.complete(result); - }) - .addOrchestrator(childOrchestratorName, ctx -> { - String result = ctx.callActivity(activityName, null, String.class).await(); - ctx.complete(result); - }) - .addActivity(activityName, ctx -> { - if (shouldFail.compareAndSet(true, false)) { - throw new RuntimeException(failureMessage); - } - return expectedOutput; - }) - .buildAndStart(); - - DurableTaskClient client = this.createClientBuilder().build(); - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName); - - // Wait for the orchestration to fail due to the activity failure in the sub-orchestration - OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); - - FailureDetails details = instance.getFailureDetails(); - assertNotNull(details); - assertTrue(details.getErrorMessage().contains(failureMessage)); - - // Rewind the failed orchestration - client.rewindInstance(instanceId, "Rewinding sub-orchestration with failed activity"); - - // Wait for the orchestration to complete after rewind - instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(expectedOutput, instance.readOutputAs(String.class)); - } - } - @Test void continueAsNew() throws TimeoutException { final String orchestratorName = "continueAsNew"; @@ -655,107 +605,6 @@ void terminateSuspendOrchestration() throws TimeoutException, InterruptedExcepti } } - private void rewindFailedOrchestrationHelper(String rewindReason) throws TimeoutException { - final String orchestratorName = "RewindOrchestration" + (rewindReason == null ? "NoReason" : ""); - final String activityName = "FailOnceActivity" + (rewindReason == null ? "NoReason" : ""); - final String expectedOutput = "Success after rewind" + (rewindReason == null ? " without reason" : ""); - final AtomicBoolean shouldFail = new AtomicBoolean(true); - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> { - String result = ctx.callActivity(activityName, null, String.class).await(); - ctx.complete(result); - }) - .addActivity(activityName, ctx -> { - if (shouldFail.compareAndSet(true, false)) { - throw new RuntimeException("Simulated transient failure"); - } - return expectedOutput; - }) - .buildAndStart(); - - DurableTaskClient client = this.createClientBuilder().build(); - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - - // Wait for the orchestration to fail - OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); - - // Rewind the failed orchestration - if (rewindReason != null) { - client.rewindInstance(instanceId, rewindReason); - } else { - client.rewindInstance(instanceId); - } - - // Wait for the orchestration to complete after rewind - instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(expectedOutput, instance.readOutputAs(String.class)); - } - } - - @Test - void rewindFailedOrchestration() throws TimeoutException { - rewindFailedOrchestrationHelper("Rewinding after transient failure"); - } - - @Test - void rewindFailedOrchestrationWithoutReason() throws TimeoutException { - rewindFailedOrchestrationHelper(null); - } - - @Test - void rewindCompletedOrchestrationThrowsException() throws TimeoutException { - final String orchestratorName = "RewindCompletedOrchestration"; - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> { - ctx.complete("Completed successfully"); - }) - .buildAndStart(); - - DurableTaskClient client = this.createClientBuilder().build(); - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - - // Wait for the orchestration to complete - OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - - // Attempt to rewind a completed orchestration - should throw or be a no-op - // Based on API behavior, rewind is only valid for FAILED orchestrations - assertThrows( - Exception.class, - () -> client.rewindInstance(instanceId, "Attempting to rewind completed orchestration"), - "Rewinding a completed orchestration should throw an exception" - ); - } - } - - @Test - void rewindNonExistentOrchestrationThrowsException() { - final String orchestratorName = "RewindNonExistent"; - final String nonExistentId = "non-existent-instance-id"; - - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> ctx.complete("done")) - .buildAndStart(); - - DurableTaskClient client = this.createClientBuilder().build(); - try (worker; client) { - assertThrows( - Exception.class, - () -> client.rewindInstance(nonExistentId, "Attempting to rewind non-existent orchestration"), - "Rewinding a non-existent orchestration should throw an exception" - ); - } - } - @Test void activityFanOut() throws IOException, TimeoutException { final String orchestratorName = "ActivityFanOut"; diff --git a/samples/src/main/java/io/durabletask/samples/RewindPattern.java b/samples/src/main/java/io/durabletask/samples/RewindPattern.java deleted file mode 100644 index 7b2b9775..00000000 --- a/samples/src/main/java/io/durabletask/samples/RewindPattern.java +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package io.durabletask.samples; - -import com.microsoft.durabletask.*; - -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Sample demonstrating the rewind functionality. - * - * Rewind allows a failed orchestration to be replayed from its last known good state. - * This is useful for recovering from transient failures without losing progress. - * - * This sample: - * 1. Starts an orchestration that calls an activity which fails on the first attempt - * 2. Waits for the orchestration to fail - * 3. Rewinds the orchestration, which replays it from the failure point - * 4. The activity succeeds on retry, and the orchestration completes - */ -final class RewindPattern { - - // Flag to simulate a transient failure (fails first time, succeeds after) - private static final AtomicBoolean shouldFail = new AtomicBoolean(true); - - public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { - // Create and start the worker - final DurableTaskGrpcWorker worker = createTaskHubWorker(); - worker.start(); - - // Create the client - final DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); - - try { - // Reset the failure flag - shouldFail.set(true); - - // Start the orchestration - it will fail on the first activity call - String instanceId = client.scheduleNewOrchestrationInstance( - "RewindableOrchestration", - new NewOrchestrationInstanceOptions().setInput("TestInput")); - System.out.printf("Started orchestration instance: %s%n", instanceId); - - // Wait for the orchestration to fail - System.out.println("Waiting for orchestration to fail..."); - OrchestrationMetadata failedInstance = client.waitForInstanceCompletion( - instanceId, - Duration.ofSeconds(30), - true); - - System.out.printf("Orchestration status: %s%n", failedInstance.getRuntimeStatus()); - - if (failedInstance.getRuntimeStatus() == OrchestrationRuntimeStatus.FAILED) { - System.out.println("Orchestration failed as expected. Now rewinding..."); - - // Rewind the failed orchestration - client.rewindInstance(instanceId, "Rewinding after transient failure"); - System.out.println("Rewind request sent."); - - // Wait for the orchestration to complete after rewind - System.out.println("Waiting for orchestration to complete after rewind..."); - OrchestrationMetadata completedInstance = client.waitForInstanceCompletion( - instanceId, - Duration.ofSeconds(30), - true); - - System.out.printf("Orchestration completed: %s%n", completedInstance.getRuntimeStatus()); - System.out.printf("Output: %s%n", completedInstance.readOutputAs(String.class)); - } else { - System.out.println("Unexpected status: " + failedInstance.getRuntimeStatus()); - } - - } finally { - // Shutdown the worker - worker.stop(); - } - } - - private static DurableTaskGrpcWorker createTaskHubWorker() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - - // Register the orchestration - builder.addOrchestration(new TaskOrchestrationFactory() { - @Override - public String getName() { - return "RewindableOrchestration"; - } - - @Override - public TaskOrchestration create() { - return ctx -> { - String input = ctx.getInput(String.class); - - // Call an activity that may fail - String result = ctx.callActivity("FailOnceActivity", input, String.class).await(); - - ctx.complete(result); - }; - } - }); - - // Register the activity that fails on first call - builder.addActivity(new TaskActivityFactory() { - @Override - public String getName() { - return "FailOnceActivity"; - } - - @Override - public TaskActivity create() { - return ctx -> { - String input = ctx.getInput(String.class); - - // Fail on the first call, succeed on subsequent calls - if (shouldFail.compareAndSet(true, false)) { - System.out.println("FailOnceActivity: Simulating transient failure..."); - throw new RuntimeException("Simulated transient failure - rewind to retry"); - } - - System.out.println("FailOnceActivity: Succeeded after rewind!"); - return input + "-rewound-success"; - }; - } - }); - - return builder.build(); - } -} From 4f8ac2d125bce37fa054256d3d8cc1fa960912f4 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Tue, 24 Feb 2026 12:34:57 -0800 Subject: [PATCH 13/21] Fix rewind e2e test --- endtoendtests/src/test/java/com/functions/EndToEndTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index 53a0e454..91cb6aaf 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -256,8 +256,10 @@ public void rewindFailedOrchestration() throws InterruptedException { assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); // Wait for the orchestration to complete after rewind + // Include "Failed" because the rewind may not take effect immediately Set continueStates = new HashSet<>(); continueStates.add("Running"); + continueStates.add("Failed"); boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); assertTrue(completed, "Orchestration should complete after rewind"); From 7eb2567bae98fb22989c4eae2e1b005009f51407 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Tue, 24 Feb 2026 14:02:37 -0800 Subject: [PATCH 14/21] Increase rewind e2e test polling timeouts for CI reliability --- .../src/test/java/com/functions/EndToEndTests.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index 91cb6aaf..a1a11669 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -246,7 +246,7 @@ public void rewindFailedOrchestration() throws InterruptedException { String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); // Wait for the orchestration to fail - boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(15)); assertTrue(failed, "Orchestration should have failed"); // Get the rewind URI and rewind the orchestration @@ -256,11 +256,7 @@ public void rewindFailedOrchestration() throws InterruptedException { assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); // Wait for the orchestration to complete after rewind - // Include "Failed" because the rewind may not take effect immediately - Set continueStates = new HashSet<>(); - continueStates.add("Running"); - continueStates.add("Failed"); - boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); + boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(30)); assertTrue(completed, "Orchestration should complete after rewind"); // Verify the output contains the expected result From 66da59b369f70399eeaccc0b0f3b10501a4edf1c Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Tue, 24 Feb 2026 14:54:23 -0800 Subject: [PATCH 15/21] added tests --- .../main/java/com/functions/RewindTest.java | 73 +++++++++++++++++++ .../java/com/functions/EndToEndTests.java | 58 +++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 71844027..7d8fccbc 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -26,6 +26,9 @@ public class RewindTest { // Flag to control whether the activity should fail (first call fails, subsequent calls succeed) private static final AtomicBoolean shouldFail = new AtomicBoolean(true); + // Separate flag for sub-orchestration rewind test + private static final AtomicBoolean shouldSubFail = new AtomicBoolean(true); + /** * HTTP trigger to start the rewindable orchestration. */ @@ -86,4 +89,74 @@ public HttpResponseMessage resetRewindFailureFlag( .body("Failure flag reset to true") .build(); } + + // --- Sub-orchestration rewind test functions --- + + /** + * HTTP trigger to start the parent orchestration for sub-orchestration rewind test. + */ + @FunctionName("StartRewindableSubOrchestration") + public HttpResponseMessage startRewindableSubOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Starting rewindable sub-orchestration test."); + + // Reset the sub failure flag so the first activity call will fail + shouldSubFail.set(true); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RewindableParentOrchestration"); + context.getLogger().info("Created parent orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * Parent orchestration that calls a sub-orchestration which may fail. + */ + @FunctionName("RewindableParentOrchestration") + public String rewindableParentOrchestration( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + String result = ctx.callSubOrchestrator("RewindableChildOrchestration", "SubRewindTest", String.class).await(); + return "Parent:" + result; + } + + /** + * Sub-orchestration that calls an activity which will fail on the first attempt. + */ + @FunctionName("RewindableChildOrchestration") + public String rewindableChildOrchestration( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + String result = ctx.callActivity("FailOnceSubActivity", "SubRewindTest", String.class).await(); + return result; + } + + /** + * Activity for sub-orchestration test that fails on the first call but succeeds on subsequent calls. + */ + @FunctionName("FailOnceSubActivity") + public String failOnceSubActivity( + @DurableActivityTrigger(name = "input") String input, + final ExecutionContext context) { + if (shouldSubFail.compareAndSet(true, false)) { + context.getLogger().warning("FailOnceSubActivity: Simulating failure for input: " + input); + throw new RuntimeException("Simulated sub-orchestration transient failure - rewind to retry"); + } + context.getLogger().info("FailOnceSubActivity: Success for input: " + input); + return input + "-sub-rewound-success"; + } + + /** + * HTTP trigger to reset the sub-orchestration failure flag. + */ + @FunctionName("ResetSubRewindFailureFlag") + public HttpResponseMessage resetSubRewindFailureFlag( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + final ExecutionContext context) { + shouldSubFail.set(true); + context.getLogger().info("Reset sub failure flag to true."); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK) + .body("Sub failure flag reset to true") + .build(); + } } diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index a1a11669..cad3f31e 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -265,6 +265,64 @@ public void rewindFailedOrchestration() throws InterruptedException { assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output); } + @Test + public void rewindNonFailedOrchestration() throws InterruptedException { + // Start a normal orchestration and wait for it to complete + String startOrchestrationPath = "/api/StartOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + + boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(15)); + assertTrue(completed, "Orchestration should complete before rewind attempt"); + + // Attempt to rewind the completed (non-failed) orchestration + String rewindPostUri = jsonPath.get("rewindPostUri"); + rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind on non-failed orchestration"); + post(rewindPostUri); + + // Wait a few seconds to allow any potential state change + Thread.sleep(5000); + + // Verify the orchestration remains in Completed state (rewind should have no effect) + Response statusResponse = get(statusQueryGetUri); + String status = statusResponse.jsonPath().get("runtimeStatus"); + assertEquals("Completed", status, + "Orchestration should remain Completed after rewind attempt on a non-failed instance"); + } + + @Test + public void rewindSubOrchestrationFailure() throws InterruptedException { + // Reset the sub-orchestration failure flag before starting + post("/api/ResetSubRewindFailureFlag"); + + // Start the parent orchestration - the sub-orchestration's activity will fail + String startOrchestrationPath = "/api/StartRewindableSubOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + + // Wait for the parent orchestration to fail (due to sub-orchestration failure) + boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(15)); + assertTrue(failed, "Parent orchestration should have failed due to sub-orchestration failure"); + + // Rewind the parent orchestration + String rewindPostUri = jsonPath.get("rewindPostUri"); + rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind with sub-orchestration failure"); + Response rewindResponse = post(rewindPostUri); + assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); + + // Wait for the parent orchestration to complete after rewind + boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(30)); + assertTrue(completed, "Parent orchestration should complete after rewind"); + + // Verify the output contains the expected result from the sub-orchestration + Response statusResponse = get(statusQueryGetUri); + String output = statusResponse.jsonPath().get("output"); + assertTrue(output.contains("sub-rewound-success"), + "Output should indicate successful sub-orchestration rewind: " + output); + } + @Test public void externalEventDeserializeFail() throws InterruptedException { String startOrchestrationPath = "api/ExternalEventHttp"; From c6ca0e4e61c5806a624113a37ba1f4ae476be168 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Wed, 25 Feb 2026 09:56:53 -0800 Subject: [PATCH 16/21] updated tests --- .../main/java/com/functions/RewindTest.java | 80 ++++++++++++++++++- .../java/com/functions/EndToEndTests.java | 51 +++--------- .../main/java/com/functions/RewindTest.java | 23 +++++- .../java/com/functions/EndToEndTests.java | 15 +--- 4 files changed, 113 insertions(+), 56 deletions(-) diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 7d8fccbc..2cb0839f 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -8,13 +8,16 @@ import com.microsoft.azure.functions.annotation.FunctionName; import com.microsoft.azure.functions.annotation.HttpTrigger; import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.OrchestrationMetadata; import com.microsoft.durabletask.TaskOrchestrationContext; import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; import com.microsoft.durabletask.azurefunctions.DurableClientContext; import com.microsoft.durabletask.azurefunctions.DurableClientInput; import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -30,7 +33,9 @@ public class RewindTest { private static final AtomicBoolean shouldSubFail = new AtomicBoolean(true); /** - * HTTP trigger to start the rewindable orchestration. + * HTTP trigger that starts a rewindable orchestration, waits for it to fail, + * then rewinds it using client.rewindInstance(). Returns the check status response + * so the caller can poll for the orchestration to complete after the rewind. */ @FunctionName("StartRewindableOrchestration") public HttpResponseMessage startRewindableOrchestration( @@ -45,6 +50,59 @@ public HttpResponseMessage startRewindableOrchestration( DurableTaskClient client = durableContext.getClient(); String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + + // Wait for the orchestration to reach a terminal state (expected: Failed) + try { + OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), false); + context.getLogger().info("Orchestration reached terminal state: " + metadata.getRuntimeStatus()); + } catch (TimeoutException e) { + context.getLogger().severe("Orchestration did not reach terminal state in time."); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.INTERNAL_SERVER_ERROR) + .body("Orchestration did not fail within the expected time.") + .build(); + } + + // Rewind the failed orchestration using the client method + client.rewindInstance(instanceId, "Testing rewind functionality"); + context.getLogger().info("Rewind request sent for instance: " + instanceId); + + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * HTTP trigger that starts a non-failing orchestration, waits for it to complete, + * then attempts to rewind it using client.rewindInstance(). Returns the check status + * response so the caller can verify the orchestration remains in the Completed state. + */ + @FunctionName("StartRewindNonFailedOrchestration") + public HttpResponseMessage startRewindNonFailedOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Starting non-failing orchestration for rewind test."); + + // Ensure the activity will NOT fail + shouldFail.set(false); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); + context.getLogger().info("Created orchestration with instance ID = " + instanceId); + + // Wait for the orchestration to complete successfully + try { + client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), false); + context.getLogger().info("Orchestration completed successfully."); + } catch (TimeoutException e) { + context.getLogger().severe("Orchestration did not complete in time."); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.INTERNAL_SERVER_ERROR) + .body("Orchestration did not complete within the expected time.") + .build(); + } + + // Attempt to rewind the non-failed orchestration using the client method + client.rewindInstance(instanceId, "Testing rewind on non-failed orchestration"); + context.getLogger().info("Rewind request sent for non-failed instance: " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); } @@ -93,7 +151,9 @@ public HttpResponseMessage resetRewindFailureFlag( // --- Sub-orchestration rewind test functions --- /** - * HTTP trigger to start the parent orchestration for sub-orchestration rewind test. + * HTTP trigger that starts a parent orchestration with a failing sub-orchestration, + * waits for it to fail, then rewinds it using client.rewindInstance(). + * Returns the check status response so the caller can poll for completion. */ @FunctionName("StartRewindableSubOrchestration") public HttpResponseMessage startRewindableSubOrchestration( @@ -108,6 +168,22 @@ public HttpResponseMessage startRewindableSubOrchestration( DurableTaskClient client = durableContext.getClient(); String instanceId = client.scheduleNewOrchestrationInstance("RewindableParentOrchestration"); context.getLogger().info("Created parent orchestration with instance ID = " + instanceId); + + // Wait for the parent orchestration to reach a terminal state (expected: Failed) + try { + OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), false); + context.getLogger().info("Parent orchestration reached terminal state: " + metadata.getRuntimeStatus()); + } catch (TimeoutException e) { + context.getLogger().severe("Parent orchestration did not reach terminal state in time."); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.INTERNAL_SERVER_ERROR) + .body("Parent orchestration did not fail within the expected time.") + .build(); + } + + // Rewind the failed parent orchestration using the client method + client.rewindInstance(instanceId, "Testing rewind with sub-orchestration failure"); + context.getLogger().info("Rewind request sent for parent instance: " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); } diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index cad3f31e..aa6fdf30 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -236,26 +236,14 @@ public void suspendResume() throws InterruptedException { @Test public void rewindFailedOrchestration() throws InterruptedException { - // Reset the failure flag before starting - post("/api/ResetRewindFailureFlag"); - - // Start the orchestration - it will fail on the first activity call + // Start the orchestration - the trigger waits for failure and calls + // client.rewindInstance() internally before returning String startOrchestrationPath = "/api/StartRewindableOrchestration"; Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath(); String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); - // Wait for the orchestration to fail - boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(15)); - assertTrue(failed, "Orchestration should have failed"); - - // Get the rewind URI and rewind the orchestration - String rewindPostUri = jsonPath.get("rewindPostUri"); - rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality"); - Response rewindResponse = post(rewindPostUri); - assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); - - // Wait for the orchestration to complete after rewind + // The trigger already called client.rewindInstance(), so just poll for completion boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(30)); assertTrue(completed, "Orchestration should complete after rewind"); @@ -267,21 +255,14 @@ public void rewindFailedOrchestration() throws InterruptedException { @Test public void rewindNonFailedOrchestration() throws InterruptedException { - // Start a normal orchestration and wait for it to complete - String startOrchestrationPath = "/api/StartOrchestration"; + // Start a non-failing orchestration - the trigger waits for completion + // and then calls client.rewindInstance() internally before returning + String startOrchestrationPath = "/api/StartRewindNonFailedOrchestration"; Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath(); String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); - boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(15)); - assertTrue(completed, "Orchestration should complete before rewind attempt"); - - // Attempt to rewind the completed (non-failed) orchestration - String rewindPostUri = jsonPath.get("rewindPostUri"); - rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind on non-failed orchestration"); - post(rewindPostUri); - - // Wait a few seconds to allow any potential state change + // Wait a few seconds to allow any potential state change from the rewind attempt Thread.sleep(5000); // Verify the orchestration remains in Completed state (rewind should have no effect) @@ -293,26 +274,14 @@ public void rewindNonFailedOrchestration() throws InterruptedException { @Test public void rewindSubOrchestrationFailure() throws InterruptedException { - // Reset the sub-orchestration failure flag before starting - post("/api/ResetSubRewindFailureFlag"); - - // Start the parent orchestration - the sub-orchestration's activity will fail + // Start the parent orchestration - the trigger waits for the sub-orchestration + // to fail, then calls client.rewindInstance() internally before returning String startOrchestrationPath = "/api/StartRewindableSubOrchestration"; Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath(); String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); - // Wait for the parent orchestration to fail (due to sub-orchestration failure) - boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(15)); - assertTrue(failed, "Parent orchestration should have failed due to sub-orchestration failure"); - - // Rewind the parent orchestration - String rewindPostUri = jsonPath.get("rewindPostUri"); - rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind with sub-orchestration failure"); - Response rewindResponse = post(rewindPostUri); - assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); - - // Wait for the parent orchestration to complete after rewind + // The trigger already called client.rewindInstance(), so just poll for completion boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(30)); assertTrue(completed, "Parent orchestration should complete after rewind"); diff --git a/samples-azure-functions/src/main/java/com/functions/RewindTest.java b/samples-azure-functions/src/main/java/com/functions/RewindTest.java index 8c998fa4..1965524d 100644 --- a/samples-azure-functions/src/main/java/com/functions/RewindTest.java +++ b/samples-azure-functions/src/main/java/com/functions/RewindTest.java @@ -11,13 +11,16 @@ import com.microsoft.azure.functions.annotation.FunctionName; import com.microsoft.azure.functions.annotation.HttpTrigger; import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.OrchestrationMetadata; import com.microsoft.durabletask.TaskOrchestrationContext; import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; import com.microsoft.durabletask.azurefunctions.DurableClientContext; import com.microsoft.durabletask.azurefunctions.DurableClientInput; import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -30,7 +33,9 @@ public class RewindTest { private static final AtomicBoolean shouldFail = new AtomicBoolean(true); /** - * HTTP trigger to start the rewindable orchestration. + * HTTP trigger that starts a rewindable orchestration, waits for it to fail, + * then rewinds it using client.rewindInstance(). Returns the check status response + * so the caller can poll for the orchestration to complete after the rewind. */ @FunctionName("StartRewindableOrchestration") public HttpResponseMessage startRewindableOrchestration( @@ -42,6 +47,22 @@ public HttpResponseMessage startRewindableOrchestration( DurableTaskClient client = durableContext.getClient(); String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + + // Wait for the orchestration to reach a terminal state (expected: Failed) + try { + OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), false); + context.getLogger().info("Orchestration reached terminal state: " + metadata.getRuntimeStatus()); + } catch (TimeoutException e) { + context.getLogger().severe("Orchestration did not reach terminal state in time."); + return request.createResponseBuilder(HttpStatus.INTERNAL_SERVER_ERROR) + .body("Orchestration did not fail within the expected time.") + .build(); + } + + // Rewind the failed orchestration using the client method + client.rewindInstance(instanceId, "Testing rewind functionality"); + context.getLogger().info("Rewind request sent for instance: " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); } diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index 371fc9f9..baa72e89 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -234,23 +234,14 @@ public void rewindFailedOrchestration() throws InterruptedException { // Reset the failure flag before starting post("/api/ResetRewindFailureFlag"); - // Start the orchestration - it will fail on the first activity call + // Start the orchestration - the trigger waits for failure and calls + // client.rewindInstance() internally before returning String startOrchestrationPath = "/api/StartRewindableOrchestration"; Response response = post(startOrchestrationPath); JsonPath jsonPath = response.jsonPath(); String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); - // Wait for the orchestration to fail - boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); - assertTrue(failed, "Orchestration should have failed"); - - // Get the rewind URI and rewind the orchestration - String rewindPostUri = jsonPath.get("rewindPostUri"); - rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality"); - Response rewindResponse = post(rewindPostUri); - assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); - - // Wait for the orchestration to complete after rewind + // The trigger already called client.rewindInstance(), so just poll for completion Set continueStates = new HashSet<>(); continueStates.add("Pending"); continueStates.add("Running"); From 06b7fd7faf13f7b6d0b1e7d13c40c3b867a5dc23 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Wed, 25 Feb 2026 10:51:16 -0800 Subject: [PATCH 17/21] updated StartRewindNonFailedOrchestration test --- .../src/main/java/com/functions/RewindTest.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 2cb0839f..37b4c0d0 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -14,6 +14,7 @@ import com.microsoft.durabletask.azurefunctions.DurableClientContext; import com.microsoft.durabletask.azurefunctions.DurableClientInput; import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; +import io.grpc.StatusRuntimeException; import java.time.Duration; import java.util.Optional; @@ -99,9 +100,15 @@ public HttpResponseMessage startRewindNonFailedOrchestration( .build(); } - // Attempt to rewind the non-failed orchestration using the client method - client.rewindInstance(instanceId, "Testing rewind on non-failed orchestration"); - context.getLogger().info("Rewind request sent for non-failed instance: " + instanceId); + // Attempt to rewind the non-failed orchestration using the client method. + // This is expected to be a no-op or may throw if the sidecar rejects it, + // so we catch any exception and still return the check status response. + try { + client.rewindInstance(instanceId, "Testing rewind on non-failed orchestration"); + context.getLogger().info("Rewind request sent for non-failed instance: " + instanceId); + } catch (StatusRuntimeException e) { + context.getLogger().info("Rewind on non-failed instance was rejected (expected): " + e.getMessage()); + } return durableContext.createCheckStatusResponse(request, instanceId); } From 4bfd85a7a5b5c24a5192476a38b7bde0a70c24a0 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Wed, 25 Feb 2026 11:15:43 -0800 Subject: [PATCH 18/21] updated error handling --- .../microsoft/durabletask/DurableTaskGrpcClient.java | 10 +++++++++- .../src/main/java/com/functions/RewindTest.java | 3 +-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index f87388d9..ae3b0c1a 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -371,7 +371,15 @@ public void rewindInstance(String instanceId, @Nullable String reason) { if (reason != null) { rewindRequestBuilder.setReason(StringValue.of(reason)); } - this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); + try { + this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.FAILED_PRECONDITION) { + throw new IllegalStateException( + "Orchestration instance '" + instanceId + "' is not in a failed state and cannot be rewound.", e); + } + throw e; + } } @Override diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 37b4c0d0..6f099a90 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -14,7 +14,6 @@ import com.microsoft.durabletask.azurefunctions.DurableClientContext; import com.microsoft.durabletask.azurefunctions.DurableClientInput; import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; -import io.grpc.StatusRuntimeException; import java.time.Duration; import java.util.Optional; @@ -106,7 +105,7 @@ public HttpResponseMessage startRewindNonFailedOrchestration( try { client.rewindInstance(instanceId, "Testing rewind on non-failed orchestration"); context.getLogger().info("Rewind request sent for non-failed instance: " + instanceId); - } catch (StatusRuntimeException e) { + } catch (IllegalStateException e) { context.getLogger().info("Rewind on non-failed instance was rejected (expected): " + e.getMessage()); } From fb04187f3f552f9decb77027de8e3ec6a08daf2a Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Fri, 27 Feb 2026 09:16:13 -0800 Subject: [PATCH 19/21] added error handling for when an instance id doesn't exist --- .../durabletask/DurableTaskGrpcClient.java | 4 +++ .../main/java/com/functions/RewindTest.java | 29 +++++++++++++++++++ .../java/com/functions/EndToEndTests.java | 14 +++++++++ .../main/java/com/functions/RewindTest.java | 29 +++++++++++++++++++ .../java/com/functions/EndToEndTests.java | 14 +++++++++ 5 files changed, 90 insertions(+) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index ae3b0c1a..897c93f0 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -374,6 +374,10 @@ public void rewindInstance(String instanceId, @Nullable String reason) { try { this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.NOT_FOUND) { + throw new IllegalArgumentException( + "No orchestration instance with ID '" + instanceId + "' was found.", e); + } if (e.getStatus().getCode() == Status.Code.FAILED_PRECONDITION) { throw new IllegalStateException( "Orchestration instance '" + instanceId + "' is not in a failed state and cannot be rewound.", e); diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 6f099a90..318101bb 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -228,6 +228,35 @@ public String failOnceSubActivity( return input + "-sub-rewound-success"; } + /** + * HTTP trigger that attempts to rewind a non-existent orchestration instance. + * This should result in an IllegalArgumentException being thrown by the client + * when the server returns a NOT_FOUND gRPC status. + */ + @FunctionName("StartRewindNonExistentOrchestration") + public HttpResponseMessage startRewindNonExistentOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Attempting to rewind a non-existent orchestration instance."); + + DurableTaskClient client = durableContext.getClient(); + String nonExistentInstanceId = "non-existent-instance-" + System.currentTimeMillis(); + + try { + client.rewindInstance(nonExistentInstanceId, "Testing rewind on non-existent instance"); + // If we get here, the rewind did not throw as expected + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.INTERNAL_SERVER_ERROR) + .body("IllegalArgumentException was not thrown") + .build(); + } catch (IllegalArgumentException e) { + context.getLogger().info("Rewind on non-existent instance threw expected exception: " + e.getMessage()); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK) + .body(e.getMessage()) + .build(); + } + } + /** * HTTP trigger to reset the sub-orchestration failure flag. */ diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index aa6fdf30..c9fb9535 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -253,6 +253,20 @@ public void rewindFailedOrchestration() throws InterruptedException { assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output); } + @Test + public void rewindNonExistentOrchestration() throws InterruptedException { + // Attempt to rewind a non-existent orchestration instance. + // The trigger calls client.rewindInstance() with a fake instance ID and + // expects an IllegalArgumentException (from gRPC NOT_FOUND status). + String startOrchestrationPath = "/api/StartRewindNonExistentOrchestration"; + Response response = post(startOrchestrationPath); + assertEquals(200, response.getStatusCode(), + "Expected 200 OK indicating the IllegalArgumentException was caught. Body: " + response.getBody().asString()); + String body = response.getBody().asString(); + assertTrue(body.contains("No orchestration instance with ID") && body.contains("was found"), + "Response should contain the not-found error message, but was: " + body); + } + @Test public void rewindNonFailedOrchestration() throws InterruptedException { // Start a non-failing orchestration - the trigger waits for completion diff --git a/samples-azure-functions/src/main/java/com/functions/RewindTest.java b/samples-azure-functions/src/main/java/com/functions/RewindTest.java index 1965524d..33dacd22 100644 --- a/samples-azure-functions/src/main/java/com/functions/RewindTest.java +++ b/samples-azure-functions/src/main/java/com/functions/RewindTest.java @@ -94,6 +94,35 @@ public String failOnceActivity( return input + "-rewound-success"; } + /** + * HTTP trigger that attempts to rewind a non-existent orchestration instance. + * This should result in an IllegalArgumentException being thrown by the client + * when the server returns a NOT_FOUND gRPC status. + */ + @FunctionName("StartRewindNonExistentOrchestration") + public HttpResponseMessage startRewindNonExistentOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Attempting to rewind a non-existent orchestration instance."); + + DurableTaskClient client = durableContext.getClient(); + String nonExistentInstanceId = "non-existent-instance-" + System.currentTimeMillis(); + + try { + client.rewindInstance(nonExistentInstanceId, "Testing rewind on non-existent instance"); + // If we get here, the rewind did not throw as expected + return request.createResponseBuilder(HttpStatus.INTERNAL_SERVER_ERROR) + .body("IllegalArgumentException was not thrown") + .build(); + } catch (IllegalArgumentException e) { + context.getLogger().info("Rewind on non-existent instance threw expected exception: " + e.getMessage()); + return request.createResponseBuilder(HttpStatus.OK) + .body(e.getMessage()) + .build(); + } + } + /** * HTTP trigger to reset the failure flag (useful for testing). */ diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index baa72e89..409018ce 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -229,6 +229,20 @@ public void orchestrationPOJO() throws InterruptedException { assertEquals("\"TESTNAME\"", outputName); } + @Test + public void rewindNonExistentOrchestration() throws InterruptedException { + // Attempt to rewind a non-existent orchestration instance. + // The trigger calls client.rewindInstance() with a fake instance ID and + // expects an IllegalArgumentException (from gRPC NOT_FOUND status). + String startOrchestrationPath = "/api/StartRewindNonExistentOrchestration"; + Response response = post(startOrchestrationPath); + assertEquals(200, response.getStatusCode(), + "Expected 200 OK indicating the IllegalArgumentException was caught. Body: " + response.getBody().asString()); + String body = response.getBody().asString(); + assertTrue(body.contains("No orchestration instance with ID") && body.contains("was found"), + "Response should contain the not-found error message, but was: " + body); + } + @Test public void rewindFailedOrchestration() throws InterruptedException { // Reset the failure flag before starting From 37f27abd6a799f38e143981d0718f5a2463b6141 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Fri, 27 Feb 2026 12:00:24 -0800 Subject: [PATCH 20/21] updated comment --- endtoendtests/src/main/java/com/functions/RewindTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 318101bb..82c608f1 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -100,8 +100,8 @@ public HttpResponseMessage startRewindNonFailedOrchestration( } // Attempt to rewind the non-failed orchestration using the client method. - // This is expected to be a no-op or may throw if the sidecar rejects it, - // so we catch any exception and still return the check status response. + // The sidecar rejects this with FAILED_PRECONDITION since the instance is not + // in a Failed state. The client translates this to an IllegalStateException. try { client.rewindInstance(instanceId, "Testing rewind on non-failed orchestration"); context.getLogger().info("Rewind request sent for non-failed instance: " + instanceId); From ad5982e6f8085529a64e8fc094fa5800ff708121 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Fri, 27 Feb 2026 12:10:28 -0800 Subject: [PATCH 21/21] updated tests --- .../main/java/com/functions/RewindTest.java | 16 ++++++++++----- .../java/com/functions/EndToEndTests.java | 20 ++++++++----------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java index 82c608f1..22e8006d 100644 --- a/endtoendtests/src/main/java/com/functions/RewindTest.java +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ -71,8 +71,10 @@ public HttpResponseMessage startRewindableOrchestration( /** * HTTP trigger that starts a non-failing orchestration, waits for it to complete, - * then attempts to rewind it using client.rewindInstance(). Returns the check status - * response so the caller can verify the orchestration remains in the Completed state. + * then attempts to rewind it using client.rewindInstance(). The sidecar should reject + * the rewind with FAILED_PRECONDITION (translated to IllegalStateException) since + * the instance is not in a Failed state. Returns 200 with the exception message on + * expected rejection, or 500 if the rewind unexpectedly succeeded. */ @FunctionName("StartRewindNonFailedOrchestration") public HttpResponseMessage startRewindNonFailedOrchestration( @@ -104,12 +106,16 @@ public HttpResponseMessage startRewindNonFailedOrchestration( // in a Failed state. The client translates this to an IllegalStateException. try { client.rewindInstance(instanceId, "Testing rewind on non-failed orchestration"); - context.getLogger().info("Rewind request sent for non-failed instance: " + instanceId); + // If we get here, the rewind did not throw as expected + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.INTERNAL_SERVER_ERROR) + .body("IllegalStateException was not thrown for non-failed instance") + .build(); } catch (IllegalStateException e) { context.getLogger().info("Rewind on non-failed instance was rejected (expected): " + e.getMessage()); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK) + .body(e.getMessage()) + .build(); } - - return durableContext.createCheckStatusResponse(request, instanceId); } /** diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index c9fb9535..e06b71e9 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -270,20 +270,16 @@ public void rewindNonExistentOrchestration() throws InterruptedException { @Test public void rewindNonFailedOrchestration() throws InterruptedException { // Start a non-failing orchestration - the trigger waits for completion - // and then calls client.rewindInstance() internally before returning + // and then calls client.rewindInstance() internally before returning. + // The rewind should be rejected with IllegalStateException since the + // instance is not in a Failed state. String startOrchestrationPath = "/api/StartRewindNonFailedOrchestration"; Response response = post(startOrchestrationPath); - JsonPath jsonPath = response.jsonPath(); - String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); - - // Wait a few seconds to allow any potential state change from the rewind attempt - Thread.sleep(5000); - - // Verify the orchestration remains in Completed state (rewind should have no effect) - Response statusResponse = get(statusQueryGetUri); - String status = statusResponse.jsonPath().get("runtimeStatus"); - assertEquals("Completed", status, - "Orchestration should remain Completed after rewind attempt on a non-failed instance"); + assertEquals(200, response.getStatusCode(), + "Expected 200 OK indicating the IllegalStateException was caught. Body: " + response.getBody().asString()); + String body = response.getBody().asString(); + assertTrue(body.contains("is not in a failed state") && body.contains("cannot be rewound"), + "Response should contain the precondition error message, but was: " + body); } @Test