Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/durabletask-js/src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ export class TaskHubGrpcWorker {
const failureDetails = pbh.newFailureDetails(error);

res = new pb.ActivityResponse();
res.setInstanceid(instanceId);
res.setTaskid(req.getTaskid());
res.setCompletiontoken(completionToken);
res.setFailuredetails(failureDetails);
Expand Down
151 changes: 151 additions & 0 deletions packages/durabletask-js/test/worker-activity-response.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Tests that the TaskHubGrpcWorker correctly sets instanceId on ActivityResponse
* in both success and failure paths.
*
* This validates the fix for a bug where the activity failure path omitted
* the instanceId field from the ActivityResponse, deviating from the .NET SDK
* behavior and potentially causing issues with sidecar response routing.
*/

import * as pb from "../src/proto/orchestrator_service_pb";
import * as stubs from "../src/proto/orchestrator_service_grpc_pb";
import { TaskHubGrpcWorker } from "../src/worker/task-hub-grpc-worker";
import { NoOpLogger } from "../src/types/logger.type";
import { ActivityContext } from "../src/task/context/activity-context";
import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb";

const TEST_INSTANCE_ID = "test-instance-123";
const TEST_TASK_ID = 42;
const COMPLETION_TOKEN = "test-completion-token";

describe("Worker Activity Response", () => {
/**
* Creates a mock gRPC stub that captures the ActivityResponse passed to
* completeActivityTask.
*/
function createMockStub(): {
stub: stubs.TaskHubSidecarServiceClient;
capturedResponse: pb.ActivityResponse | null;
} {
let capturedResponse: pb.ActivityResponse | null = null;

const stub = {
completeActivityTask: (
response: pb.ActivityResponse,
metadata: any,
callback: (err: any, res: any) => void,
) => {
capturedResponse = response;
callback(null, {});
},
} as unknown as stubs.TaskHubSidecarServiceClient;

return {
stub,
get capturedResponse() {
return capturedResponse;
},
};
}

/**
* Creates a minimal ActivityRequest for testing.
*/
function createActivityRequest(name: string, input?: string): pb.ActivityRequest {
const req = new pb.ActivityRequest();
req.setName(name);
req.setTaskid(TEST_TASK_ID);

const orchInstance = new pb.OrchestrationInstance();
orchInstance.setInstanceid(TEST_INSTANCE_ID);
req.setOrchestrationinstance(orchInstance);

if (input !== undefined) {
const inputValue = new StringValue();
inputValue.setValue(input);
req.setInput(inputValue);
}

return req;
}

it("should set instanceId on ActivityResponse when activity succeeds", async () => {
// Arrange
const worker = new TaskHubGrpcWorker({
logger: new NoOpLogger(),
});

const successActivity = (_ctx: ActivityContext) => {
return "success result";
};

worker.addActivity(successActivity);

const mockStub = createMockStub();
const req = createActivityRequest("successActivity", JSON.stringify("test-input"));

// Act - call the private method directly
await (worker as any)._executeActivityInternal(req, COMPLETION_TOKEN, mockStub.stub);

// Assert
expect(mockStub.capturedResponse).not.toBeNull();
expect(mockStub.capturedResponse!.getInstanceid()).toBe(TEST_INSTANCE_ID);
expect(mockStub.capturedResponse!.getTaskid()).toBe(TEST_TASK_ID);
expect(mockStub.capturedResponse!.getFailuredetails()).toBeUndefined();
});

it("should set instanceId on ActivityResponse when activity fails", async () => {
// Arrange
const worker = new TaskHubGrpcWorker({
logger: new NoOpLogger(),
});

const failingActivity = (_ctx: ActivityContext) => {
throw new Error("Activity execution failed");
};

worker.addActivity(failingActivity);

const mockStub = createMockStub();
const req = createActivityRequest("failingActivity");

// Act - call the private method directly
await (worker as any)._executeActivityInternal(req, COMPLETION_TOKEN, mockStub.stub);

// Assert - the key assertion: instanceId MUST be set even on failure
expect(mockStub.capturedResponse).not.toBeNull();
expect(mockStub.capturedResponse!.getInstanceid()).toBe(TEST_INSTANCE_ID);
expect(mockStub.capturedResponse!.getTaskid()).toBe(TEST_TASK_ID);
expect(mockStub.capturedResponse!.getFailuredetails()).toBeDefined();
expect(mockStub.capturedResponse!.getFailuredetails()!.getErrormessage()).toContain(
"Activity execution failed",
);
});

it("should set instanceId on ActivityResponse when activity throws non-Error", async () => {
// Arrange
const worker = new TaskHubGrpcWorker({
logger: new NoOpLogger(),
});

const throwStringActivity = (_ctx: ActivityContext) => {
throw "string error";
};

worker.addActivity(throwStringActivity);

const mockStub = createMockStub();
const req = createActivityRequest("throwStringActivity");

// Act
await (worker as any)._executeActivityInternal(req, COMPLETION_TOKEN, mockStub.stub);

// Assert
expect(mockStub.capturedResponse).not.toBeNull();
expect(mockStub.capturedResponse!.getInstanceid()).toBe(TEST_INSTANCE_ID);
expect(mockStub.capturedResponse!.getFailuredetails()).toBeDefined();
});
});
Loading