diff --git a/api/matchingservice/v1/request_response.pb.go b/api/matchingservice/v1/request_response.pb.go index 88ac1fa827..852dae0299 100644 --- a/api/matchingservice/v1/request_response.pb.go +++ b/api/matchingservice/v1/request_response.pb.go @@ -14,16 +14,17 @@ import ( v11 "go.temporal.io/api/common/v1" v112 "go.temporal.io/api/deployment/v1" v19 "go.temporal.io/api/enums/v1" + v114 "go.temporal.io/api/failure/v1" v16 "go.temporal.io/api/history/v1" v113 "go.temporal.io/api/nexus/v1" v15 "go.temporal.io/api/protocol/v1" v12 "go.temporal.io/api/query/v1" v14 "go.temporal.io/api/taskqueue/v1" - v114 "go.temporal.io/api/worker/v1" + v115 "go.temporal.io/api/worker/v1" v1 "go.temporal.io/api/workflowservice/v1" v17 "go.temporal.io/server/api/clock/v1" v110 "go.temporal.io/server/api/deployment/v1" - v115 "go.temporal.io/server/api/enums/v1" + v116 "go.temporal.io/server/api/enums/v1" v13 "go.temporal.io/server/api/history/v1" v111 "go.temporal.io/server/api/persistence/v1" v18 "go.temporal.io/server/api/taskqueue/v1" @@ -3523,6 +3524,7 @@ type DispatchNexusTaskResponse struct { // *DispatchNexusTaskResponse_HandlerError // *DispatchNexusTaskResponse_Response // *DispatchNexusTaskResponse_RequestTimeout + // *DispatchNexusTaskResponse_Failure Outcome isDispatchNexusTaskResponse_Outcome `protobuf_oneof:"outcome"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -3565,6 +3567,7 @@ func (x *DispatchNexusTaskResponse) GetOutcome() isDispatchNexusTaskResponse_Out return nil } +// Deprecated: Marked as deprecated in temporal/server/api/matchingservice/v1/request_response.proto. 
func (x *DispatchNexusTaskResponse) GetHandlerError() *v113.HandlerError { if x != nil { if x, ok := x.Outcome.(*DispatchNexusTaskResponse_HandlerError); ok { @@ -3592,12 +3595,23 @@ func (x *DispatchNexusTaskResponse) GetRequestTimeout() *DispatchNexusTaskRespon return nil } +func (x *DispatchNexusTaskResponse) GetFailure() *v114.Failure { + if x != nil { + if x, ok := x.Outcome.(*DispatchNexusTaskResponse_Failure); ok { + return x.Failure + } + } + return nil +} + type isDispatchNexusTaskResponse_Outcome interface { isDispatchNexusTaskResponse_Outcome() } type DispatchNexusTaskResponse_HandlerError struct { - // Set if the worker's handler failed the nexus task. + // Deprecated. Use failure field instead. + // + // Deprecated: Marked as deprecated in temporal/server/api/matchingservice/v1/request_response.proto. HandlerError *v113.HandlerError `protobuf:"bytes,1,opt,name=handler_error,json=handlerError,proto3,oneof"` } @@ -3610,12 +3624,19 @@ type DispatchNexusTaskResponse_RequestTimeout struct { RequestTimeout *DispatchNexusTaskResponse_Timeout `protobuf:"bytes,3,opt,name=request_timeout,json=requestTimeout,proto3,oneof"` } +type DispatchNexusTaskResponse_Failure struct { + // Set if the worker's handler failed the nexus task. Must contain a NexusHandlerFailureInfo object. 
+ Failure *v114.Failure `protobuf:"bytes,4,opt,name=failure,proto3,oneof"` +} + func (*DispatchNexusTaskResponse_HandlerError) isDispatchNexusTaskResponse_Outcome() {} func (*DispatchNexusTaskResponse_Response) isDispatchNexusTaskResponse_Outcome() {} func (*DispatchNexusTaskResponse_RequestTimeout) isDispatchNexusTaskResponse_Outcome() {} +func (*DispatchNexusTaskResponse_Failure) isDispatchNexusTaskResponse_Outcome() {} + type PollNexusTaskQueueRequest struct { state protoimpl.MessageState `protogen:"open.v1"` NamespaceId string `protobuf:"bytes,1,opt,name=namespace_id,json=namespaceId,proto3" json:"namespace_id,omitempty"` @@ -4536,7 +4557,7 @@ func (x *ListWorkersRequest) GetListRequest() *v1.ListWorkersRequest { type ListWorkersResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - WorkersInfo []*v114.WorkerInfo `protobuf:"bytes,1,rep,name=workers_info,json=workersInfo,proto3" json:"workers_info,omitempty"` + WorkersInfo []*v115.WorkerInfo `protobuf:"bytes,1,rep,name=workers_info,json=workersInfo,proto3" json:"workers_info,omitempty"` NextPageToken []byte `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -4572,7 +4593,7 @@ func (*ListWorkersResponse) Descriptor() ([]byte, []int) { return file_temporal_server_api_matchingservice_v1_request_response_proto_rawDescGZIP(), []int{69} } -func (x *ListWorkersResponse) GetWorkersInfo() []*v114.WorkerInfo { +func (x *ListWorkersResponse) GetWorkersInfo() []*v115.WorkerInfo { if x != nil { return x.WorkersInfo } @@ -4747,7 +4768,7 @@ func (x *DescribeWorkerRequest) GetRequest() *v1.DescribeWorkerRequest { type DescribeWorkerResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - WorkerInfo *v114.WorkerInfo `protobuf:"bytes,1,opt,name=worker_info,json=workerInfo,proto3" json:"worker_info,omitempty"` + WorkerInfo *v115.WorkerInfo 
`protobuf:"bytes,1,opt,name=worker_info,json=workerInfo,proto3" json:"worker_info,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -4782,7 +4803,7 @@ func (*DescribeWorkerResponse) Descriptor() ([]byte, []int) { return file_temporal_server_api_matchingservice_v1_request_response_proto_rawDescGZIP(), []int{73} } -func (x *DescribeWorkerResponse) GetWorkerInfo() *v114.WorkerInfo { +func (x *DescribeWorkerResponse) GetWorkerInfo() *v115.WorkerInfo { if x != nil { return x.WorkerInfo } @@ -4805,7 +4826,7 @@ type UpdateFairnessStateRequest struct { NamespaceId string `protobuf:"bytes,1,opt,name=namespace_id,json=namespaceId,proto3" json:"namespace_id,omitempty"` TaskQueue string `protobuf:"bytes,2,opt,name=task_queue,json=taskQueue,proto3" json:"task_queue,omitempty"` TaskQueueType v19.TaskQueueType `protobuf:"varint,3,opt,name=task_queue_type,json=taskQueueType,proto3,enum=temporal.api.enums.v1.TaskQueueType" json:"task_queue_type,omitempty"` - FairnessState v115.FairnessState `protobuf:"varint,4,opt,name=fairness_state,json=fairnessState,proto3,enum=temporal.server.api.enums.v1.FairnessState" json:"fairness_state,omitempty"` + FairnessState v116.FairnessState `protobuf:"varint,4,opt,name=fairness_state,json=fairnessState,proto3,enum=temporal.server.api.enums.v1.FairnessState" json:"fairness_state,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -4861,11 +4882,11 @@ func (x *UpdateFairnessStateRequest) GetTaskQueueType() v19.TaskQueueType { return v19.TaskQueueType(0) } -func (x *UpdateFairnessStateRequest) GetFairnessState() v115.FairnessState { +func (x *UpdateFairnessStateRequest) GetFairnessState() v116.FairnessState { if x != nil { return x.FairnessState } - return v115.FairnessState(0) + return v116.FairnessState(0) } type UpdateFairnessStateResponse struct { @@ -5340,7 +5361,7 @@ var File_temporal_server_api_matchingservice_v1_request_response_proto protorefl const 
file_temporal_server_api_matchingservice_v1_request_response_proto_rawDesc = "" + "\n" + - "=temporal/server/api/matchingservice/v1/request_response.proto\x12&temporal.server.api.matchingservice.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a(temporal/api/deployment/v1/message.proto\x1a&temporal/api/enums/v1/task_queue.proto\x1a%temporal/api/history/v1/message.proto\x1a'temporal/api/taskqueue/v1/message.proto\x1a#temporal/api/query/v1/message.proto\x1a&temporal/api/protocol/v1/message.proto\x1a*temporal/server/api/clock/v1/message.proto\x1a/temporal/server/api/deployment/v1/message.proto\x1a,temporal/server/api/history/v1/message.proto\x1a.temporal/server/api/persistence/v1/nexus.proto\x1a4temporal/server/api/persistence/v1/task_queues.proto\x1a.temporal/server/api/taskqueue/v1/message.proto\x1a1temporal/server/api/enums/v1/fairness_state.proto\x1a6temporal/api/workflowservice/v1/request_response.proto\x1a#temporal/api/nexus/v1/message.proto\x1a$temporal/api/worker/v1/message.proto\"\xc3\x02\n" + + 
"=temporal/server/api/matchingservice/v1/request_response.proto\x12&temporal.server.api.matchingservice.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a(temporal/api/deployment/v1/message.proto\x1a&temporal/api/enums/v1/task_queue.proto\x1a%temporal/api/failure/v1/message.proto\x1a%temporal/api/history/v1/message.proto\x1a'temporal/api/taskqueue/v1/message.proto\x1a#temporal/api/query/v1/message.proto\x1a&temporal/api/protocol/v1/message.proto\x1a*temporal/server/api/clock/v1/message.proto\x1a/temporal/server/api/deployment/v1/message.proto\x1a,temporal/server/api/history/v1/message.proto\x1a.temporal/server/api/persistence/v1/nexus.proto\x1a4temporal/server/api/persistence/v1/task_queues.proto\x1a.temporal/server/api/taskqueue/v1/message.proto\x1a1temporal/server/api/enums/v1/fairness_state.proto\x1a6temporal/api/workflowservice/v1/request_response.proto\x1a#temporal/api/nexus/v1/message.proto\x1a$temporal/api/worker/v1/message.proto\"\xc3\x02\n" + "\x1cPollWorkflowTaskQueueRequest\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12\x1b\n" + "\tpoller_id\x18\x02 \x01(\tR\bpollerId\x12`\n" + @@ -5632,11 +5653,12 @@ const file_temporal_server_api_matchingservice_v1_request_response_proto_rawDesc "\n" + "task_queue\x18\x02 \x01(\v2$.temporal.api.taskqueue.v1.TaskQueueR\ttaskQueue\x128\n" + "\arequest\x18\x03 \x01(\v2\x1e.temporal.api.nexus.v1.RequestR\arequest\x12T\n" + - "\fforward_info\x18\x04 \x01(\v21.temporal.server.api.taskqueue.v1.TaskForwardInfoR\vforwardInfo\"\xb2\x02\n" + - "\x19DispatchNexusTaskResponse\x12J\n" + - "\rhandler_error\x18\x01 \x01(\v2#.temporal.api.nexus.v1.HandlerErrorH\x00R\fhandlerError\x12=\n" + + "\fforward_info\x18\x04 \x01(\v21.temporal.server.api.taskqueue.v1.TaskForwardInfoR\vforwardInfo\"\xf4\x02\n" + + "\x19DispatchNexusTaskResponse\x12N\n" + + "\rhandler_error\x18\x01 
\x01(\v2#.temporal.api.nexus.v1.HandlerErrorB\x02\x18\x01H\x00R\fhandlerError\x12=\n" + "\bresponse\x18\x02 \x01(\v2\x1f.temporal.api.nexus.v1.ResponseH\x00R\bresponse\x12t\n" + - "\x0frequest_timeout\x18\x03 \x01(\v2I.temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.TimeoutH\x00R\x0erequestTimeout\x1a\t\n" + + "\x0frequest_timeout\x18\x03 \x01(\v2I.temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.TimeoutH\x00R\x0erequestTimeout\x12<\n" + + "\afailure\x18\x04 \x01(\v2 .temporal.api.failure.v1.FailureH\x00R\afailure\x1a\t\n" + "\aTimeoutB\t\n" + "\aoutcome\"\xb4\x02\n" + "\x19PollNexusTaskQueueRequest\x12!\n" + @@ -5871,23 +5893,24 @@ var file_temporal_server_api_matchingservice_v1_request_response_proto_goTypes = (*v113.Request)(nil), // 129: temporal.api.nexus.v1.Request (*v113.HandlerError)(nil), // 130: temporal.api.nexus.v1.HandlerError (*v113.Response)(nil), // 131: temporal.api.nexus.v1.Response - (*v1.PollNexusTaskQueueRequest)(nil), // 132: temporal.api.workflowservice.v1.PollNexusTaskQueueRequest - (*v1.PollNexusTaskQueueResponse)(nil), // 133: temporal.api.workflowservice.v1.PollNexusTaskQueueResponse - (*v1.RespondNexusTaskCompletedRequest)(nil), // 134: temporal.api.workflowservice.v1.RespondNexusTaskCompletedRequest - (*v1.RespondNexusTaskFailedRequest)(nil), // 135: temporal.api.workflowservice.v1.RespondNexusTaskFailedRequest - (*v111.NexusEndpointSpec)(nil), // 136: temporal.server.api.persistence.v1.NexusEndpointSpec - (*v111.NexusEndpointEntry)(nil), // 137: temporal.server.api.persistence.v1.NexusEndpointEntry - (*v1.RecordWorkerHeartbeatRequest)(nil), // 138: temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest - (*v1.ListWorkersRequest)(nil), // 139: temporal.api.workflowservice.v1.ListWorkersRequest - (*v114.WorkerInfo)(nil), // 140: temporal.api.worker.v1.WorkerInfo - (*v1.UpdateTaskQueueConfigRequest)(nil), // 141: temporal.api.workflowservice.v1.UpdateTaskQueueConfigRequest - 
(*v14.TaskQueueConfig)(nil), // 142: temporal.api.taskqueue.v1.TaskQueueConfig - (*v1.DescribeWorkerRequest)(nil), // 143: temporal.api.workflowservice.v1.DescribeWorkerRequest - (v115.FairnessState)(0), // 144: temporal.server.api.enums.v1.FairnessState - (*v14.TaskQueueStats)(nil), // 145: temporal.api.taskqueue.v1.TaskQueueStats - (*v18.TaskQueueVersionInfoInternal)(nil), // 146: temporal.server.api.taskqueue.v1.TaskQueueVersionInfoInternal - (*v1.UpdateWorkerBuildIdCompatibilityRequest)(nil), // 147: temporal.api.workflowservice.v1.UpdateWorkerBuildIdCompatibilityRequest - (*v110.WorkerDeploymentVersionData)(nil), // 148: temporal.server.api.deployment.v1.WorkerDeploymentVersionData + (*v114.Failure)(nil), // 132: temporal.api.failure.v1.Failure + (*v1.PollNexusTaskQueueRequest)(nil), // 133: temporal.api.workflowservice.v1.PollNexusTaskQueueRequest + (*v1.PollNexusTaskQueueResponse)(nil), // 134: temporal.api.workflowservice.v1.PollNexusTaskQueueResponse + (*v1.RespondNexusTaskCompletedRequest)(nil), // 135: temporal.api.workflowservice.v1.RespondNexusTaskCompletedRequest + (*v1.RespondNexusTaskFailedRequest)(nil), // 136: temporal.api.workflowservice.v1.RespondNexusTaskFailedRequest + (*v111.NexusEndpointSpec)(nil), // 137: temporal.server.api.persistence.v1.NexusEndpointSpec + (*v111.NexusEndpointEntry)(nil), // 138: temporal.server.api.persistence.v1.NexusEndpointEntry + (*v1.RecordWorkerHeartbeatRequest)(nil), // 139: temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest + (*v1.ListWorkersRequest)(nil), // 140: temporal.api.workflowservice.v1.ListWorkersRequest + (*v115.WorkerInfo)(nil), // 141: temporal.api.worker.v1.WorkerInfo + (*v1.UpdateTaskQueueConfigRequest)(nil), // 142: temporal.api.workflowservice.v1.UpdateTaskQueueConfigRequest + (*v14.TaskQueueConfig)(nil), // 143: temporal.api.taskqueue.v1.TaskQueueConfig + (*v1.DescribeWorkerRequest)(nil), // 144: temporal.api.workflowservice.v1.DescribeWorkerRequest + (v116.FairnessState)(0), // 145: 
temporal.server.api.enums.v1.FairnessState + (*v14.TaskQueueStats)(nil), // 146: temporal.api.taskqueue.v1.TaskQueueStats + (*v18.TaskQueueVersionInfoInternal)(nil), // 147: temporal.server.api.taskqueue.v1.TaskQueueVersionInfoInternal + (*v1.UpdateWorkerBuildIdCompatibilityRequest)(nil), // 148: temporal.api.workflowservice.v1.UpdateWorkerBuildIdCompatibilityRequest + (*v110.WorkerDeploymentVersionData)(nil), // 149: temporal.server.api.deployment.v1.WorkerDeploymentVersionData } var file_temporal_server_api_matchingservice_v1_request_response_proto_depIdxs = []int32{ 88, // 0: temporal.server.api.matchingservice.v1.PollWorkflowTaskQueueRequest.poll_request:type_name -> temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest @@ -5987,43 +6010,44 @@ var file_temporal_server_api_matchingservice_v1_request_response_proto_depIdxs = 130, // 94: temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.handler_error:type_name -> temporal.api.nexus.v1.HandlerError 131, // 95: temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.response:type_name -> temporal.api.nexus.v1.Response 87, // 96: temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.request_timeout:type_name -> temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.Timeout - 132, // 97: temporal.server.api.matchingservice.v1.PollNexusTaskQueueRequest.request:type_name -> temporal.api.workflowservice.v1.PollNexusTaskQueueRequest - 78, // 98: temporal.server.api.matchingservice.v1.PollNexusTaskQueueRequest.conditions:type_name -> temporal.server.api.matchingservice.v1.PollConditions - 133, // 99: temporal.server.api.matchingservice.v1.PollNexusTaskQueueResponse.response:type_name -> temporal.api.workflowservice.v1.PollNexusTaskQueueResponse - 93, // 100: temporal.server.api.matchingservice.v1.RespondNexusTaskCompletedRequest.task_queue:type_name -> temporal.api.taskqueue.v1.TaskQueue - 134, // 101: 
temporal.server.api.matchingservice.v1.RespondNexusTaskCompletedRequest.request:type_name -> temporal.api.workflowservice.v1.RespondNexusTaskCompletedRequest - 93, // 102: temporal.server.api.matchingservice.v1.RespondNexusTaskFailedRequest.task_queue:type_name -> temporal.api.taskqueue.v1.TaskQueue - 135, // 103: temporal.server.api.matchingservice.v1.RespondNexusTaskFailedRequest.request:type_name -> temporal.api.workflowservice.v1.RespondNexusTaskFailedRequest - 136, // 104: temporal.server.api.matchingservice.v1.CreateNexusEndpointRequest.spec:type_name -> temporal.server.api.persistence.v1.NexusEndpointSpec - 137, // 105: temporal.server.api.matchingservice.v1.CreateNexusEndpointResponse.entry:type_name -> temporal.server.api.persistence.v1.NexusEndpointEntry - 136, // 106: temporal.server.api.matchingservice.v1.UpdateNexusEndpointRequest.spec:type_name -> temporal.server.api.persistence.v1.NexusEndpointSpec - 137, // 107: temporal.server.api.matchingservice.v1.UpdateNexusEndpointResponse.entry:type_name -> temporal.server.api.persistence.v1.NexusEndpointEntry - 137, // 108: temporal.server.api.matchingservice.v1.ListNexusEndpointsResponse.entries:type_name -> temporal.server.api.persistence.v1.NexusEndpointEntry - 138, // 109: temporal.server.api.matchingservice.v1.RecordWorkerHeartbeatRequest.heartbeart_request:type_name -> temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest - 139, // 110: temporal.server.api.matchingservice.v1.ListWorkersRequest.list_request:type_name -> temporal.api.workflowservice.v1.ListWorkersRequest - 140, // 111: temporal.server.api.matchingservice.v1.ListWorkersResponse.workers_info:type_name -> temporal.api.worker.v1.WorkerInfo - 141, // 112: temporal.server.api.matchingservice.v1.UpdateTaskQueueConfigRequest.update_taskqueue_config:type_name -> temporal.api.workflowservice.v1.UpdateTaskQueueConfigRequest - 142, // 113: temporal.server.api.matchingservice.v1.UpdateTaskQueueConfigResponse.updated_taskqueue_config:type_name 
-> temporal.api.taskqueue.v1.TaskQueueConfig - 143, // 114: temporal.server.api.matchingservice.v1.DescribeWorkerRequest.request:type_name -> temporal.api.workflowservice.v1.DescribeWorkerRequest - 140, // 115: temporal.server.api.matchingservice.v1.DescribeWorkerResponse.worker_info:type_name -> temporal.api.worker.v1.WorkerInfo - 111, // 116: temporal.server.api.matchingservice.v1.UpdateFairnessStateRequest.task_queue_type:type_name -> temporal.api.enums.v1.TaskQueueType - 144, // 117: temporal.server.api.matchingservice.v1.UpdateFairnessStateRequest.fairness_state:type_name -> temporal.server.api.enums.v1.FairnessState - 111, // 118: temporal.server.api.matchingservice.v1.CheckTaskQueueVersionMembershipRequest.task_queue_type:type_name -> temporal.api.enums.v1.TaskQueueType - 113, // 119: temporal.server.api.matchingservice.v1.CheckTaskQueueVersionMembershipRequest.version:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersion - 91, // 120: temporal.server.api.matchingservice.v1.PollWorkflowTaskQueueResponse.QueriesEntry.value:type_name -> temporal.api.query.v1.WorkflowQuery - 111, // 121: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesRequest.VersionTaskQueue.type:type_name -> temporal.api.enums.v1.TaskQueueType - 111, // 122: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.type:type_name -> temporal.api.enums.v1.TaskQueueType - 145, // 123: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.stats:type_name -> temporal.api.taskqueue.v1.TaskQueueStats - 82, // 124: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.stats_by_priority_key:type_name -> temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.StatsByPriorityKeyEntry - 145, // 125: 
temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.StatsByPriorityKeyEntry.value:type_name -> temporal.api.taskqueue.v1.TaskQueueStats - 146, // 126: temporal.server.api.matchingservice.v1.DescribeTaskQueuePartitionResponse.VersionsInfoInternalEntry.value:type_name -> temporal.server.api.taskqueue.v1.TaskQueueVersionInfoInternal - 147, // 127: temporal.server.api.matchingservice.v1.UpdateWorkerBuildIdCompatibilityRequest.ApplyPublicRequest.request:type_name -> temporal.api.workflowservice.v1.UpdateWorkerBuildIdCompatibilityRequest - 148, // 128: temporal.server.api.matchingservice.v1.SyncDeploymentUserDataRequest.UpsertVersionsDataEntry.value:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersionData - 129, // [129:129] is the sub-list for method output_type - 129, // [129:129] is the sub-list for method input_type - 129, // [129:129] is the sub-list for extension type_name - 129, // [129:129] is the sub-list for extension extendee - 0, // [0:129] is the sub-list for field type_name + 132, // 97: temporal.server.api.matchingservice.v1.DispatchNexusTaskResponse.failure:type_name -> temporal.api.failure.v1.Failure + 133, // 98: temporal.server.api.matchingservice.v1.PollNexusTaskQueueRequest.request:type_name -> temporal.api.workflowservice.v1.PollNexusTaskQueueRequest + 78, // 99: temporal.server.api.matchingservice.v1.PollNexusTaskQueueRequest.conditions:type_name -> temporal.server.api.matchingservice.v1.PollConditions + 134, // 100: temporal.server.api.matchingservice.v1.PollNexusTaskQueueResponse.response:type_name -> temporal.api.workflowservice.v1.PollNexusTaskQueueResponse + 93, // 101: temporal.server.api.matchingservice.v1.RespondNexusTaskCompletedRequest.task_queue:type_name -> temporal.api.taskqueue.v1.TaskQueue + 135, // 102: temporal.server.api.matchingservice.v1.RespondNexusTaskCompletedRequest.request:type_name -> temporal.api.workflowservice.v1.RespondNexusTaskCompletedRequest + 93, // 103: 
temporal.server.api.matchingservice.v1.RespondNexusTaskFailedRequest.task_queue:type_name -> temporal.api.taskqueue.v1.TaskQueue + 136, // 104: temporal.server.api.matchingservice.v1.RespondNexusTaskFailedRequest.request:type_name -> temporal.api.workflowservice.v1.RespondNexusTaskFailedRequest + 137, // 105: temporal.server.api.matchingservice.v1.CreateNexusEndpointRequest.spec:type_name -> temporal.server.api.persistence.v1.NexusEndpointSpec + 138, // 106: temporal.server.api.matchingservice.v1.CreateNexusEndpointResponse.entry:type_name -> temporal.server.api.persistence.v1.NexusEndpointEntry + 137, // 107: temporal.server.api.matchingservice.v1.UpdateNexusEndpointRequest.spec:type_name -> temporal.server.api.persistence.v1.NexusEndpointSpec + 138, // 108: temporal.server.api.matchingservice.v1.UpdateNexusEndpointResponse.entry:type_name -> temporal.server.api.persistence.v1.NexusEndpointEntry + 138, // 109: temporal.server.api.matchingservice.v1.ListNexusEndpointsResponse.entries:type_name -> temporal.server.api.persistence.v1.NexusEndpointEntry + 139, // 110: temporal.server.api.matchingservice.v1.RecordWorkerHeartbeatRequest.heartbeart_request:type_name -> temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest + 140, // 111: temporal.server.api.matchingservice.v1.ListWorkersRequest.list_request:type_name -> temporal.api.workflowservice.v1.ListWorkersRequest + 141, // 112: temporal.server.api.matchingservice.v1.ListWorkersResponse.workers_info:type_name -> temporal.api.worker.v1.WorkerInfo + 142, // 113: temporal.server.api.matchingservice.v1.UpdateTaskQueueConfigRequest.update_taskqueue_config:type_name -> temporal.api.workflowservice.v1.UpdateTaskQueueConfigRequest + 143, // 114: temporal.server.api.matchingservice.v1.UpdateTaskQueueConfigResponse.updated_taskqueue_config:type_name -> temporal.api.taskqueue.v1.TaskQueueConfig + 144, // 115: temporal.server.api.matchingservice.v1.DescribeWorkerRequest.request:type_name -> 
temporal.api.workflowservice.v1.DescribeWorkerRequest + 141, // 116: temporal.server.api.matchingservice.v1.DescribeWorkerResponse.worker_info:type_name -> temporal.api.worker.v1.WorkerInfo + 111, // 117: temporal.server.api.matchingservice.v1.UpdateFairnessStateRequest.task_queue_type:type_name -> temporal.api.enums.v1.TaskQueueType + 145, // 118: temporal.server.api.matchingservice.v1.UpdateFairnessStateRequest.fairness_state:type_name -> temporal.server.api.enums.v1.FairnessState + 111, // 119: temporal.server.api.matchingservice.v1.CheckTaskQueueVersionMembershipRequest.task_queue_type:type_name -> temporal.api.enums.v1.TaskQueueType + 113, // 120: temporal.server.api.matchingservice.v1.CheckTaskQueueVersionMembershipRequest.version:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersion + 91, // 121: temporal.server.api.matchingservice.v1.PollWorkflowTaskQueueResponse.QueriesEntry.value:type_name -> temporal.api.query.v1.WorkflowQuery + 111, // 122: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesRequest.VersionTaskQueue.type:type_name -> temporal.api.enums.v1.TaskQueueType + 111, // 123: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.type:type_name -> temporal.api.enums.v1.TaskQueueType + 146, // 124: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.stats:type_name -> temporal.api.taskqueue.v1.TaskQueueStats + 82, // 125: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.stats_by_priority_key:type_name -> temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.StatsByPriorityKeyEntry + 146, // 126: temporal.server.api.matchingservice.v1.DescribeVersionedTaskQueuesResponse.VersionTaskQueue.StatsByPriorityKeyEntry.value:type_name -> temporal.api.taskqueue.v1.TaskQueueStats + 147, // 127: 
temporal.server.api.matchingservice.v1.DescribeTaskQueuePartitionResponse.VersionsInfoInternalEntry.value:type_name -> temporal.server.api.taskqueue.v1.TaskQueueVersionInfoInternal + 148, // 128: temporal.server.api.matchingservice.v1.UpdateWorkerBuildIdCompatibilityRequest.ApplyPublicRequest.request:type_name -> temporal.api.workflowservice.v1.UpdateWorkerBuildIdCompatibilityRequest + 149, // 129: temporal.server.api.matchingservice.v1.SyncDeploymentUserDataRequest.UpsertVersionsDataEntry.value:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersionData + 130, // [130:130] is the sub-list for method output_type + 130, // [130:130] is the sub-list for method input_type + 130, // [130:130] is the sub-list for extension type_name + 130, // [130:130] is the sub-list for extension extendee + 0, // [0:130] is the sub-list for field type_name } func init() { file_temporal_server_api_matchingservice_v1_request_response_proto_init() } @@ -6050,6 +6074,7 @@ func file_temporal_server_api_matchingservice_v1_request_response_proto_init() { (*DispatchNexusTaskResponse_HandlerError)(nil), (*DispatchNexusTaskResponse_Response)(nil), (*DispatchNexusTaskResponse_RequestTimeout)(nil), + (*DispatchNexusTaskResponse_Failure)(nil), } type x struct{} out := protoimpl.TypeBuilder{ diff --git a/chasm/lib/callback/chasm_invocation.go b/chasm/lib/callback/chasm_invocation.go index bd6f161d61..35b7389907 100644 --- a/chasm/lib/callback/chasm_invocation.go +++ b/chasm/lib/callback/chasm_invocation.go @@ -162,7 +162,7 @@ func (c chasmInvocation) getHistoryRequest( Completion: completion, } case *nexusrpc.OperationCompletionUnsuccessful: - apiFailure, err := commonnexus.NexusFailureToAPIFailure(op.Failure, true) + apiFailure, err := commonnexus.NexusFailureToTemporalFailure(op.Failure) if err != nil { return nil, fmt.Errorf("failed to convert failure type: %v", err) } diff --git a/common/nexus/failure.go b/common/nexus/failure.go index 35522dcd52..52a6d0cf78 100644 --- 
a/common/nexus/failure.go +++ b/common/nexus/failure.go @@ -2,13 +2,16 @@ package nexus import ( "context" + "encoding/base64" "encoding/json" "errors" + "fmt" "net/http" "sync/atomic" "github.com/nexus-rpc/sdk-go/nexus" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" nexuspb "go.temporal.io/api/nexus/v1" "go.temporal.io/api/serviceerror" @@ -52,135 +55,263 @@ var failureTypeString = string((&failurepb.Failure{}).ProtoReflect().Descriptor( // ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure. func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure { - return nexus.Failure{ - Message: failure.GetMessage(), - Metadata: failure.GetMetadata(), - Details: failure.GetDetails(), + nf := nexus.Failure{ + Message: failure.GetMessage(), + StackTrace: failure.GetStackTrace(), + Metadata: failure.GetMetadata(), + Details: failure.GetDetails(), + } + if failure.GetCause() != nil { + cause := ProtoFailureToNexusFailure(failure.GetCause()) + nf.Cause = &cause } + return nf } // NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure. // Always returns a non-nil value. func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure { - return &nexuspb.Failure{ - Message: failure.Message, - Metadata: failure.Metadata, - Details: failure.Details, + pf := &nexuspb.Failure{ + Message: failure.Message, + Metadata: failure.Metadata, + Details: failure.Details, + StackTrace: failure.StackTrace, + } + if failure.Cause != nil { + pf.Cause = NexusFailureToProtoFailure(*failure.Cause) } + return pf +} + +type serializedOperationError struct { + State string `json:"state,omitempty"` + // Bytes as base64 encoded string. 
+ EncodedAttributes string `json:"encodedAttributes,omitempty"` } -// APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to -// the proto fullname of the temporal API Failure message. +type serializedHandlerError struct { + Type string `json:"type,omitempty"` + RetryableOverride *bool `json:"retryableOverride,omitempty"` + // Bytes as base64 encoded string. + EncodedAttributes string `json:"encodedAttributes,omitempty"` +} + +// TemporalFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to +// the proto fullname of the temporal API Failure message or the standard Nexus SDK failure types. +// Returns an error if the failure cannot be converted. // Mutates the failure temporarily, unsetting the Message field to avoid duplicating the information in the serialized // failure. Mutating was chosen over cloning for performance reasons since this function may be called frequently. -func APIFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) { - // Unset message so it's not serialized in the details. 
+func TemporalFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) { + var causep *nexus.Failure + if failure.GetCause() != nil { + var cause nexus.Failure + var err error + cause, err = TemporalFailureToNexusFailure(failure.GetCause()) + if err != nil { + return nexus.Failure{}, err + } + causep = &cause + } + + switch info := failure.GetFailureInfo().(type) { + case *failurepb.Failure_NexusSdkOperationFailureInfo: + var encodedAttributes string + if failure.EncodedAttributes != nil { + b, err := protojson.Marshal(failure.EncodedAttributes) + if err != nil { + return nexus.Failure{}, fmt.Errorf("failed to serialize OperationError attributes: %w", err) + } + encodedAttributes = base64.StdEncoding.EncodeToString(b) + } + operationError := serializedOperationError{ + State: info.NexusSdkOperationFailureInfo.GetState(), + EncodedAttributes: encodedAttributes, + } + + details, err := json.Marshal(operationError) + if err != nil { + return nexus.Failure{}, err + } + return nexus.Failure{ + Message: failure.GetMessage(), + StackTrace: failure.GetStackTrace(), + Metadata: map[string]string{ + "type": "nexus.OperationError", + }, + Details: details, + Cause: causep, + }, nil + case *failurepb.Failure_NexusHandlerFailureInfo: + var encodedAttributes string + if failure.EncodedAttributes != nil { + b, err := protojson.Marshal(failure.EncodedAttributes) + if err != nil { + return nexus.Failure{}, fmt.Errorf("failed to serialize HandlerError attributes: %w", err) + } + encodedAttributes = base64.StdEncoding.EncodeToString(b) + } + var retryableOverride *bool + switch info.NexusHandlerFailureInfo.GetRetryBehavior() { + case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: + val := true + retryableOverride = &val + case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: + val := false + retryableOverride = &val + case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED: + // noop + default: + // noop + } + + handlerError := 
serializedHandlerError{ + Type: info.NexusHandlerFailureInfo.GetType(), + RetryableOverride: retryableOverride, + EncodedAttributes: encodedAttributes, + } + + details, err := json.Marshal(handlerError) + if err != nil { + return nexus.Failure{}, err + } + return nexus.Failure{ + Message: failure.GetMessage(), + StackTrace: failure.GetStackTrace(), + Metadata: map[string]string{ + "type": "nexus.HandlerError", + }, + Details: details, + Cause: causep, + }, nil + case *failurepb.Failure_NexusSdkFailureErrorInfo: + return nexus.Failure{ + Message: failure.GetMessage(), + StackTrace: failure.GetStackTrace(), + Metadata: info.NexusSdkFailureErrorInfo.GetMetadata(), + Details: info.NexusSdkFailureErrorInfo.GetDetails(), + Cause: causep, + }, nil + } + // Unset message and stack trace so it's not serialized in the details. var message string message, failure.Message = failure.Message, "" + var stackTrace string + stackTrace, failure.StackTrace = failure.StackTrace, "" + data, err := protojson.Marshal(failure) failure.Message = message - + failure.StackTrace = stackTrace if err != nil { return nexus.Failure{}, err } + return nexus.Failure{ - Message: failure.GetMessage(), + Message: failure.GetMessage(), + StackTrace: failure.GetStackTrace(), Metadata: map[string]string{ "type": failureTypeString, }, Details: data, + Cause: causep, }, nil } -// NexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure. +// NexusFailureToTemporalFailure converts a Nexus Failure to an API proto Failure. // If the failure metadata "type" field is set to the fullname of the temporal API Failure message, the failure is -// reconstructed using protojson.Unmarshal on the failure details field. 
-func NexusFailureToAPIFailure(failure nexus.Failure, retryable bool) (*failurepb.Failure, error) { - apiFailure := &failurepb.Failure{} - - if failure.Metadata != nil && failure.Metadata["type"] == failureTypeString { - if err := protojson.Unmarshal(failure.Details, apiFailure); err != nil { - return nil, err - } - } else { - payloads, err := nexusFailureMetadataToPayloads(failure) - if err != nil { - return nil, err - } - apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{ - ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ - // Make up a type here, it's not part of the Nexus Failure spec. - Type: "NexusFailure", - Details: payloads, - NonRetryable: !retryable, - }, - } - } - // Ensure this always gets written. - apiFailure.Message = failure.Message - return apiFailure, nil -} - -func OperationErrorToTemporalFailure(opErr *nexus.OperationError) (*failurepb.Failure, error) { - var nexusFailure nexus.Failure - failureErr, ok := opErr.Cause.(*nexus.FailureError) - if ok { - nexusFailure = failureErr.Failure - } else if opErr.Cause != nil { - nexusFailure = nexus.Failure{Message: opErr.Cause.Error()} +// reconstructed using protojson.Unmarshal on the failure details field. Otherwise, the failure is reconstructed +// based on the known Nexus SDK failure types. +// Returns an error if the failure cannot be converted. +// nolint:revive // cognitive-complexity is high but justified to keep each case together +func NexusFailureToTemporalFailure(f nexus.Failure) (*failurepb.Failure, error) { + apiFailure := &failurepb.Failure{ + Message: f.Message, + StackTrace: f.StackTrace, } - // Canceled must be translated into a CanceledFailure to match the SDK expectation. 
- if opErr.State == nexus.OperationStateCanceled { - if nexusFailure.Metadata != nil && nexusFailure.Metadata["type"] == failureTypeString { - temporalFailure, err := NexusFailureToAPIFailure(nexusFailure, false) - if err != nil { + if f.Metadata != nil { + switch f.Metadata["type"] { + case failureTypeString: + if err := protojson.Unmarshal(f.Details, apiFailure); err != nil { return nil, err } - if temporalFailure.GetCanceledFailureInfo() != nil { - // We already have a CanceledFailure, use it. - return temporalFailure, nil + // Restore these fields as they are not included in the marshalled failure. + apiFailure.Message = f.Message + apiFailure.StackTrace = f.StackTrace + case "nexus.OperationError": + var se serializedOperationError + err := json.Unmarshal(f.Details, &se) + if err != nil { + return nil, fmt.Errorf("failed to deserialize OperationError: %w", err) } - // Fallback to encoding the Nexus failure into a Temporal canceled failure, we expect operations that end up - // as canceled to have a CanceledFailureInfo object. 
- } - payloads, err := nexusFailureMetadataToPayloads(nexusFailure) - if err != nil { - return nil, err - } - return &failurepb.Failure{ - Message: nexusFailure.Message, - FailureInfo: &failurepb.Failure_CanceledFailureInfo{ - CanceledFailureInfo: &failurepb.CanceledFailureInfo{ - Details: payloads, + apiFailure.FailureInfo = &failurepb.Failure_NexusSdkOperationFailureInfo{ + NexusSdkOperationFailureInfo: &failurepb.NexusSDKOperationFailureInfo{ + State: se.State, + }, + } + if len(se.EncodedAttributes) > 0 { + decoded, err := base64.StdEncoding.DecodeString(se.EncodedAttributes) + if err != nil { + return nil, fmt.Errorf("failed to decode base64 OperationError attributes: %w", err) + } + apiFailure.EncodedAttributes = &commonpb.Payload{} + if err := protojson.Unmarshal(decoded, apiFailure.EncodedAttributes); err != nil { + return nil, fmt.Errorf("failed to deserialize OperationError attributes: %w", err) + } + } + case "nexus.HandlerError": + var se serializedHandlerError + err := json.Unmarshal(f.Details, &se) + if err != nil { + return nil, fmt.Errorf("failed to deserialize HandlerError: %w", err) + } + var retryBehavior enumspb.NexusHandlerErrorRetryBehavior + if se.RetryableOverride == nil { + retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED + } else if *se.RetryableOverride { + retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE + } else { + retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE + } + apiFailure.FailureInfo = &failurepb.Failure_NexusHandlerFailureInfo{ + NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ + Type: se.Type, + RetryBehavior: retryBehavior, + }, + } + if len(se.EncodedAttributes) > 0 { + decoded, err := base64.StdEncoding.DecodeString(se.EncodedAttributes) + if err != nil { + return nil, fmt.Errorf("failed to decode base64 HandlerError attributes: %w", err) + } + apiFailure.EncodedAttributes = &commonpb.Payload{} + if err := protojson.Unmarshal(decoded, 
apiFailure.EncodedAttributes); err != nil { + return nil, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err) + } + } + default: + apiFailure.FailureInfo = &failurepb.Failure_NexusSdkFailureErrorInfo{ + NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{ + Metadata: f.Metadata, + Details: f.Details, }, + } + } + } else if len(f.Details) > 0 { + apiFailure.FailureInfo = &failurepb.Failure_NexusSdkFailureErrorInfo{ + NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{ + Details: f.Details, }, - }, nil + } } - return NexusFailureToAPIFailure(nexusFailure, false) -} - -func nexusFailureMetadataToPayloads(failure nexus.Failure) (*commonpb.Payloads, error) { - if len(failure.Metadata) == 0 && len(failure.Details) == 0 { - return nil, nil - } - // Delete before serializing. - failure.Message = "" - data, err := json.Marshal(failure) - if err != nil { - return nil, err + if f.Cause != nil { + var err error + apiFailure.Cause, err = NexusFailureToTemporalFailure(*f.Cause) + if err != nil { + return nil, err + } } - return &commonpb.Payloads{ - Payloads: []*commonpb.Payload{ - { - Metadata: map[string][]byte{ - "encoding": []byte("json/plain"), - }, - Data: data, - }, - }, - }, err + return apiFailure, nil } // ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible. 
diff --git a/common/nexus/failure_test.go b/common/nexus/failure_test.go new file mode 100644 index 0000000000..7949452127 --- /dev/null +++ b/common/nexus/failure_test.go @@ -0,0 +1,301 @@ +package nexus + +import ( + "testing" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + failurepb "go.temporal.io/api/failure/v1" + "go.temporal.io/server/common/testing/protorequire" +) + +func TestRoundTrip_ApplicationFailure(t *testing.T) { + original := &failurepb.Failure{ + Message: "application error", + StackTrace: "stack trace here", + EncodedAttributes: mustToPayload(t, "encoded"), + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + Type: "CustomError", + NonRetryable: false, + Details: &commonpb.Payloads{ + Payloads: []*commonpb.Payload{mustToPayload(t, "encoded")}, + }, + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusSDKOperationFailure_WithoutAttributes(t *testing.T) { + original := &failurepb.Failure{ + Message: "operation failed", + StackTrace: "operation stack trace", + FailureInfo: &failurepb.Failure_NexusSdkOperationFailureInfo{ + NexusSdkOperationFailureInfo: &failurepb.NexusSDKOperationFailureInfo{ + State: "failed", + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusSDKOperationFailure_WithAttributes(t *testing.T) { + original := &failurepb.Failure{ + Message: "operation failed with details", + StackTrace: "operation stack trace", + FailureInfo: 
&failurepb.Failure_NexusSdkOperationFailureInfo{ + NexusSdkOperationFailureInfo: &failurepb.NexusSDKOperationFailureInfo{ + State: "failed", + }, + }, + EncodedAttributes: &commonpb.Payload{ + Metadata: map[string][]byte{"encoding": []byte("json/plain")}, + Data: []byte(`{"custom":"attribute"}`), + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusHandlerFailure_Retryable(t *testing.T) { + original := &failurepb.Failure{ + Message: "handler error - retryable", + StackTrace: "handler stack trace", + FailureInfo: &failurepb.Failure_NexusHandlerFailureInfo{ + NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ + Type: "CustomHandlerError", + RetryBehavior: enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE, + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusHandlerFailure_NonRetryable(t *testing.T) { + original := &failurepb.Failure{ + Message: "handler error - non-retryable", + StackTrace: "handler stack trace", + FailureInfo: &failurepb.Failure_NexusHandlerFailureInfo{ + NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ + Type: "FatalHandlerError", + RetryBehavior: enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE, + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusHandlerFailure_Unspecified(t *testing.T) { + original := &failurepb.Failure{ + Message: "handler error - 
unspecified retry", + StackTrace: "handler stack trace", + FailureInfo: &failurepb.Failure_NexusHandlerFailureInfo{ + NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ + Type: "UnspecifiedHandlerError", + RetryBehavior: enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED, + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusHandlerFailure_WithAttributes(t *testing.T) { + original := &failurepb.Failure{ + Message: "handler error with attributes", + StackTrace: "handler stack trace", + FailureInfo: &failurepb.Failure_NexusHandlerFailureInfo{ + NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ + Type: "ComplexHandlerError", + RetryBehavior: enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE, + }, + }, + EncodedAttributes: mustToPayload(t, "encoded attributes"), + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusSDKFailureErrorInfo(t *testing.T) { + original := &failurepb.Failure{ + Message: "sdk failure error", + StackTrace: "sdk stack trace", + FailureInfo: &failurepb.Failure_NexusSdkFailureErrorInfo{ + NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{ + Metadata: map[string]string{ + "custom-key": "custom-value", + "error-type": "SomeError", + }, + Details: []byte(`{"field":"value"}`), + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_WithNestedCauses(t *testing.T) 
{ + original := &failurepb.Failure{ + Message: "top level failure", + StackTrace: "top stack trace", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + Type: "TopLevelError", + }, + }, + Cause: &failurepb.Failure{ + Message: "middle failure", + StackTrace: "middle stack trace", + FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ + TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ + TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + }, + }, + Cause: &failurepb.Failure{ + Message: "root cause", + StackTrace: "root stack trace", + FailureInfo: &failurepb.Failure_ServerFailureInfo{ + ServerFailureInfo: &failurepb.ServerFailureInfo{ + NonRetryable: true, + }, + }, + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_NexusOperationFailureWithNexusHandlerCause(t *testing.T) { + original := &failurepb.Failure{ + Message: "operation failed", + StackTrace: "operation stack trace", + FailureInfo: &failurepb.Failure_NexusSdkOperationFailureInfo{ + NexusSdkOperationFailureInfo: &failurepb.NexusSDKOperationFailureInfo{ + State: "failed", + }, + }, + Cause: &failurepb.Failure{ + Message: "handler caused the failure", + StackTrace: "handler stack trace", + FailureInfo: &failurepb.Failure_NexusHandlerFailureInfo{ + NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ + Type: "BadRequest", + RetryBehavior: enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE, + }, + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_EmptyFailure(t *testing.T) { + original := 
&failurepb.Failure{ + Message: "simple message", + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_OnlyStackTrace(t *testing.T) { + original := &failurepb.Failure{ + StackTrace: "just a stack trace", + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + protorequire.ProtoEqual(t, original, converted) +} + +func TestRoundTrip_OnlyDetails(t *testing.T) { + original := &failurepb.Failure{ + FailureInfo: &failurepb.Failure_NexusSdkFailureErrorInfo{ + NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{ + Details: []byte(`{"only":"details"}`), + }, + }, + } + + nexusFailure, err := TemporalFailureToNexusFailure(original) + require.NoError(t, err) + + converted, err := NexusFailureToTemporalFailure(nexusFailure) + require.NoError(t, err) + protorequire.ProtoEqual(t, original, converted) +} diff --git a/common/nexus/nexusrpc/client.go b/common/nexus/nexusrpc/client.go index 126c1f4815..355ee89d49 100644 --- a/common/nexus/nexusrpc/client.go +++ b/common/nexus/nexusrpc/client.go @@ -281,21 +281,36 @@ func (c *HTTPClient) StartOperation( Links: links, }, nil case statusOperationFailed: - state, err := getUnsuccessfulStateFromHeader(response, body) + failure, err := c.failureFromResponse(response, body) if err != nil { return nil, err } - failure, err := c.failureFromResponse(response, body) + wireErr, err := c.options.FailureConverter.FailureToError(failure) if err != nil { return nil, err } - failureErr := c.options.FailureConverter.FailureToError(failure) - return nil, &nexus.OperationError{ - State: state, - Cause: failureErr, + // For compatibility with older servers. 
+ if _, ok := wireErr.(*nexus.OperationError); !ok { + state, err := getUnsuccessfulStateFromHeader(response, body) + if err != nil { + return nil, err + } + opErr := &nexus.OperationError{ + State: state, + Message: "operation failed", + Cause: wireErr, + } + originalFailure, err := c.options.FailureConverter.ErrorToFailure(wireErr) + if err != nil { + return nil, err + } + opErr.OriginalFailure = &originalFailure + wireErr = opErr } + + return nil, wireErr default: return nil, c.bestEffortHandlerErrorFromResponse(response, body) } @@ -355,90 +370,69 @@ func (c *HTTPClient) failureFromResponse(response *http.Response, body []byte) ( return failure, err } -func (c *HTTPClient) failureFromResponseOrDefault(response *http.Response, body []byte, defaultMessage string) nexus.Failure { - failure, err := c.failureFromResponse(response, body) +func (c *HTTPClient) defaultErrorFromResponse(response *http.Response, body []byte, cause error) error { + errorType, err := httpStatusCodeToHandlerErrorType(response) if err != nil { - failure.Message = defaultMessage + // TODO(bergundy): optimization - use the provided cause, it's already a deserialized failure. + return newUnexpectedResponseError(err.Error(), response, body) + } + statusText := strings.TrimPrefix(response.Status, fmt.Sprintf("%d ", response.StatusCode)) + handlerErr := &nexus.HandlerError{ + Type: errorType, + Message: statusText, + // For compatibility with older servers. 
+ RetryBehavior: retryBehaviorFromHeader(response.Header), + Cause: cause, + } + originalFailure, err := c.options.FailureConverter.ErrorToFailure(handlerErr) + if err != nil { + return newUnexpectedResponseError("failed to construct handler error from response: "+err.Error(), response, body) } - return failure + handlerErr.OriginalFailure = &originalFailure + return handlerErr } -func (c *HTTPClient) failureErrorFromResponseOrDefault(response *http.Response, body []byte, defaultMessage string) error { - failure := c.failureFromResponseOrDefault(response, body, defaultMessage) - failureErr := c.options.FailureConverter.FailureToError(failure) - return failureErr +func (c *HTTPClient) bestEffortHandlerErrorFromResponse(response *http.Response, body []byte) error { + failure, err := c.failureFromResponse(response, body) + if err != nil { + return c.defaultErrorFromResponse(response, body, nil) + } + convErr, err := c.options.FailureConverter.FailureToError(failure) + if err != nil { + return newUnexpectedResponseError(fmt.Sprintf("failed to convert Failure to error: %s", err.Error()), response, body) + } + if _, ok := convErr.(*nexus.HandlerError); !ok { + convErr = c.defaultErrorFromResponse(response, body, convErr) + } + return convErr } -func (c *HTTPClient) bestEffortHandlerErrorFromResponse(response *http.Response, body []byte) error { +func httpStatusCodeToHandlerErrorType(response *http.Response) (nexus.HandlerErrorType, error) { switch response.StatusCode { case http.StatusBadRequest: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeBadRequest, - Cause: c.failureErrorFromResponseOrDefault(response, body, "bad request"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } - case http.StatusUnauthorized: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeUnauthenticated, - Cause: c.failureErrorFromResponseOrDefault(response, body, "unauthenticated"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return 
nexus.HandlerErrorTypeBadRequest, nil case http.StatusRequestTimeout: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeRequestTimeout, - Cause: c.failureErrorFromResponseOrDefault(response, body, "request timeout"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeRequestTimeout, nil case http.StatusConflict: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeConflict, - Cause: c.failureErrorFromResponseOrDefault(response, body, "conflict"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeConflict, nil + case http.StatusUnauthorized: + return nexus.HandlerErrorTypeUnauthenticated, nil case http.StatusForbidden: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeUnauthorized, - Cause: c.failureErrorFromResponseOrDefault(response, body, "unauthorized"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeUnauthorized, nil case http.StatusNotFound: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeNotFound, - Cause: c.failureErrorFromResponseOrDefault(response, body, "not found"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeNotFound, nil case http.StatusTooManyRequests: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeResourceExhausted, - Cause: c.failureErrorFromResponseOrDefault(response, body, "resource exhausted"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeResourceExhausted, nil case http.StatusInternalServerError: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeInternal, - Cause: c.failureErrorFromResponseOrDefault(response, body, "internal error"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeInternal, nil case http.StatusNotImplemented: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeNotImplemented, - Cause: 
c.failureErrorFromResponseOrDefault(response, body, "not implemented"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeNotImplemented, nil case http.StatusServiceUnavailable: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeUnavailable, - Cause: c.failureErrorFromResponseOrDefault(response, body, "unavailable"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeUnavailable, nil case nexus.StatusUpstreamTimeout: - return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeUpstreamTimeout, - Cause: c.failureErrorFromResponseOrDefault(response, body, "upstream timeout"), - RetryBehavior: retryBehaviorFromHeader(response.Header), - } + return nexus.HandlerErrorTypeUpstreamTimeout, nil default: - return newUnexpectedResponseError(fmt.Sprintf("unexpected response status: %q", response.Status), response, body) + return "", fmt.Errorf("unexpected response status: %q", response.Status) } } diff --git a/common/nexus/nexusrpc/completion.go b/common/nexus/nexusrpc/completion.go index 9c5a49df64..591c4c234e 100644 --- a/common/nexus/nexusrpc/completion.go +++ b/common/nexus/nexusrpc/completion.go @@ -165,10 +165,13 @@ type OperationCompletionUnsuccessful struct { type OperationCompletionUnsuccessfulOptions struct { // A [FailureConverter] to convert a [Failure] instance to and from an [error]. Defaults to // [DefaultFailureConverter]. + // + // NOTE: To call server versions <= 1.31.0, use a FailureConverter that unwraps the error cause if message is not + // present. FailureConverter nexus.FailureConverter // OperationID is the unique ID for this operation. Used when a completion callback is received before a started response. // - // Deprecated: Use OperatonToken instead. + // Deprecated: Use OperationToken instead. OperationID string // OperationToken is the unique token for this operation. Used when a completion callback is received before a // started response. 
@@ -186,7 +189,10 @@ func NewOperationCompletionUnsuccessful(opErr *nexus.OperationError, options Ope if options.FailureConverter == nil { options.FailureConverter = nexus.DefaultFailureConverter() } - failure := options.FailureConverter.ErrorToFailure(opErr.Cause) + failure, err := options.FailureConverter.ErrorToFailure(opErr) + if err != nil { + return nil, err + } return &OperationCompletionUnsuccessful{ Header: make(nexus.Header), @@ -206,6 +212,8 @@ func (c *OperationCompletionUnsuccessful) applyToHTTPRequest(request *http.Reque if c.Header != nil { addNexusHeaderToHTTPHeader(c.Header, request.Header) } + + // Set the operation state header for backwards compatibility. request.Header.Set(headerOperationState, string(c.State)) request.Header.Set("Content-Type", contentTypeJSON) @@ -249,7 +257,7 @@ type CompletionRequest struct { // Links are used to link back to the operation when a completion callback is received before a started response. Links []nexus.Link // Parsed from request and set if State is failed or canceled. - Error error + Error *nexus.OperationError // Extracted from request and set if State is succeeded. Result *nexus.LazyValue } @@ -322,7 +330,27 @@ func (h *completionHTTPHandler) ServeHTTP(writer http.ResponseWriter, request *h h.writeFailure(writer, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "failed to read Failure from request body")) return } - completion.Error = h.failureConverter.FailureToError(failure) + completionErr, err := h.failureConverter.FailureToError(failure) + if err != nil { + h.writeFailure(writer, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "failed to decode failure from request body")) + return + } + opErr, ok := completionErr.(*nexus.OperationError) + if !ok { + // Backwards compatibility: wrap non-OperationError errors in an OperationError with the appropriate state. 
+ completion.Error = &nexus.OperationError{ + State: completion.State, + Cause: completionErr, + } + originalFailure, err := h.failureConverter.ErrorToFailure(completion.Error) + if err != nil { + h.writeFailure(writer, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "failed to decode failure from request body")) + return + } + completion.Error.OriginalFailure = &originalFailure + } else { + completion.Error = opErr + } case nexus.OperationStateSucceeded: completion.Result = nexus.NewLazyValue( h.options.Serializer, diff --git a/common/nexus/nexusrpc/completion_test.go b/common/nexus/nexusrpc/completion_test.go index 4436634430..a566705749 100644 --- a/common/nexus/nexusrpc/completion_test.go +++ b/common/nexus/nexusrpc/completion_test.go @@ -178,10 +178,10 @@ func TestFailureCompletion(t *testing.T) { ctx, callbackURL, teardown := setupForCompletion(t, &failureExpectingCompletionHandler{ errorChecker: func(err error) error { - if err.Error() != "expected message" { - return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid failure: %v", err) + if opErr, ok := err.(*nexus.OperationError); ok && opErr.Message == "expected message" { + return nil } - return nil + return nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid failure: %v", err) }, expectedStartTime: startTime, expectedCloseTime: closeTime, diff --git a/common/nexus/nexusrpc/server.go b/common/nexus/nexusrpc/server.go index a4b16ae893..9119c1a21e 100644 --- a/common/nexus/nexusrpc/server.go +++ b/common/nexus/nexusrpc/server.go @@ -91,6 +91,7 @@ func (h *httpHandler) writeResult(writer http.ResponseWriter, result any) { func (h *baseHTTPHandler) writeFailure(writer http.ResponseWriter, err error) { var failure nexus.Failure + var failureError *nexus.FailureError var opError *nexus.OperationError var handlerError *nexus.HandlerError var operationState nexus.OperationState @@ -98,7 +99,13 @@ func (h *baseHTTPHandler) writeFailure(writer http.ResponseWriter, err error) { if 
errors.As(err, &opError) { operationState = opError.State - failure = h.failureConverter.ErrorToFailure(opError.Cause) + var convErr error + failure, convErr = h.failureConverter.ErrorToFailure(opError) + if convErr != nil { + h.logger.Error("failed to convert operation error to failure", "error", convErr) + writer.WriteHeader(http.StatusInternalServerError) + return + } statusCode = statusOperationFailed if operationState != nexus.OperationStateFailed && operationState != nexus.OperationStateCanceled { @@ -108,7 +115,13 @@ func (h *baseHTTPHandler) writeFailure(writer http.ResponseWriter, err error) { } writer.Header().Set(headerOperationState, string(operationState)) } else if errors.As(err, &handlerError) { - failure = h.failureConverter.ErrorToFailure(handlerError.Cause) + var convErr error + failure, convErr = h.failureConverter.ErrorToFailure(handlerError) + if convErr != nil { + h.logger.Error("failed to convert handler error to failure", "error", convErr) + writer.WriteHeader(http.StatusInternalServerError) + return + } switch handlerError.Type { case nexus.HandlerErrorTypeBadRequest: statusCode = http.StatusBadRequest @@ -135,6 +148,8 @@ func (h *baseHTTPHandler) writeFailure(writer http.ResponseWriter, err error) { default: h.logger.Error("unexpected handler error type", "type", handlerError.Type) } + } else if errors.As(err, &failureError) { + failure = failureError.Failure } else { failure = nexus.Failure{ Message: "internal server error", diff --git a/common/nexus/nexusrpc/setup_test.go b/common/nexus/nexusrpc/setup_test.go index 63c65e42b6..b910cff02e 100644 --- a/common/nexus/nexusrpc/setup_test.go +++ b/common/nexus/nexusrpc/setup_test.go @@ -112,19 +112,20 @@ type customFailureConverter struct{} var errCustom = errors.New("custom") // ErrorToFailure implements FailureConverter. 
-func (c customFailureConverter) ErrorToFailure(err error) nexus.Failure { +func (c customFailureConverter) ErrorToFailure(err error) (nexus.Failure, error) { return nexus.Failure{ Message: err.Error(), Metadata: map[string]string{ "type": "custom", }, - } + }, nil } // FailureToError implements FailureConverter. -func (c customFailureConverter) FailureToError(f nexus.Failure) error { +// nolint:revive // unnamed results of the same type is fine for test +func (c customFailureConverter) FailureToError(f nexus.Failure) (error, error) { if f.Metadata["type"] != "custom" { - return errors.New(f.Message) + return errors.New(f.Message), nil } - return fmt.Errorf("%w: %s", errCustom, f.Message) + return fmt.Errorf("%w: %s", errCustom, f.Message), nil } diff --git a/components/callbacks/chasm_invocation.go b/components/callbacks/chasm_invocation.go index 87484e2c4c..6b30ed404b 100644 --- a/components/callbacks/chasm_invocation.go +++ b/components/callbacks/chasm_invocation.go @@ -114,7 +114,7 @@ func (c chasmInvocation) getHistoryRequest( Completion: completion, } case *nexusrpc.OperationCompletionUnsuccessful: - apiFailure, err := commonnexus.NexusFailureToAPIFailure(op.Failure, true) + apiFailure, err := commonnexus.NexusFailureToTemporalFailure(op.Failure) if err != nil { return nil, fmt.Errorf("failed to convert failure type: %v", err) } diff --git a/components/nexusoperations/completion.go b/components/nexusoperations/completion.go index 90d59bbecf..bd55314fa9 100644 --- a/components/nexusoperations/completion.go +++ b/components/nexusoperations/completion.go @@ -8,6 +8,7 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + failurepb "go.temporal.io/api/failure/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" commonnexus "go.temporal.io/server/common/nexus" @@ -43,25 +44,42 @@ func handleSuccessfulOperationResult( func handleOperationError( node *hsm.Node, 
operation Operation, - opFailedError *nexus.OperationError, + opErr *nexus.OperationError, ) error { eventID, err := hsm.EventIDFromToken(operation.ScheduledEventToken) if err != nil { return err } - failure, err := commonnexus.OperationErrorToTemporalFailure(opFailedError) + var originalFailure *failurepb.Failure + + // This should never be nil, but better be defensive here than to panic. + if opErr.OriginalFailure == nil { + if opErr.Message == "" { + // Add a generic message to ensure the failure converter does not unwrap the failure for compatibility. + opErr.Message = "nexus operation completed unsuccessfully" + } + originalNexusFailure, err := nexus.DefaultFailureConverter().ErrorToFailure(opErr) + if err != nil { + return serviceerror.NewInvalidArgumentf("Malformed failure: %v", err) + } + opErr.OriginalFailure = &originalNexusFailure + } + + nexusFailure := opErr.OriginalFailure + originalFailure, err = commonnexus.NexusFailureToTemporalFailure(*nexusFailure) if err != nil { - return err + return serviceerror.NewInvalidArgumentf("Malformed failure: %v", err) } - switch opFailedError.State { // nolint:exhaustive + switch opErr.State { // nolint:exhaustive case nexus.OperationStateFailed: event := node.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_FAILED, func(e *historypb.HistoryEvent) { + failure := convertToNexusOperationFailure(operation, eventID, originalFailure) // We must assign to this property, linter doesn't like this. 
// nolint:revive e.Attributes = &historypb.HistoryEvent_NexusOperationFailedEventAttributes{ NexusOperationFailedEventAttributes: &historypb.NexusOperationFailedEventAttributes{ - Failure: nexusOperationFailure(operation, eventID, failure), + Failure: failure, ScheduledEventId: eventID, RequestId: operation.RequestId, }, @@ -70,12 +88,24 @@ func handleOperationError( return FailedEventDefinition{}.Apply(node.Parent, event) case nexus.OperationStateCanceled: + if originalFailure.GetCause().GetCanceledFailureInfo() == nil { + // Wrap the original failure in a CanceledFailureInfo to indicate cancellation. All workflow commands expected a + // nested CanceledFailure. + originalFailure.Cause = &failurepb.Failure{ + FailureInfo: &failurepb.Failure_CanceledFailureInfo{ + CanceledFailureInfo: &failurepb.CanceledFailureInfo{}, + }, + // TODO(bergundy): This might be confusing. + // Preserve the original cause. + Cause: originalFailure.GetCause(), + } + } event := node.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCELED, func(e *historypb.HistoryEvent) { // We must assign to this property, linter doesn't like this. // nolint:revive e.Attributes = &historypb.HistoryEvent_NexusOperationCanceledEventAttributes{ NexusOperationCanceledEventAttributes: &historypb.NexusOperationCanceledEventAttributes{ - Failure: nexusOperationFailure(operation, eventID, failure), + Failure: convertToNexusOperationFailure(operation, eventID, originalFailure), ScheduledEventId: eventID, RequestId: operation.RequestId, }, @@ -86,7 +116,7 @@ func handleOperationError( default: // Both the Nexus Client and CompletionHandler reject invalid states, but just in case, we return this as a // transition error. 
- return fmt.Errorf("unexpected operation state: %v", opFailedError.State) + return fmt.Errorf("unexpected operation state: %v", opErr.State) } } diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index 222a5a07e9..f4888aeeec 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -436,11 +436,11 @@ func (e taskExecutor) saveResult(ctx context.Context, env hsm.Environment, ref h func (e taskExecutor) handleStartOperationError(env hsm.Environment, node *hsm.Node, operation Operation, callErr error) error { var handlerErr *nexus.HandlerError - var opFailedErr *nexus.OperationError + var opErr *nexus.OperationError switch { - case errors.As(callErr, &opFailedErr): - return handleOperationError(node, operation, opFailedErr) + case errors.As(callErr, &opErr): + return handleOperationError(node, operation, opErr) case errors.As(callErr, &handlerErr) && !handlerErr.Retryable(): // The StartOperation request got an unexpected response that is not retryable, fail the operation. // Although Failure is nullable, Nexus SDK is expected to always populate this field @@ -482,15 +482,17 @@ func handleNonRetryableStartOperationError(node *hsm.Node, operation Operation, if err != nil { return err } - failure, err := callErrToFailure(callErr, true) + failure, err := callErrToFailure(callErr, false) if err != nil { return err } attrs := &historypb.NexusOperationFailedEventAttributes{ - Failure: nexusOperationFailure( + Failure: convertToNexusOperationFailure( operation, eventID, - failure, + &failurepb.Failure{ + Cause: failure, + }, ), ScheduledEventId: eventID, RequestId: operation.RequestId, @@ -527,14 +529,16 @@ func (e taskExecutor) recordOperationTimeout(node *hsm.Node) error { // nolint:revive // We must mutate here even if the linter doesn't like it. 
e.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{ NexusOperationTimedOutEventAttributes: &historypb.NexusOperationTimedOutEventAttributes{ - Failure: nexusOperationFailure( + Failure: convertToNexusOperationFailure( op, eventID, &failurepb.Failure{ - Message: "operation timed out", - FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ - TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ - TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + Cause: &failurepb.Failure{ + Message: "operation timed out", + FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ + TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + }, }, }, }, @@ -780,9 +784,13 @@ func (e taskExecutor) lookupEndpoint(ctx context.Context, namespaceID namespace. return entry, nil } -func nexusOperationFailure(operation Operation, scheduledEventID int64, cause *failurepb.Failure) *failurepb.Failure { - return &failurepb.Failure{ - Message: "nexus operation completed unsuccessfully", +// Copy over message and stack trace if present since to preserve as much as possible from the original operation +// error. 
+func convertToNexusOperationFailure(operation Operation, scheduledEventID int64, originalFailure *failurepb.Failure) *failurepb.Failure { + f := &failurepb.Failure{ + Message: originalFailure.GetMessage(), + StackTrace: originalFailure.GetStackTrace(), + EncodedAttributes: originalFailure.GetEncodedAttributes(), FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{ NexusOperationExecutionFailureInfo: &failurepb.NexusOperationFailureInfo{ Endpoint: operation.Endpoint, @@ -794,8 +802,12 @@ func nexusOperationFailure(operation Operation, scheduledEventID int64, cause *f ScheduledEventId: scheduledEventID, }, }, - Cause: cause, + Cause: originalFailure.GetCause(), + } + if originalFailure.GetMessage() == "" { + f.Message = "nexus operation completed unsuccessfully" } + return f } func startCallOutcomeTag(callCtx context.Context, result *nexusrpc.ClientStartOperationResponse[*nexus.LazyValue], callErr error) string { @@ -863,44 +875,25 @@ func isDestinationDown(err error) bool { func callErrToFailure(callErr error, retryable bool) (*failurepb.Failure, error) { var handlerErr *nexus.HandlerError if errors.As(callErr, &handlerErr) { - var retryBehavior enumspb.NexusHandlerErrorRetryBehavior - // nolint:exhaustive // unspecified is the default - switch handlerErr.RetryBehavior { - case nexus.HandlerErrorRetryBehaviorRetryable: - retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE - case nexus.HandlerErrorRetryBehaviorNonRetryable: - retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE - } - failure := &failurepb.Failure{ - Message: handlerErr.Error(), - FailureInfo: &failurepb.Failure_NexusHandlerFailureInfo{ - NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{ - Type: string(handlerErr.Type), - RetryBehavior: retryBehavior, - }, - }, - } - var failureError *nexus.FailureError - if errors.As(handlerErr.Cause, &failureError) { + var nf nexus.Failure + if handlerErr.OriginalFailure != nil { + nf = 
*handlerErr.OriginalFailure + } else { var err error - failure.Cause, err = commonnexus.NexusFailureToAPIFailure(failureError.Failure, retryable) + // Ensure the error message is set to ensure the failure converter does not unwrap the cause. + if handlerErr.Message == "" { + handlerErr.Message = "handler error" + } + nf, err = nexus.DefaultFailureConverter().ErrorToFailure(handlerErr) if err != nil { return nil, err } - } else { - cause := handlerErr.Cause - if cause == nil { - cause = errors.New("unknown cause") - } - failure.Cause = &failurepb.Failure{ - Message: cause.Error(), - FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ - ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{}, - }, - } } - - return failure, nil + f, err := commonnexus.NexusFailureToTemporalFailure(nf) + if err != nil { + return nil, err + } + return f, nil } return &failurepb.Failure{ diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 95dcb3f585..5c70d036f9 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -187,9 +187,10 @@ func TestProcessInvocationTask(t *testing.T) { destinationDown: false, onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { return nil, &nexus.OperationError{ - State: nexus.OperationStateFailed, + State: nexus.OperationStateFailed, + Message: "operation failed from handler", Cause: &nexus.FailureError{ - Failure: nexus.Failure{Message: "operation failed from handler", Metadata: map[string]string{"encoding": "json/plain"}, Details: json.RawMessage("\"details\"")}, + Failure: nexus.Failure{Message: "cause", Metadata: map[string]string{"encoding": "json/plain"}, Details: json.RawMessage("\"details\"")}, }, } }, @@ -202,7 +203,7 @@ func TestProcessInvocationTask(t *testing.T) { ScheduledEventId: 1, RequestId: 
op.RequestId, Failure: &failurepb.Failure{ - Message: "nexus operation completed unsuccessfully", + Message: "operation failed from handler", FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{ NexusOperationExecutionFailureInfo: &failurepb.NexusOperationFailureInfo{ ScheduledEventId: 1, @@ -212,16 +213,11 @@ func TestProcessInvocationTask(t *testing.T) { }, }, Cause: &failurepb.Failure{ - Message: "operation failed from handler", - FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ - ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ - Type: "NexusFailure", - Details: &commonpb.Payloads{ - Payloads: []*commonpb.Payload{ - mustToPayload(t, nexus.Failure{Metadata: map[string]string{"encoding": "json/plain"}, Details: []byte(`"details"`)}), - }, - }, - NonRetryable: true, + Message: "cause", + FailureInfo: &failurepb.Failure_NexusSdkFailureErrorInfo{ + NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{ + Metadata: map[string]string{"encoding": "json/plain"}, + Details: []byte(`"details"`), }, }, }, @@ -236,9 +232,10 @@ func TestProcessInvocationTask(t *testing.T) { destinationDown: false, onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { return nil, &nexus.OperationError{ - State: nexus.OperationStateCanceled, + State: nexus.OperationStateCanceled, + Message: "operation canceled from handler", Cause: &nexus.FailureError{ - Failure: nexus.Failure{Message: "operation canceled from handler", Metadata: map[string]string{"encoding": "json/plain"}, Details: json.RawMessage("\"details\"")}, + Failure: nexus.Failure{Message: "cause", Metadata: map[string]string{"encoding": "json/plain"}, Details: json.RawMessage("\"details\"")}, }, } }, @@ -251,7 +248,7 @@ func TestProcessInvocationTask(t *testing.T) { ScheduledEventId: 1, RequestId: op.RequestId, Failure: &failurepb.Failure{ - Message: "nexus 
operation completed unsuccessfully", + Message: "operation canceled from handler", FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{ NexusOperationExecutionFailureInfo: &failurepb.NexusOperationFailureInfo{ ScheduledEventId: 1, @@ -261,13 +258,15 @@ func TestProcessInvocationTask(t *testing.T) { }, }, Cause: &failurepb.Failure{ - Message: "operation canceled from handler", FailureInfo: &failurepb.Failure_CanceledFailureInfo{ - CanceledFailureInfo: &failurepb.CanceledFailureInfo{ - Details: &commonpb.Payloads{ - Payloads: []*commonpb.Payload{ - mustToPayload(t, nexus.Failure{Metadata: map[string]string{"encoding": "json/plain"}, Details: []byte(`"details"`)}), - }, + CanceledFailureInfo: &failurepb.CanceledFailureInfo{}, + }, + Cause: &failurepb.Failure{ + Message: "cause", + FailureInfo: &failurepb.Failure_NexusSdkFailureErrorInfo{ + NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{ + Metadata: map[string]string{"encoding": "json/plain"}, + Details: []byte(`"details"`), }, }, }, @@ -287,8 +286,8 @@ func TestProcessInvocationTask(t *testing.T) { expectedMetricOutcome: "handler-error:INTERNAL", checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) - require.NotNil(t, op.LastAttemptFailure.GetNexusHandlerFailureInfo()) - require.Equal(t, "handler error (INTERNAL): internal server error", op.LastAttemptFailure.Message) + require.Equal(t, string(nexus.HandlerErrorTypeInternal), op.LastAttemptFailure.GetNexusHandlerFailureInfo().GetType()) + require.Equal(t, "internal server error", op.LastAttemptFailure.Message) require.Equal(t, 0, len(events)) }, }, @@ -351,8 +350,8 @@ func TestProcessInvocationTask(t *testing.T) { require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_FAILED, op.State()) require.Equal(t, 1, len(events)) failure := events[0].GetNexusOperationFailedEventAttributes().Failure.Cause - require.NotNil(t, 
failure.GetNexusHandlerFailureInfo()) - require.Equal(t, "handler error (NOT_FOUND): endpoint not registered", failure.Message) + require.Equal(t, string(nexus.HandlerErrorTypeNotFound), failure.GetNexusHandlerFailureInfo().GetType()) + require.Equal(t, "endpoint not registered", failure.Message) }, }, { @@ -365,10 +364,8 @@ func TestProcessInvocationTask(t *testing.T) { require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_FAILED, op.State()) require.Equal(t, 1, len(events)) failure := events[0].GetNexusOperationFailedEventAttributes().Failure.Cause - require.NotNil(t, failure.GetNexusHandlerFailureInfo()) - require.Equal(t, "handler error (NOT_FOUND): endpoint not registered", failure.Message) - require.NotNil(t, failure.Cause.GetApplicationFailureInfo()) - require.Equal(t, "endpoint not registered", failure.Cause.Message) + require.Equal(t, string(nexus.HandlerErrorTypeNotFound), failure.GetNexusHandlerFailureInfo().GetType()) + require.Equal(t, "endpoint not registered", failure.Message) }, }, { @@ -653,8 +650,10 @@ func TestProcessCancelationTask(t *testing.T) { expectedMetricOutcome: "handler-error:INTERNAL", checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State()) - require.NotNil(t, c.LastAttemptFailure.GetNexusHandlerFailureInfo()) - require.Equal(t, "handler error (INTERNAL): operation not found", c.LastAttemptFailure.Message) + require.Equal(t, string(nexus.HandlerErrorTypeInternal), c.LastAttemptFailure.GetNexusHandlerFailureInfo().GetType()) + require.Equal(t, "Internal Server Error", c.LastAttemptFailure.Message) + require.NotNil(t, c.LastAttemptFailure.Cause) + require.Equal(t, "operation not found", c.LastAttemptFailure.Cause.Message) }, }, { @@ -698,7 +697,7 @@ func TestProcessCancelationTask(t *testing.T) { checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, c.State()) 
require.NotNil(t, c.LastAttemptFailure.GetNexusHandlerFailureInfo()) - require.Equal(t, "handler error (INTERNAL): internal server error", c.LastAttemptFailure.Message) + require.Equal(t, "internal server error", c.LastAttemptFailure.Message) }, }, { @@ -737,8 +736,8 @@ func TestProcessCancelationTask(t *testing.T) { onCancelOperation: nil, // This should not be called if the endpoint is not found. checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State()) - require.NotNil(t, c.LastAttemptFailure.GetNexusHandlerFailureInfo()) - require.Equal(t, "handler error (NOT_FOUND): endpoint not registered", c.LastAttemptFailure.Message) + require.Equal(t, string(nexus.HandlerErrorTypeNotFound), c.LastAttemptFailure.GetNexusHandlerFailureInfo().GetType()) + require.Equal(t, "endpoint not registered", c.LastAttemptFailure.Message) }, }, } diff --git a/components/nexusoperations/frontend/handler.go b/components/nexusoperations/frontend/handler.go index 843274ccfc..6c6bfe7a92 100644 --- a/components/nexusoperations/frontend/handler.go +++ b/components/nexusoperations/frontend/handler.go @@ -201,15 +201,8 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C } switch r.State { // nolint:exhaustive case nexus.OperationStateFailed, nexus.OperationStateCanceled: - failureErr, ok := r.Error.(*nexus.FailureError) - if !ok { - // This shouldn't happen as the Nexus SDK is always expected to convert Failures from the wire to - // FailureErrors. 
- logger.Error("result error is not a FailureError", tag.Error(err)) - return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal server error") - } hr.Outcome = &historyservice.CompleteNexusOperationRequest_Failure{ - Failure: commonnexus.NexusFailureToProtoFailure(failureErr.Failure), + Failure: commonnexus.NexusFailureToProtoFailure(*r.Error.OriginalFailure), } case nexus.OperationStateSucceeded: var result *commonpb.Payload @@ -284,19 +277,14 @@ func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nex case nexus.OperationStateFailed, nexus.OperationStateCanceled: // For unsuccessful operations, the Nexus framework reads and closes the original request body to deserialize // the failure, so we must construct a new completion to forward. - var failureErr *nexus.FailureError - if !errors.As(r.Error, &failureErr) { - // This shouldn't happen as the Nexus SDK is always expected to convert Failures from the wire to - // FailureErrors. - h.Logger.Error("received unexpected error type when trying to forward Nexus operation completion", tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err)) - return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") - } c := &nexusrpc.OperationCompletionUnsuccessful{ State: r.State, OperationToken: r.OperationToken, StartTime: r.StartTime, Links: r.Links, - Failure: failureErr.Failure, + } + if r.Error != nil && r.Error.OriginalFailure != nil { + c.Failure = *r.Error.OriginalFailure } forwardReq, err = nexusrpc.NewCompletionHTTPRequest(ctx, forwardURL, c) if err != nil { diff --git a/go.mod b/go.mod index a660043654..6265c0cec1 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/lib/pq v1.10.9 github.com/maruel/panicparse/v2 v2.4.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/nexus-rpc/sdk-go v0.5.1 + github.com/nexus-rpc/sdk-go v0.5.2-0.20251217172131-63a8027ef960 github.com/olekukonko/tablewriter v0.0.5 github.com/olivere/elastic/v7 
v7.0.32 github.com/pkg/errors v0.9.1 @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388 + go.temporal.io/api v1.61.1-0.20260123235933-4c15176b0e79 go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 9a46dfd672..ca2455f31a 100644 --- a/go.sum +++ b/go.sum @@ -234,8 +234,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/nexus-rpc/sdk-go v0.5.1 h1:UFYYfoHlQc+Pn9gQpmn9QE7xluewAn2AO1OSkAh7YFU= -github.com/nexus-rpc/sdk-go v0.5.1/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= +github.com/nexus-rpc/sdk-go v0.5.2-0.20251217172131-63a8027ef960 h1:ljAYqlX3IFBf7zqF8JGAgn21k7PBq4qyS8d45LcLDmQ= +github.com/nexus-rpc/sdk-go v0.5.2-0.20251217172131-63a8027ef960/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= @@ -371,8 +371,12 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api 
v1.61.1-0.20260123144430-3418f5100388 h1:Rahqpgjqalbv28RLoOtnNNZvwtnes/sQP0+cisO70Hw= -go.temporal.io/api v1.61.1-0.20260123144430-3418f5100388/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.61.1-0.20260123194132-ee4a47298624 h1:3dqSdkDfWugeg8QEf1WsAFhtTyNV6Fj4I6wpD51gtVc= +go.temporal.io/api v1.61.1-0.20260123194132-ee4a47298624/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.61.1-0.20260123211420-03a0445cb1c7 h1:v5k7tdCrSAI1zNj7jClndKc/2Sq3C/B5g5NEQmkZrWs= +go.temporal.io/api v1.61.1-0.20260123211420-03a0445cb1c7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.61.1-0.20260123235933-4c15176b0e79 h1:iv07QpG6uAWY67jK1WK/NSwkm2P5pyT2gnh2mqsTyCQ= +go.temporal.io/api v1.61.1-0.20260123235933-4c15176b0e79/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/matchingservice/v1/request_response.proto b/proto/internal/temporal/server/api/matchingservice/v1/request_response.proto index da9cbc2bb5..4d18819755 100644 --- a/proto/internal/temporal/server/api/matchingservice/v1/request_response.proto +++ b/proto/internal/temporal/server/api/matchingservice/v1/request_response.proto @@ -9,6 +9,7 @@ import "google/protobuf/timestamp.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/deployment/v1/message.proto"; import "temporal/api/enums/v1/task_queue.proto"; +import "temporal/api/failure/v1/message.proto"; import "temporal/api/history/v1/message.proto"; import "temporal/api/taskqueue/v1/message.proto"; import "temporal/api/query/v1/message.proto"; @@ -488,11 +489,13 @@ message DispatchNexusTaskResponse { message Timeout {} oneof outcome { - // Set if the worker's handler failed the 
nexus task. - temporal.api.nexus.v1.HandlerError handler_error = 1; + // Deprecated. Use failure field instead. + temporal.api.nexus.v1.HandlerError handler_error = 1 [deprecated = true]; // Set if the worker's handler responded successfully to the nexus task. temporal.api.nexus.v1.Response response = 2; Timeout request_timeout = 3; + // Set if the worker's handler failed the nexus task. Must contain a NexusHandlerFailureInfo object. + temporal.api.failure.v1.Failure failure = 4; } } diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index 55bf5f28cb..c8cfeee1c4 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -414,6 +414,9 @@ func (h *nexusHandler) StartOperation( Variant: &nexuspb.Request_StartOperation{ StartOperation: &startOperationRequest, }, + Capabilities: &nexuspb.Request_Capabilities{ + TemporalFailureResponses: true, + }, }) if err := oc.interceptRequest(ctx, request, options.Header); err != nil { @@ -444,19 +447,32 @@ func (h *nexusHandler) StartOperation( } // Convert to standard Nexus SDK response. 
switch t := response.GetOutcome().(type) { + case *matchingservice.DispatchNexusTaskResponse_Failure: + oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:" + t.Failure.GetNexusHandlerFailureInfo().GetType())) + nf, err := commonnexus.TemporalFailureToNexusFailure(t.Failure) + if err != nil { + oc.logger.Error("error converting Temporal failure to Nexus failure", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + } + he, err := nexus.DefaultFailureConverter().FailureToError(nf) + if err != nil { + oc.logger.Error("error converting Nexus failure to Nexus HandlerError", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + } + // Failure conversions are our fault so only set this after converting the Temporal failure to a HandlerError. + oc.setFailureSource(commonnexus.FailureSourceWorker) + return nil, he + case *matchingservice.DispatchNexusTaskResponse_HandlerError: + // Deprecated case. 
Replaced with DispatchNexusTaskResponse_Failure oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:" + t.HandlerError.GetErrorType())) - - oc.nexusContext.setFailureSource(commonnexus.FailureSourceWorker) - - err := h.convertOutcomeToNexusHandlerError(t) + oc.setFailureSource(commonnexus.FailureSourceWorker) + err := convertOutcomeToNexusHandlerError(t) return nil, err case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_timeout")) - oc.setFailureSource(commonnexus.FailureSourceWorker) - return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") case *matchingservice.DispatchNexusTaskResponse_Response: @@ -471,7 +487,6 @@ func (h *nexusHandler) StartOperation( case *nexuspb.StartOperationResponse_AsyncSuccess: oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("async_success")) - token := t.AsyncSuccess.GetOperationToken() if token == "" { token = t.AsyncSuccess.GetOperationId() @@ -484,9 +499,7 @@ func (h *nexusHandler) StartOperation( case *nexuspb.StartOperationResponse_OperationError: oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("operation_error")) - - oc.nexusContext.setFailureSource(commonnexus.FailureSourceWorker) - + oc.setFailureSource(commonnexus.FailureSourceWorker) err := &nexus.OperationError{ State: nexus.OperationState(t.OperationError.GetOperationState()), Cause: &nexus.FailureError{ @@ -494,12 +507,26 @@ func (h *nexusHandler) StartOperation( }, } return nil, err + + case *nexuspb.StartOperationResponse_Failure: + oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("failure")) + oc.setFailureSource(commonnexus.FailureSourceWorker) + nf, err := commonnexus.TemporalFailureToNexusFailure(t.Failure) + if err != nil { + oc.logger.Error("error converting Temporal failure to Nexus failure", tag.Error(err), tag.Operation(operation), 
tag.WorkflowNamespace(oc.namespaceName)) + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + } + oe, err := nexus.DefaultFailureConverter().FailureToError(nf) + if err != nil { + oc.logger.Error("error converting Nexus failure to Nexus OperationError", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + } + return nil, oe } } // This is the worker's fault. oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:EMPTY_OUTCOME")) - - oc.nexusContext.setFailureSource(commonnexus.FailureSourceWorker) + oc.setFailureSource(commonnexus.FailureSourceWorker) return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "empty outcome") } @@ -590,6 +617,9 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation, OperationId: token, }, }, + Capabilities: &nexuspb.Request_Capabilities{ + TemporalFailureResponses: true, + }, }) if err := oc.interceptRequest(ctx, request, options.Header); err != nil { var notActiveErr *serviceerror.NamespaceNotActive @@ -609,19 +639,32 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation, } // Convert to standard Nexus SDK response. 
switch t := response.GetOutcome().(type) { + case *matchingservice.DispatchNexusTaskResponse_Failure: + oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:" + t.Failure.GetNexusHandlerFailureInfo().GetType())) + oc.setFailureSource(commonnexus.FailureSourceWorker) + nf, err := commonnexus.TemporalFailureToNexusFailure(t.Failure) + if err != nil { + oc.logger.Error("error converting Temporal failure to Nexus failure", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) + return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + } + he, err := nexus.DefaultFailureConverter().FailureToError(nf) + if err != nil { + oc.logger.Error("error converting Nexus failure to Nexus HandlerError", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) + return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + } + // Failure conversions are our fault so only set this after converting the Temporal failure to a HandlerError. + return he + case *matchingservice.DispatchNexusTaskResponse_HandlerError: + // Deprecated case. Replaced with DispatchNexusTaskResponse_Failure oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:" + t.HandlerError.GetErrorType())) - oc.nexusContext.setFailureSource(commonnexus.FailureSourceWorker) - - err := h.convertOutcomeToNexusHandlerError(t) + err := convertOutcomeToNexusHandlerError(t) return err case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_timeout")) - oc.setFailureSource(commonnexus.FailureSourceWorker) - return nexus.HandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") case *matchingservice.DispatchNexusTaskResponse_Response: @@ -630,7 +673,6 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation, } // This is the worker's fault. 
oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:EMPTY_OUTCOME")) - oc.nexusContext.setFailureSource(commonnexus.FailureSourceWorker) return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "empty outcome") @@ -729,7 +771,7 @@ func (h *nexusHandler) nexusClientForActiveCluster(oc *operationContext, service }) } -func (h *nexusHandler) convertOutcomeToNexusHandlerError(resp *matchingservice.DispatchNexusTaskResponse_HandlerError) *nexus.HandlerError { +func convertOutcomeToNexusHandlerError(resp *matchingservice.DispatchNexusTaskResponse_HandlerError) *nexus.HandlerError { var retryBehavior nexus.HandlerErrorRetryBehavior // nolint:exhaustive // unspecified is the default switch resp.HandlerError.RetryBehavior { @@ -738,28 +780,13 @@ func (h *nexusHandler) convertOutcomeToNexusHandlerError(resp *matchingservice.D case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: retryBehavior = nexus.HandlerErrorRetryBehaviorNonRetryable } - handlerError := &nexus.HandlerError{ - Type: nexus.HandlerErrorType(resp.HandlerError.GetErrorType()), - Cause: &nexus.FailureError{ - Failure: commonnexus.ProtoFailureToNexusFailure(resp.HandlerError.GetFailure()), - }, - RetryBehavior: retryBehavior, - } - - switch handlerError.Type { - case nexus.HandlerErrorTypeUpstreamTimeout, - nexus.HandlerErrorTypeUnauthenticated, - nexus.HandlerErrorTypeUnauthorized, - nexus.HandlerErrorTypeBadRequest, - nexus.HandlerErrorTypeResourceExhausted, - nexus.HandlerErrorTypeNotFound, - nexus.HandlerErrorTypeNotImplemented, - nexus.HandlerErrorTypeUnavailable, - nexus.HandlerErrorTypeInternal: - return handlerError - default: - h.logger.Warn("received unknown or unset Nexus handler error type", tag.Value(handlerError.Type)) - return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + // nolint:staticcheck // Deprecated function still in use for backward compatibility. 
+ originalFailure := commonnexus.ProtoFailureToNexusFailure(resp.HandlerError.GetFailure()) + return &nexus.HandlerError{ + // nolint:staticcheck // Deprecated function still in use for backward compatibility. + Type: nexus.HandlerErrorType(resp.HandlerError.GetErrorType()), + RetryBehavior: retryBehavior, + OriginalFailure: &originalFailure, } } diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 9f51ad6bb1..9dc5410337 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -5425,8 +5425,13 @@ func (wh *WorkflowHandler) RespondNexusTaskCompleted(ctx context.Context, reques // doesn't go into workflow history, and the Nexus request caller is unknown, there doesn't seem like there's a // good reason to fail at this point. - if details := request.GetResponse().GetStartOperation().GetOperationError().GetFailure().GetDetails(); details != nil && !json.Valid(details) { - return nil, serviceerror.NewInvalidArgument("failure details must be JSON serializable") + if opErr := request.GetResponse().GetStartOperation().GetOperationError(); opErr != nil { + if details := opErr.GetFailure().GetDetails(); details != nil && !json.Valid(details) { + return nil, serviceerror.NewInvalidArgument("failure details must be JSON serializable") + } + } + if f := request.GetResponse().GetStartOperation().GetFailure(); f != nil && f.GetNexusSdkOperationFailureInfo() == nil { + return nil, serviceerror.NewInvalidArgument("request StartOperation Failure must contain failure with NexusSdkOperationFailureInfo") } matchingRequest := &matchingservice.RespondNexusTaskCompletedRequest{ @@ -5466,8 +5471,16 @@ func (wh *WorkflowHandler) RespondNexusTaskFailed(ctx context.Context, request * } namespaceId := namespace.ID(tt.GetNamespaceId()) - if details := request.GetError().GetFailure().GetDetails(); details != nil && !json.Valid(details) { - return nil, serviceerror.NewInvalidArgument("failure details must be JSON 
serializable") + if request.Error == nil && request.Failure == nil { // nolint:staticcheck // checking deprecated field for backwards compatibility + return nil, serviceerror.NewInvalidArgument("request must contain error or failure") + } + if request.GetError() != nil { // nolint:staticcheck // checking deprecated field for backwards compatibility + if details := request.GetError().GetFailure().GetDetails(); details != nil && !json.Valid(details) { // nolint:staticcheck // checking deprecated field for backwards compatibility + return nil, serviceerror.NewInvalidArgument("failure details must be JSON serializable") + } + } + if request.GetFailure() != nil && request.GetFailure().GetNexusHandlerFailureInfo() == nil { + return nil, serviceerror.NewInvalidArgument("request Failure must contain error or failure with NexusHandlerFailureInfo") } // NOTE: Not checking blob size limit here as we already enforce the 4 MB gRPC request limit and since this diff --git a/service/history/handler.go b/service/history/handler.go index 6982f6591d..e070b92d7d 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -2443,11 +2443,25 @@ func (h *Handler) CompleteNexusOperation(ctx context.Context, request *historyse } var opErr *nexus.OperationError if request.State != string(nexus.OperationStateSucceeded) { - opErr = &nexus.OperationError{ - State: nexus.OperationState(request.GetState()), - Cause: &nexus.FailureError{ - Failure: commonnexus.ProtoFailureToNexusFailure(request.GetFailure()), - }, + failure := commonnexus.ProtoFailureToNexusFailure(request.GetFailure()) + recvdErr, err := nexus.DefaultFailureConverter().FailureToError(failure) + if err != nil { + return nil, serviceerror.NewInvalidArgument("unable to convert failure to error") + } + // Backward compatibility: if the received error is not of type OperationError, wrap the error in OperationError. 
+ var ok bool + if opErr, ok = recvdErr.(*nexus.OperationError); !ok { + opErr = &nexus.OperationError{ + State: nexus.OperationState(request.GetState()), + // Setting a message here will bypass the Nexus SDK's failure converter backward compatibility logic. + Message: "nexus operation completed unsuccessfully", + Cause: recvdErr, + } + origFailure, err := nexus.DefaultFailureConverter().ErrorToFailure(opErr) + if err != nil { + return nil, serviceerror.NewInvalidArgument("unable to convert operation error to failure") + } + opErr.OriginalFailure = &origFailure } } err = nexusoperations.CompletionHandler( diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 7896a61a06..83c356657e 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -777,19 +777,24 @@ func (ms *MutableStateImpl) GetNexusCompletion( } return completion, nil case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED: - f, err := commonnexus.APIFailureToNexusFailure(ce.GetWorkflowExecutionFailedEventAttributes().GetFailure()) + f, err := commonnexus.TemporalFailureToNexusFailure(ce.GetWorkflowExecutionFailedEventAttributes().GetFailure()) if err != nil { return nil, err } return nexusrpc.NewOperationCompletionUnsuccessful( - &nexus.OperationError{State: nexus.OperationStateFailed, Cause: &nexus.FailureError{Failure: f}}, + &nexus.OperationError{ + State: nexus.OperationStateFailed, + Cause: &nexus.FailureError{Failure: f}, + // Store the original failure to bypass the Nexus failure converter. 
+ OriginalFailure: &f, }, nexusrpc.OperationCompletionUnsuccessfulOptions{ StartTime: ms.executionState.GetStartTime().AsTime(), CloseTime: ce.GetEventTime().AsTime(), Links: []nexus.Link{startLink}, }) case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED: - f, err := commonnexus.APIFailureToNexusFailure(&failurepb.Failure{ + f, err := commonnexus.TemporalFailureToNexusFailure(&failurepb.Failure{ Message: "operation canceled", FailureInfo: &failurepb.Failure_CanceledFailureInfo{ CanceledFailureInfo: &failurepb.CanceledFailureInfo{ @@ -804,6 +809,8 @@ func (ms *MutableStateImpl) GetNexusCompletion( &nexus.OperationError{ State: nexus.OperationStateCanceled, Cause: &nexus.FailureError{Failure: f}, + // Store the original failure to bypass the Nexus failure converter. + OriginalFailure: &f, }, nexusrpc.OperationCompletionUnsuccessfulOptions{ StartTime: ms.executionState.GetStartTime().AsTime(), @@ -811,7 +818,7 @@ func (ms *MutableStateImpl) GetNexusCompletion( Links: []nexus.Link{startLink}, }) case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED: - f, err := commonnexus.APIFailureToNexusFailure(&failurepb.Failure{ + f, err := commonnexus.TemporalFailureToNexusFailure(&failurepb.Failure{ Message: "operation terminated", FailureInfo: &failurepb.Failure_TerminatedFailureInfo{ TerminatedFailureInfo: &failurepb.TerminatedFailureInfo{}, @@ -821,14 +828,20 @@ func (ms *MutableStateImpl) GetNexusCompletion( return nil, err } return nexusrpc.NewOperationCompletionUnsuccessful( - &nexus.OperationError{State: nexus.OperationStateFailed, Cause: &nexus.FailureError{Failure: f}}, + // NOTE: Not setting a message for compatibility with older servers that don't support both cause and message. + &nexus.OperationError{ + State: nexus.OperationStateFailed, + Cause: &nexus.FailureError{Failure: f}, + // Store the original failure to bypass the Nexus failure converter. 
+ OriginalFailure: &f, + }, nexusrpc.OperationCompletionUnsuccessfulOptions{ StartTime: ms.executionState.GetStartTime().AsTime(), CloseTime: ce.GetEventTime().AsTime(), Links: []nexus.Link{startLink}, }) case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT: - f, err := commonnexus.APIFailureToNexusFailure(&failurepb.Failure{ + f, err := commonnexus.TemporalFailureToNexusFailure(&failurepb.Failure{ Message: "operation exceeded internal timeout", FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -844,6 +857,8 @@ func (ms *MutableStateImpl) GetNexusCompletion( &nexus.OperationError{ State: nexus.OperationStateFailed, Cause: &nexus.FailureError{Failure: f}, + // Store the original failure to bypass the Nexus failure converter. + OriginalFailure: &f, }, nexusrpc.OperationCompletionUnsuccessfulOptions{ StartTime: ms.executionState.GetStartTime().AsTime(), diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index 445fb5e8f9..17aea97f7c 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -2405,8 +2405,14 @@ func (e *matchingEngineImpl) DispatchNexusTask(ctx context.Context, request *mat return nil, result.internalError } if result.failedWorkerResponse != nil { - return &matchingservice.DispatchNexusTaskResponse{Outcome: &matchingservice.DispatchNexusTaskResponse_HandlerError{ - HandlerError: result.failedWorkerResponse.GetRequest().GetError(), + if result.failedWorkerResponse.GetRequest().GetError() != nil { // nolint:staticcheck // checking deprecated field for backwards compatibility + // Deprecated case. Kept for backwards-compatibility with older SDKs that are sending errors instead of failures. 
+ return &matchingservice.DispatchNexusTaskResponse{Outcome: &matchingservice.DispatchNexusTaskResponse_HandlerError{ + HandlerError: result.failedWorkerResponse.GetRequest().GetError(), // nolint:staticcheck // checking deprecated field for backwards compatibility + }}, nil + } + return &matchingservice.DispatchNexusTaskResponse{Outcome: &matchingservice.DispatchNexusTaskResponse_Failure{ + Failure: result.failedWorkerResponse.GetRequest().GetFailure(), }}, nil } diff --git a/tests/nexus_api_test.go b/tests/nexus_api_test.go index 5bca780c1e..8055b925be 100644 --- a/tests/nexus_api_test.go +++ b/tests/nexus_api_test.go @@ -203,6 +203,8 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes() { require.Equal(t, nexus.HandlerErrorTypeInternal, handlerErr.Type) require.Equal(t, nexus.HandlerErrorRetryBehaviorUnspecified, handlerErr.RetryBehavior) require.Equal(t, "worker", headers.Get("Temporal-Nexus-Failure-Source")) + require.Equal(t, "Internal Server Error", handlerErr.Message) + require.Error(t, handlerErr.Cause) require.Equal(t, "deliberate internal failure", handlerErr.Cause.Error()) }, }, @@ -223,6 +225,8 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes() { require.Equal(t, nexus.HandlerErrorTypeInternal, handlerErr.Type) require.Equal(t, nexus.HandlerErrorRetryBehaviorNonRetryable, handlerErr.RetryBehavior) require.Equal(t, "worker", headers.Get("Temporal-Nexus-Failure-Source")) + require.Equal(t, "Internal Server Error", handlerErr.Message) + require.Error(t, handlerErr.Cause) require.Equal(t, "deliberate internal failure", handlerErr.Cause.Error()) }, }, @@ -248,7 +252,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes() { var handlerErr *nexus.HandlerError require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeUpstreamTimeout, handlerErr.Type) - require.Equal(t, "upstream timeout", handlerErr.Cause.Error()) + require.Equal(t, "upstream timeout", handlerErr.Message) }, }, } @@ -343,7 +347,7 @@ func 
(s *NexusApiTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_Na var handlerError *nexus.HandlerError s.ErrorAs(err, &handlerError) s.Equal(nexus.HandlerErrorTypeNotFound, handlerError.Type) - s.Equal(fmt.Sprintf("namespace not found: %q", namespace), handlerError.Cause.Error()) + s.Equal(fmt.Sprintf("namespace not found: %q", namespace), handlerError.Message) snap := capture.Snapshot() @@ -406,7 +410,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Forbidden() { }, checkFailure: func(t *testing.T, handlerErr *nexus.HandlerError) { require.Equal(t, nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) - require.Equal(t, "permission denied: unauthorized in test", handlerErr.Cause.Error()) + require.Equal(t, "permission denied: unauthorized in test", handlerErr.Message) }, expectedOutcomeMetric: "unauthorized", exposeAuthorizerErrors: false, @@ -427,7 +431,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Forbidden() { }, checkFailure: func(t *testing.T, handlerErr *nexus.HandlerError) { require.Equal(t, nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) - require.Equal(t, "permission denied", handlerErr.Cause.Error()) + require.Equal(t, "permission denied", handlerErr.Message) }, expectedOutcomeMetric: "unauthorized", exposeAuthorizerErrors: false, @@ -448,7 +452,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Forbidden() { }, checkFailure: func(t *testing.T, handlerErr *nexus.HandlerError) { require.Equal(t, nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) - require.Equal(t, "permission denied", handlerErr.Cause.Error()) + require.Equal(t, "permission denied", handlerErr.Message) }, expectedOutcomeMetric: "unauthorized", exposeAuthorizerErrors: false, @@ -469,7 +473,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Forbidden() { }, checkFailure: func(t *testing.T, handlerErr *nexus.HandlerError) { require.Equal(t, nexus.HandlerErrorTypeUnavailable, handlerErr.Type) - require.Equal(t, "exposed error", 
handlerErr.Cause.Error()) + require.Equal(t, "exposed error", handlerErr.Message) }, expectedOutcomeMetric: "internal_auth_error", exposeAuthorizerErrors: true, @@ -536,7 +540,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims() { var handlerErr *nexus.HandlerError require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) - require.Equal(t, "permission denied", handlerErr.Cause.Error()) + require.Equal(t, "permission denied", handlerErr.Message) require.Equal(t, 0, len(snap["nexus_request_preprocess_errors"])) }, }, @@ -549,7 +553,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims() { var handlerErr *nexus.HandlerError require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeUnauthenticated, handlerErr.Type) - require.Equal(t, "unauthorized", handlerErr.Cause.Error()) + require.Equal(t, "Unauthorized", handlerErr.Message) require.Equal(t, 1, len(snap["nexus_request_preprocess_errors"])) }, }, @@ -662,7 +666,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_PayloadSizeLimit() { var handlerErr *nexus.HandlerError require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeBadRequest, handlerErr.Type) - require.Equal(t, "input exceeds size limit", handlerErr.Cause.Error()) + require.Equal(t, "input exceeds size limit", handlerErr.Message) } s.T().Run("ByNamespaceAndTaskQueue", func(t *testing.T) { @@ -722,6 +726,8 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes() { require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeInternal, handlerErr.Type) require.Equal(t, "worker", headers.Get("Temporal-Nexus-Failure-Source")) + require.Equal(t, "Internal Server Error", handlerErr.Message) + require.Error(t, handlerErr.Cause) require.Equal(t, "deliberate internal failure", handlerErr.Cause.Error()) }, }, @@ -741,7 +747,7 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes() { var handlerErr *nexus.HandlerError 
require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeUpstreamTimeout, handlerErr.Type) - require.Equal(t, "upstream timeout", handlerErr.Cause.Error()) + require.Equal(t, "upstream timeout", handlerErr.Message) }, }, } diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 7f1d8980ec..519c206ed4 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -3,7 +3,6 @@ package tests import ( "bytes" "context" - "encoding/json" "errors" "fmt" "io" @@ -1211,9 +1210,11 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncFailure() { var result string err = run.Get(ctx, &result) var wee *temporal.WorkflowExecutionError - s.ErrorAs(err, &wee) - s.True(strings.HasPrefix(wee.Unwrap().Error(), "nexus operation completed unsuccessfully")) + + var noe *temporal.NexusOperationError + s.ErrorAs(wee, &noe) + s.Contains(noe.Error(), "test operation failed") } func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors() { @@ -2133,9 +2134,7 @@ func (s *NexusWorkflowTestSuite) TestNexusSyncOperationErrorRehydration() { checkWorkflowError: func(t *testing.T, wfErr error) { var opErr *temporal.NexusOperationError require.ErrorAs(t, wfErr, &opErr) - var appErr *temporal.ApplicationError - require.ErrorAs(t, opErr, &appErr) - require.Equal(t, "some error", appErr.Message()) + require.Equal(t, "some error", opErr.Message) }, }, { @@ -2503,15 +2502,18 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationSyncNexusFailure() { var handlerErr *nexus.HandlerError s.ErrorAs(wfErr, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) + // Old SDK path var appErr *temporal.ApplicationError s.ErrorAs(handlerErr.Cause, &appErr) s.Equal(appErr.Message(), "fail me") - var failure nexus.Failure - s.NoError(appErr.Details(&failure)) - s.Equal(map[string]string{"key": "val"}, failure.Metadata) - var details string - s.NoError(json.Unmarshal(failure.Details, &details)) - s.Equal("details", details) 
+ // NOTE: We broke compatibility here but the likelihood of anyone using FailureErrors directly is practically zero. + // TODO(bergundy): test when the new SDK supports deserializing Nexus SDK failures + // var failure nexus.Failure + // s.NoError(appErr.Details(&failure)) + // s.Equal(map[string]string{"key": "val"}, failure.Metadata) + // var details string + // s.NoError(json.Unmarshal(failure.Details, &details)) + // s.Equal("details", details) snap := capture.Snapshot() s.Len(snap["nexus_outbound_requests"], 1) diff --git a/tests/xdc/nexus_request_forwarding_test.go b/tests/xdc/nexus_request_forwarding_test.go index de8670a14c..31da7c9a59 100644 --- a/tests/xdc/nexus_request_forwarding_test.go +++ b/tests/xdc/nexus_request_forwarding_test.go @@ -191,6 +191,8 @@ func (s *NexusRequestForwardingSuite) TestStartOperationForwardedFromStandbyToAc var handlerErr *nexus.HandlerError require.ErrorAs(t, retErr, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeInternal, handlerErr.Type) + require.Equal(t, "Internal Server Error", handlerErr.Message) + require.Error(t, handlerErr.Cause) require.Equal(t, "deliberate internal failure", handlerErr.Cause.Error()) requireExpectedMetricsCaptured(t, activeSnap, ns, "StartNexusOperation", "handler_error:INTERNAL") requireExpectedMetricsCaptured(t, passiveSnap, ns, "StartNexusOperation", "forwarded_request_error") @@ -211,7 +213,7 @@ func (s *NexusRequestForwardingSuite) TestStartOperationForwardedFromStandbyToAc var handlerErr *nexus.HandlerError require.ErrorAs(t, retErr, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeUnavailable, handlerErr.Type) - require.Equal(t, "cluster inactive", handlerErr.Cause.Error()) + require.Equal(t, "cluster inactive", handlerErr.Message) requireExpectedMetricsCaptured(t, passiveSnap, ns, "StartNexusOperation", "namespace_inactive_forwarding_disabled") }, }, @@ -315,6 +317,8 @@ func (s *NexusRequestForwardingSuite) TestCancelOperationForwardedFromStandbyToA var handlerErr 
*nexus.HandlerError require.ErrorAs(t, retErr, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeInternal, handlerErr.Type) + require.Equal(t, "Internal Server Error", handlerErr.Message) + require.Error(t, handlerErr.Cause) require.Equal(t, "deliberate internal failure", handlerErr.Cause.Error()) requireExpectedMetricsCaptured(t, activeSnap, ns, "CancelNexusOperation", "handler_error:INTERNAL") requireExpectedMetricsCaptured(t, passiveSnap, ns, "CancelNexusOperation", "forwarded_request_error") @@ -335,7 +339,7 @@ func (s *NexusRequestForwardingSuite) TestCancelOperationForwardedFromStandbyToA var handlerErr *nexus.HandlerError require.ErrorAs(t, retErr, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeUnavailable, handlerErr.Type) - require.Equal(t, "cluster inactive", handlerErr.Cause.Error()) + require.Equal(t, "cluster inactive", handlerErr.Message) requireExpectedMetricsCaptured(t, passiveSnap, ns, "CancelNexusOperation", "namespace_inactive_forwarding_disabled") }, },