diff --git a/README.md b/README.md index b30743697..d3808da94 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,14 @@ See the README.md file in each main sample directory for cut/paste Gradle comman - [**AWS Encryption SDK**](/core/src/main/java/io/temporal/samples/keymanagementencryption/awsencryptionsdk): Demonstrates how to use the AWS Encryption SDK to encrypt and decrypt payloads with AWS KMS. +#### Nexus Samples + +- [**Getting Started**](/core/src/main/java/io/temporal/samples/nexus): Demonstrates how to get started with Temporal and Nexus. + +- [**Cancellation**](/core/src/main/java/io/temporal/samples/nexuscancellation): Demonstrates how to cancel an async Nexus operation. + +- [**Context/Header Propagation**](/core/src/main/java/io/temporal/samples/nexuscontextpropagation): Demonstrates how to propagate context through Nexus operation headers. + ### Running SpringBoot Samples diff --git a/core/src/main/java/io/temporal/samples/nexus/options/ClientOptions.java b/core/src/main/java/io/temporal/samples/nexus/options/ClientOptions.java index 7ba699309..57465d8a5 100644 --- a/core/src/main/java/io/temporal/samples/nexus/options/ClientOptions.java +++ b/core/src/main/java/io/temporal/samples/nexus/options/ClientOptions.java @@ -34,7 +34,13 @@ import org.apache.commons.cli.*; public class ClientOptions { + public static WorkflowClient getWorkflowClient(String[] args) { + return getWorkflowClient(args, WorkflowClientOptions.newBuilder()); + } + + public static WorkflowClient getWorkflowClient( + String[] args, WorkflowClientOptions.Builder clientOptions) { Options options = new Options(); Option targetHostOption = new Option("target-host", true, "Host:port for the Temporal service"); targetHostOption.setRequired(false); @@ -149,7 +155,6 @@ public static WorkflowClient getWorkflowClient(String[] args) { WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(serviceStubOptionsBuilder.build()); - return WorkflowClient.newInstance( - service, WorkflowClientOptions.newBuilder().setNamespace(namespace).build()); + return WorkflowClient.newInstance(service, clientOptions.setNamespace(namespace).build()); } } diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/README.MD b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/README.MD new file mode 100644 index 000000000..1f1800a3a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/README.MD @@ -0,0 +1,45 @@ +# Nexus Context Propagation + +This sample shows how to propagate MDC context values from Workflows to Nexus operations. +Nexus does not support `ContextPropagator` since the header format is not compatible. Users should look at `NexusMDCContextInterceptor` for propagating MDC context values. + +From more details on Nexus and how to setup to run this samples please see the [Nexus Sample](../nexus/README.MD). + +In separate terminal windows: + +### Nexus handler worker + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexuscontextpropagation.handler.HandlerWorker \ + --args="-target-host localhost:7233 -namespace my-target-namespace" +``` + +### Nexus caller worker + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexuscontextpropagation.caller.CallerWorker \ + --args="-target-host localhost:7233 -namespace my-caller-namespace" +``` + +### Start caller workflow + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexuscontextpropagation.caller.CallerStarter \ + --args="-target-host localhost:7233 -namespace my-caller-namespace" +``` + +### Output + +which should result in this on the caller side: +``` +INFO i.t.s.n.caller.CallerStarter - Started EchoCallerWorkflow workflowId: 7ac97cb9-b457-4052-af94-d82478c35c5e runId: 01954eb9-6963-7b52-9a1d-b74e64643846 +INFO i.t.s.n.caller.CallerStarter - Workflow result: Nexus Echo πŸ‘‹ +INFO i.t.s.n.caller.CallerStarter - Started HelloCallerWorkflow workflowId: 9e0bc89c-5709-4742-b7c0-868464c2fccf runId: 01954eb9-6ae3-7d6d-b355-71545688309d +INFO i.t.s.n.caller.CallerStarter - Workflow result: Hello Nexus πŸ‘‹ +``` + +And this on the handler side: +``` +INFO i.t.s.n.handler.NexusServiceImpl - Echo called from a workflow with ID : 7ac97cb9-b457-4052-af94-d82478c35c5e +INFO i.t.s.n.h.HelloHandlerWorkflowImpl - HelloHandlerWorkflow called from a workflow with ID : 9e0bc89c-5709-4742-b7c0-868464c2fccf +``` diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/CallerStarter.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/CallerStarter.java new file mode 100644 index 000000000..823c3ccea --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/CallerStarter.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.caller; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.nexus.caller.EchoCallerWorkflow; +import io.temporal.samples.nexus.caller.HelloCallerWorkflow; +import io.temporal.samples.nexus.options.ClientOptions; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.samples.nexuscontextpropagation.propogation.MDCContextPropagator; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerStarter { + private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class); + + public static void main(String[] args) { + WorkflowClient client = + ClientOptions.getWorkflowClient( + args, + WorkflowClientOptions.newBuilder() + .setContextPropagators(Collections.singletonList(new MDCContextPropagator()))); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder().setTaskQueue(CallerWorker.DEFAULT_TASK_QUEUE_NAME).build(); + EchoCallerWorkflow echoWorkflow = + client.newWorkflowStub(EchoCallerWorkflow.class, workflowOptions); + WorkflowExecution execution = WorkflowClient.start(echoWorkflow::echo, "Nexus Echo πŸ‘‹"); + logger.info( + "Started EchoCallerWorkflow workflowId: {} runId: {}", + execution.getWorkflowId(), + execution.getRunId()); + logger.info("Workflow result: {}", echoWorkflow.echo("Nexus Echo πŸ‘‹")); + HelloCallerWorkflow helloWorkflow = + client.newWorkflowStub(HelloCallerWorkflow.class, workflowOptions); + execution = WorkflowClient.start(helloWorkflow::hello, "Nexus", NexusService.Language.EN); + logger.info( + "Started HelloCallerWorkflow workflowId: {} runId: {}", + execution.getWorkflowId(), + execution.getRunId()); + logger.info("Workflow result: {}", helloWorkflow.hello("Nexus", NexusService.Language.ES)); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/CallerWorker.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/CallerWorker.java new file mode 100644 index 000000000..387ea01be --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/CallerWorker.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.nexus.options.ClientOptions; +import io.temporal.samples.nexuscontextpropagation.propogation.NexusMDCContextInterceptor; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import java.util.Collections; + +public class CallerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "my-caller-workflow-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkerFactory factory = + WorkerFactory.newInstance( + client, + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new NexusMDCContextInterceptor()) + .build()); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + Collections.singletonMap( + "NexusService", + NexusServiceOptions.newBuilder().setEndpoint("my-nexus-endpoint-name").build())) + .build(), + EchoCallerWorkflowImpl.class, + HelloCallerWorkflowImpl.class); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/EchoCallerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/EchoCallerWorkflowImpl.java new file mode 100644 index 000000000..23cb2ce49 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/EchoCallerWorkflowImpl.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.caller; + +import io.temporal.samples.nexus.caller.EchoCallerWorkflow; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import org.slf4j.MDC; + +public class EchoCallerWorkflowImpl implements EchoCallerWorkflow { + NexusService nexusService = + Workflow.newNexusServiceStub( + NexusService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public String echo(String message) { + MDC.put("x-nexus-caller-workflow-id", Workflow.getInfo().getWorkflowId()); + return nexusService.echo(new NexusService.EchoInput(message)).getMessage(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/HelloCallerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/HelloCallerWorkflowImpl.java new file mode 100644 index 000000000..0fec8c7d4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/caller/HelloCallerWorkflowImpl.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.caller; + +import io.temporal.samples.nexus.caller.HelloCallerWorkflow; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.workflow.NexusOperationHandle; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import org.slf4j.MDC; + +public class HelloCallerWorkflowImpl implements HelloCallerWorkflow { + NexusService nexusService = + Workflow.newNexusServiceStub( + NexusService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public String hello(String message, NexusService.Language language) { + MDC.put("x-nexus-caller-workflow-id", Workflow.getInfo().getWorkflowId()); + NexusOperationHandle handle = + Workflow.startNexusOperation( + nexusService::hello, new NexusService.HelloInput(message, language)); + // Optionally wait for the operation to be started. NexusOperationExecution will contain the + // operation token in case this operation is asynchronous. + handle.getExecution().get(); + return handle.getResult().get().getMessage(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/HandlerWorker.java new file mode 100644 index 000000000..d2536bc41 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/HandlerWorker.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.samples.nexus.options.ClientOptions; +import io.temporal.samples.nexuscontextpropagation.propogation.MDCContextPropagator; +import io.temporal.samples.nexuscontextpropagation.propogation.NexusMDCContextInterceptor; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; +import java.util.Collections; + +public class HandlerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "my-handler-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = + ClientOptions.getWorkflowClient( + args, + WorkflowClientOptions.newBuilder() + .setContextPropagators(Collections.singletonList(new MDCContextPropagator()))); + + WorkerFactory factory = + WorkerFactory.newInstance( + client, + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new NexusMDCContextInterceptor()) + .build()); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes(HelloHandlerWorkflowImpl.class); + worker.registerNexusServiceImplementation(new NexusServiceImpl()); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/HelloHandlerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/HelloHandlerWorkflowImpl.java new file mode 100644 index 000000000..389368b50 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/HelloHandlerWorkflowImpl.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.handler; + +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus.handler.HelloHandlerWorkflow; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.workflow.Workflow; +import org.slf4j.Logger; +import org.slf4j.MDC; + +public class HelloHandlerWorkflowImpl implements HelloHandlerWorkflow { + public static final Logger log = Workflow.getLogger(HelloHandlerWorkflowImpl.class); + + @Override + public NexusService.HelloOutput hello(NexusService.HelloInput input) { + if (MDC.get("x-nexus-caller-workflow-id") != null) { + log.info( + "HelloHandlerWorkflow called from a workflow with ID : {}", + MDC.get("x-nexus-caller-workflow-id")); + } + switch (input.getLanguage()) { + case EN: + return new NexusService.HelloOutput("Hello " + input.getName() + " πŸ‘‹"); + case FR: + return new NexusService.HelloOutput("Bonjour " + input.getName() + " πŸ‘‹"); + case DE: + return new NexusService.HelloOutput("Hallo " + input.getName() + " πŸ‘‹"); + case ES: + return new NexusService.HelloOutput("Β‘Hola! " + input.getName() + " πŸ‘‹"); + case TR: + return new NexusService.HelloOutput("Merhaba " + input.getName() + " πŸ‘‹"); + } + throw ApplicationFailure.newFailure( + "Unsupported language: " + input.getLanguage(), "UNSUPPORTED_LANGUAGE"); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/NexusServiceImpl.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/NexusServiceImpl.java new file mode 100644 index 000000000..cad37b9ed --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/handler/NexusServiceImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.handler; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; +import io.temporal.nexus.WorkflowRunOperation; +import io.temporal.samples.nexus.handler.HelloHandlerWorkflow; +import io.temporal.samples.nexus.service.NexusService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +// To create a service implementation, annotate the class with @ServiceImpl and provide the +// interface that the service implements. The service implementation class should have methods that +// return OperationHandler that correspond to the operations defined in the service interface. +@ServiceImpl(service = NexusService.class) +public class NexusServiceImpl { + private static final Logger logger = LoggerFactory.getLogger(NexusServiceImpl.class); + + @OperationImpl + public OperationHandler echo() { + // OperationHandler.sync is a meant for exposing simple RPC handlers. + return OperationHandler.sync( + // The method is for making arbitrary short calls to other services or databases, or + // perform simple computations such as this one. Users can also access a workflow client by + // calling + // Nexus.getOperationContext().getWorkflowClient(ctx) to make arbitrary calls such as + // signaling, querying, or listing workflows. + (ctx, details, input) -> { + if (MDC.get("x-nexus-caller-workflow-id") != null) { + logger.info( + "Echo called from a workflow with ID : {}", MDC.get("x-nexus-caller-workflow-id")); + } + return new NexusService.EchoOutput(input.getMessage()); + }); + } + + @OperationImpl + public OperationHandler hello() { + // Use the WorkflowRunOperation.fromWorkflowMethod constructor, which is the easiest + // way to expose a workflow as an operation. + return WorkflowRunOperation.fromWorkflowMethod( + (ctx, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + HelloHandlerWorkflow.class, + // Workflow IDs should typically be business meaningful IDs and are used to + // dedupe workflow starts. + // For this example, we're using the request ID allocated by Temporal when + // the + // caller workflow schedules + // the operation, this ID is guaranteed to be stable across retries of this + // operation. + // + // Task queue defaults to the task queue this operation is handled on. + WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + ::hello); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/propogation/MDCContextPropagator.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/propogation/MDCContextPropagator.java new file mode 100644 index 000000000..db75395fd --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/propogation/MDCContextPropagator.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.propogation; + +import io.temporal.api.common.v1.Payload; +import io.temporal.common.context.ContextPropagator; +import io.temporal.common.converter.DataConverter; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.MDC; + +public class MDCContextPropagator implements ContextPropagator { + + @Override + public String getName() { + return this.getClass().getName(); + } + + @Override + public Object getCurrentContext() { + Map context = new HashMap<>(); + if (MDC.getCopyOfContextMap() == null) { + return context; + } + for (Map.Entry entry : MDC.getCopyOfContextMap().entrySet()) { + if (entry.getKey().startsWith("x-nexus-")) { + context.put(entry.getKey(), entry.getValue()); + } + } + return context; + } + + @Override + public void setCurrentContext(Object context) { + Map contextMap = (Map) context; + for (Map.Entry entry : contextMap.entrySet()) { + MDC.put(entry.getKey(), entry.getValue()); + } + } + + @Override + public Map serializeContext(Object context) { + Map contextMap = (Map) context; + Map serializedContext = new HashMap<>(); + for (Map.Entry entry : contextMap.entrySet()) { + serializedContext.put( + entry.getKey(), DataConverter.getDefaultInstance().toPayload(entry.getValue()).get()); + } + return serializedContext; + } + + @Override + public Object deserializeContext(Map context) { + Map contextMap = new HashMap<>(); + for (Map.Entry entry : context.entrySet()) { + contextMap.put( + entry.getKey(), + DataConverter.getDefaultInstance() + .fromPayload(entry.getValue(), String.class, String.class)); + } + return contextMap; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscontextpropagation/propogation/NexusMDCContextInterceptor.java b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/propogation/NexusMDCContextInterceptor.java new file mode 100644 index 000000000..944767e4b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscontextpropagation/propogation/NexusMDCContextInterceptor.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscontextpropagation.propogation; + +import io.nexusrpc.OperationException; +import io.nexusrpc.handler.OperationContext; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; +import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; +import java.util.Map; +import org.slf4j.MDC; + +/** + * Propagates MDC context from the caller workflow to the Nexus service through the operation + * headers. + */ +public class NexusMDCContextInterceptor extends WorkerInterceptorBase { + private static final String NEXUS_HEADER_PREFIX = "x-nexus-"; + + @Override + public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { + return new WorkflowInboundCallsInterceptorNexusMDC(next); + } + + public static class WorkflowInboundCallsInterceptorNexusMDC + extends io.temporal.common.interceptors.WorkflowInboundCallsInterceptorBase { + private final WorkflowInboundCallsInterceptor next; + + public WorkflowInboundCallsInterceptorNexusMDC(WorkflowInboundCallsInterceptor next) { + super(next); + this.next = next; + } + + @Override + public void init(WorkflowOutboundCallsInterceptor outboundCalls) { + next.init(new WorkflowOutboundCallsInterceptorNexusMDC(outboundCalls)); + } + } + + public static class WorkflowOutboundCallsInterceptorNexusMDC + extends io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase { + private final WorkflowOutboundCallsInterceptor next; + + public WorkflowOutboundCallsInterceptorNexusMDC(WorkflowOutboundCallsInterceptor next) { + super(next); + this.next = next; + } + + @Override + public ExecuteNexusOperationOutput executeNexusOperation( + ExecuteNexusOperationInput input) { + Map contextMap = MDC.getCopyOfContextMap(); + if (contextMap != null) { + Map headerMap = input.getHeaders(); + contextMap.forEach( + (k, v) -> { + if (k.startsWith(NEXUS_HEADER_PREFIX)) { + headerMap.put(k, v); + } + }); + } + return next.executeNexusOperation(input); + } + } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return new NexusOperationInboundCallsInterceptorNexusMDC(next); + } + + private static class NexusOperationInboundCallsInterceptorNexusMDC + extends io.temporal.common.interceptors.NexusOperationInboundCallsInterceptorBase { + private final NexusOperationInboundCallsInterceptor next; + + public NexusOperationInboundCallsInterceptorNexusMDC( + NexusOperationInboundCallsInterceptor next) { + super(next); + this.next = next; + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationException { + input + .getOperationContext() + .getHeaders() + .forEach( + (k, v) -> { + if (k.startsWith(NEXUS_HEADER_PREFIX)) { + MDC.put(k, v); + } + }); + return next.startOperation(input); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + input + .getOperationContext() + .getHeaders() + .forEach( + (k, v) -> { + if (k.startsWith(NEXUS_HEADER_PREFIX)) { + MDC.put(k, v); + } + }); + return next.cancelOperation(input); + } + } +}