diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java new file mode 100644 index 000000000..1bda80e85 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.activity.ActivityExecutionContext; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatUtil { + private final long period; + private final long initialDelay; + private final TimeUnit periodTimeUnit; + private final ScheduledExecutorService timerService = + Executors.newSingleThreadScheduledExecutor(); + private final ActivityExecutionContext context; + private final Object details; + private String heartbeaterId; + + public AutoHeartbeatUtil( + long period, + long initialDelay, + TimeUnit periodTimeUnit, + ActivityExecutionContext context, + Object details) { + this.period = period; + this.initialDelay = initialDelay; + this.periodTimeUnit = periodTimeUnit; + this.context = context; + this.details = details; + // Set to activity id better, for sample we just use type + heartbeaterId = context.getInfo().getActivityType(); + } + + public ScheduledFuture start() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] starting..."); + return timerService.scheduleAtFixedRate( + () -> { + try { + System.out.println( + "Autoheartbeater[" + + heartbeaterId + + "]" + + "heartbeating at: " + + printShortCurrentTime()); + context.heartbeat(details); + } catch (Exception e) { + System.out.println( + "Stopping Autoheartbeater[" + heartbeaterId + "]: " + e.getMessage()); + stop(); + } + }, + initialDelay, + period, + periodTimeUnit); + } + + public void stop() { + System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop."); + // Try not to execute another heartbeat that could have been queued up + timerService.shutdownNow(); + } + + private String printShortCurrentTime() { + return DateTimeFormatter.ofPattern("HH:mm:ss") + .withZone(ZoneId.systemDefault()) + .format(Instant.now()); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md new file mode 100644 index 000000000..eeebd93b2 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md @@ -0,0 +1,15 @@ +# Auto-heartbeating sample for activities that define HeartbeatTimeout + +This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor +to all activities where you define HeartbeatTimeout. Use case where this can be helpful include +situations where you have long-running activities where you want to heartbeat but its difficult +to explicitly call heartbeat api in activity code directly. + +1. Start the Sample: +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter +``` + +The sample workflow starts three activities async, two of which define heartbeat timeout. +Activity interceptor in this sample applies the auto-heartbeating util to the two that define heartbeat timeout +and auto-heartbeats at its HeartbeatTimeout - 1s intervals. \ No newline at end of file diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java new file mode 100644 index 000000000..a488bcb6f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; +import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; + +public class Starter { + static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; + static final String WORKFLOW_ID = "AutoHeartbeatWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); + worker.registerActivitiesImplementations(new AutoActivitiesImpl()); + + factory.start(); + + AutoWorkflow workflow = + client.newWorkflowStub( + AutoWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + try { + String result = workflow.exec("Auto heartbeating is cool"); + System.out.println("Result: " + result); + } catch (Exception e) { + System.out.println("Workflow exec exception: " + e.getClass().getName()); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java new file mode 100644 index 000000000..1ea9a52ae --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.activities; + +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface AutoActivities { + String runActivityOne(String input); + + String runActivityTwo(String input); + + String runActivityThree(String input); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java new file mode 100644 index 000000000..10883bca7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.activities; + +import io.temporal.client.ActivityCompletionException; +import java.util.concurrent.TimeUnit; + +public class AutoActivitiesImpl implements AutoActivities { + + @Override + public String runActivityOne(String input) { + return runActivity("runActivityOne - " + input, 20); + } + + @Override + public String runActivityTwo(String input) { + return runActivity("runActivityTwo - " + input, 10); + } + + @Override + public String runActivityThree(String input) { + return runActivity("runActivityThree - " + input, 3); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private String runActivity(String input, int seconds) { + for (int i = 0; i < seconds; i++) { + try { + sleep(1); + } catch (ActivityCompletionException e) { + System.out.println( + "Activity type:" + + e.getActivityType().get() + + "Activiy id: " + + e.getActivityId().get() + + "Workflow id: " + + e.getWorkflowId().get() + + "Workflow runid: " + + e.getRunId().get() + + " was canceled. Shutting down auto heartbeats"); + // We want to rethrow the cancel failure + throw e; + } + } + return "Activity completed: " + input; + } + + private void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException ee) { + // Empty + } + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java new file mode 100644 index 000000000..036f09b86 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase; +import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatActivityInboundCallsInterceptor + extends ActivityInboundCallsInterceptorBase { + private ActivityExecutionContext activityExecutionContext; + private Duration activityHeartbeatTimeout; + + public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) { + super(next); + } + + @Override + public void init(ActivityExecutionContext context) { + this.activityExecutionContext = context; + activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout(); + super.init(context); + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public ActivityOutput execute(ActivityInput input) { + // If activity has heartbeat timeout defined we want to apply auto-heartbeter + AutoHeartbeatUtil autoHearbeater = null; + if (activityHeartbeatTimeout != null && activityHeartbeatTimeout.getSeconds() > 0) { + System.out.println( + "Auto heartbeating applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + autoHearbeater = + new AutoHeartbeatUtil( + getHeartbeatPeriod(activityHeartbeatTimeout), + 0, + TimeUnit.SECONDS, + activityExecutionContext, + input); + autoHearbeater.start(); + } else { + System.out.println( + "Auto heartbeating not being applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + } + + try { + return super.execute(input); + } catch (Exception e) { + throw e; + } finally { + if (autoHearbeater != null) { + autoHearbeater.stop(); + } + } + } + + private long getHeartbeatPeriod(Duration activityHeartbeatTimeout) { + // For sample we want to heartbeat 1 seconds less than heartbeat timeout + return activityHeartbeatTimeout.getSeconds() <= 1 + ? 1 + : activityHeartbeatTimeout.getSeconds() - 1; + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java new file mode 100644 index 000000000..9816fe644 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; + +public class AutoHeartbeatWorkerInterceptor extends WorkerInterceptorBase { + @Override + public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { + return new AutoHeartbeatActivityInboundCallsInterceptor(next); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java new file mode 100644 index 000000000..e8816c6a2 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.workflows; + +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface AutoWorkflow { + @WorkflowMethod + String exec(String input); + + @SignalMethod + void cancelActivity(); +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java new file mode 100644 index 000000000..52337ca41 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.workflows; + +import io.temporal.activity.ActivityOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.samples.autoheartbeat.activities.AutoActivities; +import io.temporal.workflow.Async; +import io.temporal.workflow.CancellationScope; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +public class AutoWorkflowImpl implements AutoWorkflow { + private CancellationScope scope; + + @Override + public String exec(String input) { + // Crete separate workflow stubs for same interface so we can show + // use of different heartbeat timeouts and activit that does not heartbeat + // Note you can do this also via WorkflowImplementationOptions instead of using different + // activity stubs if you wanted + AutoActivities activitiesOne = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(22)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .build()); + + AutoActivities activitiesTwo = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(12)) + .setHeartbeatTimeout(Duration.ofSeconds(3)) + .build()); + + // Activity three does not heartbeat so autoheartbeat should not be applied to it + AutoActivities activitiesThree = + Workflow.newActivityStub( + AutoActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build()); + + // Start our activity in CancellationScope so we can cancel it if needed + List> activityPromises = new ArrayList<>(); + scope = + Workflow.newCancellationScope( + () -> { + activityPromises.add(Async.function(activitiesOne::runActivityOne, input)); + activityPromises.add(Async.function(activitiesTwo::runActivityTwo, input)); + activityPromises.add(Async.function(activitiesThree::runActivityThree, input)); + }); + + scope.run(); + + try { + Promise.allOf(activityPromises).get(); + String result = ""; + for (Promise pr : activityPromises) { + result += pr.get(); + } + return result; + } catch (ActivityFailure e) { + if (e.getCause() instanceof CanceledFailure) { + // We dont want workflow to fail in we canceled our scope, just log and return + return "Workflow result after activity cancellation"; + } else { + // We want to fail execution on any other failures except cancellation + throw e; + } + } + } + + @Override + public void cancelActivity() { + scope.cancel("Canceling scope from signal handler"); + } +}