Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 94 additions & 69 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion chasm/lib/callback/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c chasmInvocation) getHistoryRequest(
Completion: completion,
}
case *nexusrpc.OperationCompletionUnsuccessful:
apiFailure, err := commonnexus.NexusFailureToAPIFailure(op.Failure, true)
apiFailure, err := commonnexus.NexusFailureToTemporalFailure(op.Failure)
if err != nil {
return nil, fmt.Errorf("failed to convert failure type: %v", err)
}
Expand Down
315 changes: 223 additions & 92 deletions common/nexus/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package nexus

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -52,135 +55,263 @@ var failureTypeString = string((&failurepb.Failure{}).ProtoReflect().Descriptor(

// ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure.
func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure {
return nexus.Failure{
Message: failure.GetMessage(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
nf := nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
}
if failure.GetCause() != nil {
cause := ProtoFailureToNexusFailure(failure.GetCause())
nf.Cause = &cause
}
return nf
}

// NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure.
// Always returns a non-nil value.
func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure {
return &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
pf := &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
StackTrace: failure.StackTrace,
}
if failure.Cause != nil {
pf.Cause = NexusFailureToProtoFailure(*failure.Cause)
}
return pf
}

type serializedOperationError struct {
State string `json:"state,omitempty"`
// Bytes as base64 encoded string.
EncodedAttributes string `json:"encodedAttributes,omitempty"`
}

// APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message.
type serializedHandlerError struct {
Type string `json:"type,omitempty"`
RetryableOverride *bool `json:"retryableOverride,omitempty"`
// Bytes as base64 encoded string.
EncodedAttributes string `json:"encodedAttributes,omitempty"`
}

// TemporalFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message or the standard Nexus SDK failure types.
// Returns an error if the failure cannot be converted.
// Mutates the failure temporarily, unsetting the Message field to avoid duplicating the information in the serialized
// failure. Mutating was chosen over cloning for performance reasons since this function may be called frequently.
func APIFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
// Unset message so it's not serialized in the details.
func TemporalFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
var causep *nexus.Failure
if failure.GetCause() != nil {
var cause nexus.Failure
var err error
cause, err = TemporalFailureToNexusFailure(failure.GetCause())
if err != nil {
return nexus.Failure{}, err
}
causep = &cause
}

switch info := failure.GetFailureInfo().(type) {
case *failurepb.Failure_NexusSdkOperationFailureInfo:
var encodedAttributes string
if failure.EncodedAttributes != nil {
b, err := protojson.Marshal(failure.EncodedAttributes)
if err != nil {
return nexus.Failure{}, fmt.Errorf("failed to deserialize OperationError attributes: %w", err)
}
encodedAttributes = base64.StdEncoding.EncodeToString(b)
}
operationError := serializedOperationError{
State: info.NexusSdkOperationFailureInfo.GetState(),
EncodedAttributes: encodedAttributes,
}

details, err := json.Marshal(operationError)
if err != nil {
return nexus.Failure{}, err
}
return nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": "nexus.OperationError",
},
Details: details,
Cause: causep,
}, nil
case *failurepb.Failure_NexusHandlerFailureInfo:
var encodedAttributes string
if failure.EncodedAttributes != nil {
b, err := protojson.Marshal(failure.EncodedAttributes)
if err != nil {
return nexus.Failure{}, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err)
}
encodedAttributes = base64.StdEncoding.EncodeToString(b)
}
var retryableOverride *bool
switch info.NexusHandlerFailureInfo.GetRetryBehavior() {
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE:
val := true
retryableOverride = &val
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE:
val := false
retryableOverride = &val
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED:
// noop
default:
// noop
}

handlerError := serializedHandlerError{
Type: info.NexusHandlerFailureInfo.GetType(),
RetryableOverride: retryableOverride,
EncodedAttributes: encodedAttributes,
}

details, err := json.Marshal(handlerError)
if err != nil {
return nexus.Failure{}, err
}
return nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": "nexus.HandlerError",
},
Details: details,
Cause: causep,
}, nil
case *failurepb.Failure_NexusSdkFailureErrorInfo:
return nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: info.NexusSdkFailureErrorInfo.GetMetadata(),
Details: info.NexusSdkFailureErrorInfo.GetDetails(),
Cause: causep,
}, nil
}
// Unset message and stack trace so it's not serialized in the details.
var message string
message, failure.Message = failure.Message, ""
var stackTrace string
stackTrace, failure.StackTrace = failure.StackTrace, ""

data, err := protojson.Marshal(failure)
failure.Message = message

failure.StackTrace = stackTrace
if err != nil {
return nexus.Failure{}, err
}

return nexus.Failure{
Message: failure.GetMessage(),
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": failureTypeString,
},
Details: data,
Cause: causep,
}, nil
}

// NexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure.
// NexusFailureToTemporalFailure converts a Nexus Failure to an API proto Failure.
// If the failure metadata "type" field is set to the fullname of the temporal API Failure message, the failure is
// reconstructed using protojson.Unmarshal on the failure details field.
func NexusFailureToAPIFailure(failure nexus.Failure, retryable bool) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{}

if failure.Metadata != nil && failure.Metadata["type"] == failureTypeString {
if err := protojson.Unmarshal(failure.Details, apiFailure); err != nil {
return nil, err
}
} else {
payloads, err := nexusFailureMetadataToPayloads(failure)
if err != nil {
return nil, err
}
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
// Make up a type here, it's not part of the Nexus Failure spec.
Type: "NexusFailure",
Details: payloads,
NonRetryable: !retryable,
},
}
}
// Ensure this always gets written.
apiFailure.Message = failure.Message
return apiFailure, nil
}

