Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;

import io.temporal.activity.ActivityExecutionContext;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class AutoHeartbeatUtil {
private final long period;
private final long initialDelay;
private final TimeUnit periodTimeUnit;
private final ScheduledExecutorService timerService =
Executors.newSingleThreadScheduledExecutor();
private final ActivityExecutionContext context;
private final Object details;
private String heartbeaterId;

public AutoHeartbeatUtil(
long period,
long initialDelay,
TimeUnit periodTimeUnit,
ActivityExecutionContext context,
Object details) {
this.period = period;
this.initialDelay = initialDelay;
this.periodTimeUnit = periodTimeUnit;
this.context = context;
this.details = details;
// Set to activity id better, for sample we just use type
heartbeaterId = context.getInfo().getActivityType();
}

public ScheduledFuture<?> start() {
System.out.println("Autoheartbeater[" + heartbeaterId + "] starting...");
return timerService.scheduleAtFixedRate(
() -> {
try {
System.out.println(
"Autoheartbeater["
+ heartbeaterId
+ "]"
+ "heartbeating at: "
+ printShortCurrentTime());
context.heartbeat(details);
} catch (Exception e) {
System.out.println(
"Stopping Autoheartbeater[" + heartbeaterId + "]: " + e.getMessage());
stop();
}
},
initialDelay,
period,
periodTimeUnit);
}

public void stop() {
System.out.println("Autoheartbeater[" + heartbeaterId + "] being requested to stop.");
// Try not to execute another heartbeat that could have been queued up
timerService.shutdownNow();
}

private String printShortCurrentTime() {
return DateTimeFormatter.ofPattern("HH:mm:ss")
.withZone(ZoneId.systemDefault())
.format(Instant.now());
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/io/temporal/samples/autoheartbeat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Auto-heartbeating sample for activities that define HeartbeatTimeout

This sample shows an implementation of an "auto-heartbeating" utility that can be applied via interceptor
to all activities where you define HeartbeatTimeout. Use case where this can be helpful include
situations where you have long-running activities where you want to heartbeat but its difficult
to explicitly call heartbeat api in activity code directly.

1. Start the Sample:
```bash
./gradlew -q execute -PmainClass=io.temporal.samples.autoheartbeat.Starter
```

The sample workflow starts three activities async, two of which define heartbeat timeout.
Activity interceptor in this sample applies the auto-heartbeating util to the two that define heartbeat timeout
and auto-heartbeats at its HeartbeatTimeout - 1s intervals.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl;
import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor;
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow;
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerFactoryOptions;

public class Starter {
static final String TASK_QUEUE = "AutoheartbeatTaskQueue";
static final String WORKFLOW_ID = "AutoHeartbeatWorkflow";

public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);

// Configure our auto heartbeat workflow interceptor which will apply
// AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat
// timeout configured
WorkerFactoryOptions wfo =
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor())
.build();

WorkerFactory factory = WorkerFactory.newInstance(client, wfo);
Worker worker = factory.newWorker(TASK_QUEUE);

worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class);
worker.registerActivitiesImplementations(new AutoActivitiesImpl());

factory.start();

AutoWorkflow workflow =
client.newWorkflowStub(
AutoWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE)
.build());

try {
String result = workflow.exec("Auto heartbeating is cool");
System.out.println("Result: " + result);
} catch (Exception e) {
System.out.println("Workflow exec exception: " + e.getClass().getName());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.activities;

import io.temporal.activity.ActivityInterface;

@ActivityInterface
public interface AutoActivities {
String runActivityOne(String input);

String runActivityTwo(String input);

String runActivityThree(String input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.activities;

import io.temporal.client.ActivityCompletionException;
import java.util.concurrent.TimeUnit;

public class AutoActivitiesImpl implements AutoActivities {

@Override
public String runActivityOne(String input) {
return runActivity("runActivityOne - " + input, 20);
}

@Override
public String runActivityTwo(String input) {
return runActivity("runActivityTwo - " + input, 10);
}

@Override
public String runActivityThree(String input) {
return runActivity("runActivityThree - " + input, 3);
}

@SuppressWarnings("FutureReturnValueIgnored")
private String runActivity(String input, int seconds) {
for (int i = 0; i < seconds; i++) {
try {
sleep(1);
} catch (ActivityCompletionException e) {
System.out.println(
"Activity type:"
+ e.getActivityType().get()
+ "Activiy id: "
+ e.getActivityId().get()
+ "Workflow id: "
+ e.getWorkflowId().get()
+ "Workflow runid: "
+ e.getRunId().get()
+ " was canceled. Shutting down auto heartbeats");
// We want to rethrow the cancel failure
throw e;
}
}
return "Activity completed: " + input;
}

private void sleep(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException ee) {
// Empty
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.interceptor;

import io.temporal.activity.ActivityExecutionContext;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase;
import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class AutoHeartbeatActivityInboundCallsInterceptor
extends ActivityInboundCallsInterceptorBase {
private ActivityExecutionContext activityExecutionContext;
private Duration activityHeartbeatTimeout;

public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) {
super(next);
}

@Override
public void init(ActivityExecutionContext context) {
this.activityExecutionContext = context;
activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout();
super.init(context);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public ActivityOutput execute(ActivityInput input) {
// If activity has heartbeat timeout defined we want to apply auto-heartbeter
AutoHeartbeatUtil autoHearbeater = null;
if (activityHeartbeatTimeout != null && activityHeartbeatTimeout.getSeconds() > 0) {
System.out.println(
"Auto heartbeating applied for activity: "
+ activityExecutionContext.getInfo().getActivityType());
autoHearbeater =
new AutoHeartbeatUtil(
getHeartbeatPeriod(activityHeartbeatTimeout),
0,
TimeUnit.SECONDS,
activityExecutionContext,
input);
autoHearbeater.start();
} else {
System.out.println(
"Auto heartbeating not being applied for activity: "
+ activityExecutionContext.getInfo().getActivityType());
}

try {
return super.execute(input);
} catch (Exception e) {
throw e;
} finally {
if (autoHearbeater != null) {
autoHearbeater.stop();
}
}
}

private long getHeartbeatPeriod(Duration activityHeartbeatTimeout) {
// For sample we want to heartbeat 1 seconds less than heartbeat timeout
return activityHeartbeatTimeout.getSeconds() <= 1
? 1
: activityHeartbeatTimeout.getSeconds() - 1;
}
}
Loading
Loading