func OperationErrorToTemporalFailure(opErr *nexus.OperationError) (*failurepb.Failure, error) {
var nexusFailure nexus.Failure
failureErr, ok := opErr.Cause.(*nexus.FailureError)
if ok {
nexusFailure = failureErr.Failure
} else if opErr.Cause != nil {
nexusFailure = nexus.Failure{Message: opErr.Cause.Error()}
// reconstructed using protojson.Unmarshal on the failure details field. Otherwise, the failure is reconstructed
// based on the known Nexus SDK failure types.
// Returns an error if the failure cannot be converted.
// nolint:revive // cognitive-complexity is high but justified to keep each case together
func NexusFailureToTemporalFailure(f nexus.Failure) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{
Message: f.Message,
StackTrace: f.StackTrace,
}

// Canceled must be translated into a CanceledFailure to match the SDK expectation.
if opErr.State == nexus.OperationStateCanceled {
if nexusFailure.Metadata != nil && nexusFailure.Metadata["type"] == failureTypeString {
temporalFailure, err := NexusFailureToAPIFailure(nexusFailure, false)
if err != nil {
if f.Metadata != nil {
switch f.Metadata["type"] {
case failureTypeString:
if err := protojson.Unmarshal(f.Details, apiFailure); err != nil {
return nil, err
}
if temporalFailure.GetCanceledFailureInfo() != nil {
// We already have a CanceledFailure, use it.
return temporalFailure, nil
// Restore these fields as they are not included in the marshalled failure.
apiFailure.Message = f.Message
apiFailure.StackTrace = f.StackTrace
case "nexus.OperationError":
var se serializedOperationError
err := json.Unmarshal(f.Details, &se)
if err != nil {
return nil, fmt.Errorf("failed to deserialize OperationError: %w", err)
}
// Fallback to encoding the Nexus failure into a Temporal canceled failure, we expect operations that end up
// as canceled to have a CanceledFailureInfo object.
}
payloads, err := nexusFailureMetadataToPayloads(nexusFailure)
if err != nil {
return nil, err
}
return &failurepb.Failure{
Message: nexusFailure.Message,
FailureInfo: &failurepb.Failure_CanceledFailureInfo{
CanceledFailureInfo: &failurepb.CanceledFailureInfo{
Details: payloads,
apiFailure.FailureInfo = &failurepb.Failure_NexusSdkOperationFailureInfo{
NexusSdkOperationFailureInfo: &failurepb.NexusSDKOperationFailureInfo{
State: se.State,
},
}
if len(se.EncodedAttributes) > 0 {
decoded, err := base64.StdEncoding.DecodeString(se.EncodedAttributes)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 OperationError attributes: %w", err)
}
apiFailure.EncodedAttributes = &commonpb.Payload{}
if err := protojson.Unmarshal(decoded, apiFailure.EncodedAttributes); err != nil {
return nil, fmt.Errorf("failed to deserialize OperationError attributes: %w", err)
}
}
case "nexus.HandlerError":
var se serializedHandlerError
err := json.Unmarshal(f.Details, &se)
if err != nil {
return nil, fmt.Errorf("failed to deserialize HandlerError: %w", err)
}
var retryBehavior enumspb.NexusHandlerErrorRetryBehavior
if se.RetryableOverride == nil {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED
} else if *se.RetryableOverride {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE
} else {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE
}
apiFailure.FailureInfo = &failurepb.Failure_NexusHandlerFailureInfo{
NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{
Type: se.Type,
RetryBehavior: retryBehavior,
},
}
if len(se.EncodedAttributes) > 0 {
decoded, err := base64.StdEncoding.DecodeString(se.EncodedAttributes)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 HandlerError attributes: %w", err)
}
apiFailure.EncodedAttributes = &commonpb.Payload{}
if err := protojson.Unmarshal(decoded, apiFailure.EncodedAttributes); err != nil {
return nil, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err)
}
}
default:
apiFailure.FailureInfo = &failurepb.Failure_NexusSdkFailureErrorInfo{
NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{
Metadata: f.Metadata,
Details: f.Details,
},
}
}
} else if len(f.Details) > 0 {
apiFailure.FailureInfo = &failurepb.Failure_NexusSdkFailureErrorInfo{
NexusSdkFailureErrorInfo: &failurepb.NexusSDKFailureErrorFailureInfo{
Details: f.Details,
},
}, nil
}
}

return NexusFailureToAPIFailure(nexusFailure, false)
}

func nexusFailureMetadataToPayloads(failure nexus.Failure) (*commonpb.Payloads, error) {
if len(failure.Metadata) == 0 && len(failure.Details) == 0 {
return nil, nil
}
// Delete before serializing.
failure.Message = ""
data, err := json.Marshal(failure)
if err != nil {
return nil, err
if f.Cause != nil {
var err error
apiFailure.Cause, err = NexusFailureToTemporalFailure(*f.Cause)
if err != nil {
return nil, err
}
}
return &commonpb.Payloads{
Payloads: []*commonpb.Payload{
{
Metadata: map[string][]byte{
"encoding": []byte("json/plain"),
},
Data: data,
},
},
}, err
return apiFailure, nil
}

// ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible.
Expand Down
Loading
Loading