diff --git a/pom.xml b/pom.xml
index c573d98ff..737690625 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
https://sonarcloud.io
4.12.0
0.8.11
+ 1.29.0
@@ -506,42 +507,42 @@
io.opentelemetry.javaagent
opentelemetry-javaagent
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry.instrumentation
opentelemetry-instrumentation-annotations
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-api
- 1.29.0
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-exporter-otlp
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-sdk
- 1.29.0
+ ${opentelemetry.version}
io.opentelemetry
opentelemetry-semconv
- 1.29.0-alpha
+ ${opentelemetry.version}-alpha
-
- io.opentelemetry
- opentelemetry-sdk-extension-autoconfigure
- 1.30.1
-
io.opentelemetry
opentelemetry-sdk-extension-autoconfigure-spi
- 1.29.0
+ ${opentelemetry.version}
diff --git a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java
index a6391dc31..96781cf8b 100644
--- a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java
+++ b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java
@@ -146,9 +146,10 @@ public DAGTraversal dagTraversal(
@Autowired @Qualifier("dagContextStorage") DAGContextStorage dagContextStorage,
@Autowired @Qualifier("dagInfoStorage") DAGInfoStorage dagInfoStorage,
@Autowired @Qualifier("dagStorageProcedure") DAGStorageProcedure dagStorageProcedure,
- @Autowired @Qualifier("traversalExecutor") ExecutorService traversalExecutor) {
+ @Autowired @Qualifier("traversalExecutor") ExecutorService traversalExecutor,
+ @Autowired @Qualifier("tracerHelper") TracerHelper tracerHelper) {
log.info("begin to init default DAGTraversal bean");
- DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, traversalExecutor);
+ DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, traversalExecutor, tracerHelper);
dagTraversal.setStasher(stasher);
return dagTraversal;
}
@@ -299,10 +300,11 @@ public DAGOperations dagOperations(
@Autowired @Qualifier("dagCallback") Callback dagCallback,
@Autowired @Qualifier("timeCheckRunner") TimeCheckRunner timeCheckRunner,
@Autowired @Qualifier("runnerExecutor") ExecutorService runnerExecutor,
- @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) {
+ @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler,
+ @Autowired @Qualifier("tracerHelper") TracerHelper tracerHelper) {
log.info("begin to init default DAGOperations bean");
DAGOperations dagOperations = new DAGOperations(runnerExecutor, taskRunners, dagRunner,
- timeCheckRunner, dagTraversal, dagCallback, dagResultHandler);
+ timeCheckRunner, dagTraversal, dagCallback, dagResultHandler, tracerHelper);
dagTraversal.setDagOperations(dagOperations);
timeCheckRunner.setDagOperations(dagOperations);
return dagOperations;
diff --git a/rill-flow-dag/olympicene-traversal/pom.xml b/rill-flow-dag/olympicene-traversal/pom.xml
index c13f365bf..eae63ac02 100644
--- a/rill-flow-dag/olympicene-traversal/pom.xml
+++ b/rill-flow-dag/olympicene-traversal/pom.xml
@@ -81,6 +81,10 @@
io.opentelemetry
opentelemetry-sdk-extension-autoconfigure-spi
+
+ com.weibo
+ rill-flow-common
+
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java
index 81c43f237..897e5373b 100644
--- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.weibo.rill.flow.common.constant.ReservedConstant;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.*;
@@ -37,9 +38,13 @@
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.runners.DAGRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -61,6 +66,7 @@ public class DAGOperations {
private final DAGTraversal dagTraversal;
private final Callback callback;
private final DAGResultHandler dagResultHandler;
+ private final TracerHelper tracerHelper;
public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> {
@@ -79,7 +85,7 @@ public class DAGOperations {
public DAGOperations(ExecutorService runnerExecutor, Map taskRunners, DAGRunner dagRunner,
TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback callback,
- DAGResultHandler dagResultHandler) {
+ DAGResultHandler dagResultHandler, TracerHelper tracerHelper) {
this.runnerExecutor = runnerExecutor;
this.taskRunners = taskRunners;
this.dagRunner = dagRunner;
@@ -87,15 +93,20 @@ public DAGOperations(ExecutorService runnerExecutor, Map tas
this.dagTraversal = dagTraversal;
this.callback = callback;
this.dagResultHandler = dagResultHandler;
+ this.tracerHelper = tracerHelper;
}
public void runTasks(String executionId, Collection>> taskInfoToContexts) {
log.info("runTasks begin submit task executionId:{}", executionId);
+ Context parentContext = Context.current(); // 捕获当前的 context
+
taskInfoToContexts.forEach(taskInfoToContext -> runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
TaskInfo taskInfo = taskInfoToContext.getLeft();
try {
log.info("runTasks task begin to execute executionId:{} taskInfoName:{}", executionId, taskInfo.getName());
- runTask(executionId, taskInfo, taskInfoToContext.getRight());
+ try (Scope ignored = parentContext.makeCurrent()) { // 在新线程中恢复 context
+ runTask(executionId, taskInfo, taskInfoToContext.getRight());
+ }
} catch (Exception e) {
log.error("runTasks fails, executionId:{}, taskName:{}", executionId, taskInfo.getName(), e);
}
@@ -104,49 +115,73 @@ public void runTasks(String executionId, Collection context) {
- Map params = Maps.newHashMap();
- params.put(EXECUTION_ID, executionId);
- params.put("taskInfo", taskInfo);
- params.put("context", context);
-
- TaskRunner runner = selectRunner(taskInfo);
- Supplier basicActions = () -> runner.run(executionId, taskInfo, context);
-
- Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS);
- ExecutionResult executionResult = supplier.get();
-
- /*
- 任务执行后结果类型
- 1. 任务执行完成 如 return/pass
- 1.1 任务执行完成 需要寻找下一个能执行的任务
- 1.2 任务执行中 需要外部系统调finish触发下一个任务执行
- 1.3 任务需要重试
- 2. 流程控制类任务 foreach/choice
- 2.1 触发能够执行的子任务
- */
- // 对应1.1
- if (isTaskCompleted(executionResult)) {
- dagTraversal.submitTraversal(executionId, taskInfo.getName());
- invokeTaskCallback(executionId, taskInfo, context);
+ // 恢复 execution context
+ Context executionContext = tracerHelper.loadExecutionContext(executionId);
+ if (executionContext == null) {
+ executionContext = Context.current();
}
- // 对应1.2
- if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
- Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null);
- Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline))
- .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds));
- }
- // 对应1.3
- if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
- runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
- executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
- }
- // 对应2.1
- if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
- executionResult.getSubTaskInfosAndContext()
- .forEach(subTaskInfosAndContext -> {
- dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight());
- safeSleep(10);
- });
+
+ Span span = tracerHelper.getTracer().spanBuilder("runTask " + taskInfo.getName())
+ .setAttribute("execution.id", executionId)
+ .setAttribute("task.name", taskInfo.getName())
+ .setAttribute("task.category", taskInfo.getTask().getCategory())
+ .setParent(executionContext)
+ .startSpan();
+
+ TaskStatus executionResultStatus = null;
+ try (Scope ignored = Context.current().with(span).makeCurrent()) {
+ TaskRunner runner = selectRunner(taskInfo);
+ Supplier basicActions = () -> runner.run(executionId, taskInfo, context);
+
+            // restore the plugin params the pre-patch code supplied; passing an
+            // empty map silently breaks TASK_RUN plugins that read these keys
+            Map params = Maps.newHashMap();
+            params.put(EXECUTION_ID, executionId);
+            params.put("taskInfo", taskInfo);
+            params.put("context", context);
+            Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS);
+ ExecutionResult executionResult = supplier.get();
+
+ /*
+ 任务执行后结果类型
+ 1. 任务执行完成 如 return/pass
+ 1.1 任务执行完成 需要寻找下一个能执行的任务
+ 1.2 任务执行中 需要外部系统调finish触发下一个任务执行
+ 1.3 任务需要重试
+ 2. 流程控制类任务 foreach/choice
+ 2.1 触发能够执行的子任务
+ */
+ // 对应1.1
+ if (isTaskCompleted(executionResult)) {
+ Context currentContext = Context.current();
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignore = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, taskInfo.getName());
+ invokeTaskCallback(executionId, taskInfo, context);
+ }
+ }));
+ }
+ executionResultStatus = executionResult.getTaskStatus();
+ // 对应1.2
+ if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
+ Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null);
+ Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline))
+ .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds));
+ tracerHelper.saveSpan(executionId, taskInfo.getName(), executionContext, span);
+ return;
+ }
+ // 对应1.3
+ if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
+ runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
+ executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
+ }
+ // 对应2.1
+ if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
+ executionResult.getSubTaskInfosAndContext()
+ .forEach(subTaskInfosAndContext -> {
+ dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight());
+ safeSleep(10);
+ });
+ }
+        } finally {
+            // A RUNNING task's span was persisted via saveSpan and will be ended by
+            // finishTaskSync. Every other path — completed, ready-for-retry, or an
+            // exception from supplier.get() (status still null) — must end the span
+            // here, otherwise it is never ended and leaks.
+            if (executionResultStatus != TaskStatus.RUNNING) {
+                if (executionResultStatus != null) {
+                    span.setAttribute("status", executionResultStatus.toString());
+                }
+                span.end();
+            }
}
}
@@ -204,24 +239,51 @@ public void finishTaskSync(String executionId, String taskCategory, NotifyInfo n
Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_FINISH_CUSTOMIZED_PLUGINS);
ExecutionResult executionResult = supplier.get();
- if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
- timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
- runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
- executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
- }
- if (isTaskCompleted(executionResult)) {
- timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
- dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
- invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext());
- }
- if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
- dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal());
- }
+ // 尝试恢复之前的 span
+ Span span = tracerHelper.loadSpan(executionId, executionResult.getTaskInfo().getName());
+ Context currentContext = span != null ? Context.current().with(span) : Context.current();
+
+ try (Scope scope = currentContext.makeCurrent()) {
+ if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
+ timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(),
+ executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
+ }
+ }));
+ }
+ if (isTaskCompleted(executionResult)) {
+ timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo());
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
+ invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext());
+ }
+ }));
+ }
+ if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal());
+ }
+ }));
+ }
- // key finished
- if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex())
- || isSubFlowTaskKeyCompleted(executionResult)) {
- dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
+ // key finished
+ if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex())
+ || isSubFlowTaskKeyCompleted(executionResult)) {
+ runnerExecutor.execute(new ExecutionRunnable(executionId, () -> {
+ try (Scope ignored = currentContext.makeCurrent()) {
+ dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName());
+ }
+ }));
+ }
+        } finally {
+            TaskStatus finalStatus = executionResult.getTaskStatus();
+            // null-guard: an NPE inside finally would mask the original failure
+            if (span != null && finalStatus != null && finalStatus.isCompleted()) {
+                span.setAttribute("status", finalStatus.toString());
+                span.end();
+                // drop the persisted span context now rather than waiting for the Redis TTL
+                tracerHelper.removeSpanContext(executionId, executionResult.getTaskInfo().getName());
+            }
}
}
@@ -232,15 +294,84 @@ public void redoTask(String executionId, List taskNames, Map data, NotifyInfo notifyInfo) {
- log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo);
- ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo);
- Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline()))
- .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds));
- dagTraversal.submitTraversal(executionId, null);
+ String[] executionIdInfos = executionId.split(ReservedConstant.EXECUTION_ID_CONNECTOR);
+ String descriptorId = executionIdInfos[0];
+
+ // 创建 DAG 的根 span
+ Span dagSpan = tracerHelper.getTracer().spanBuilder("submitDAG " + descriptorId)
+ .setAttribute("execution.id", executionId)
+ .setAttribute("dag.name", dag.getDagName())
+ .setParent(Context.current()) // 显式设置父 context
+ .startSpan();
+
+ Context dagContext = Context.current().with(dagSpan);
+ try (Scope ignored = dagContext.makeCurrent()) {
+ // 保存 execution context
+ tracerHelper.saveExecutionContext(executionId, dagContext);
+
+ log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo);
+ ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo);
+ Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline()))
+ .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds));
+ dagTraversal.submitTraversal(executionId, null);
+ }
}
public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) {
log.info("finishDAG task begin to execute executionId:{} dagStatus:{}", executionId, dagStatus);
+
+ // 恢复 execution context 以获取 DAG span
+ Context executionContext = tracerHelper.loadExecutionContext(executionId);
+ if (executionContext != null) {
+ Span dagSpan = Span.fromContext(executionContext);
+ try (Scope ignored = executionContext.makeCurrent()) {
+ ExecutionResult executionResult = finishDAGInternal(executionId, dagInfo, dagStatus, dagInvokeMsg);
+
+ DAGInfo dagInfoRet = executionResult.getDagInfo();
+ Map context = executionResult.getContext();
+
+ timeCheckRunner.remDAGFromTimeoutCheck(executionId, dagInfoRet.getDag());
+
+ List executionRoutes = Optional.ofNullable(dagInfoRet.getDagInvokeMsg())
+ .map(DAGInvokeMsg::getExecutionRoutes)
+ .orElse(new ArrayList<>());
+ executionRoutes.stream()
+ .max(Comparator.comparingInt(ExecutionInfo::getIndex))
+ .filter(executionInfo -> executionInfo.getExecutionType() == FunctionPattern.FLOW_SYNC)
+ .ifPresent(executionInfo -> {
+ try {
+ String parentDAGExecutionId = executionInfo.getExecutionId();
+ String taskInfoName = executionInfo.getTaskInfoName();
+ TaskStatus taskStatus = calculateSubFlowTaskStatus(dagInfoRet);
+ TaskInvokeMsg taskInvokeMsg = Optional.ofNullable(dagInfoRet.getDagInvokeMsg())
+ .map(it -> TaskInvokeMsg.builder().code(it.getCode()).msg(it.getMsg()).ext(it.getExt()).build())
+ .orElse(null);
+ NotifyInfo notifyInfo = NotifyInfo.builder().taskInfoName(taskInfoName).taskStatus(taskStatus).taskInvokeMsg(taskInvokeMsg).build();
+ finishTaskSync(parentDAGExecutionId, TaskCategory.FUNCTION.getValue(), notifyInfo, context);
+ } catch (Exception e) {
+ log.warn("finishDAG fails to finish task, executionInfo:{}", executionInfo, e);
+ }
+ });
+
+ trialClose(executionId, dagStatus, dagInfoRet, context);
+
+ // 在 DAG 完成时结束 span
+ dagSpan.setAttribute("status", dagStatus.name());
+ dagSpan.end();
+ }
+ } else {
+ ExecutionResult executionResult = finishDAGInternal(executionId, dagInfo, dagStatus, dagInvokeMsg);
+
+ DAGInfo dagInfoRet = executionResult.getDagInfo();
+ Map context = executionResult.getContext();
+
+ timeCheckRunner.remDAGFromTimeoutCheck(executionId, dagInfoRet.getDag());
+
+ trialClose(executionId, dagStatus, dagInfoRet, context);
+ }
+ }
+
+ private ExecutionResult finishDAGInternal(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) {
Map params = Maps.newHashMap();
params.put(EXECUTION_ID, executionId);
params.put("dagInfo", dagInfo);
@@ -249,35 +380,7 @@ public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus,
Supplier basicActions = () -> dagRunner.finishDAG(executionId, dagInfo, dagStatus, dagInvokeMsg);
Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.DAG_FINISH_CUSTOMIZED_PLUGINS);
- ExecutionResult executionResult = supplier.get();
-
- DAGInfo dagInfoRet = executionResult.getDagInfo();
- Map context = executionResult.getContext();
-
- timeCheckRunner.remDAGFromTimeoutCheck(executionId, dagInfoRet.getDag());
-
- List executionRoutes = Optional.ofNullable(dagInfoRet.getDagInvokeMsg())
- .map(DAGInvokeMsg::getExecutionRoutes)
- .orElse(new ArrayList<>());
- executionRoutes.stream()
- .max(Comparator.comparingInt(ExecutionInfo::getIndex))
- .filter(executionInfo -> executionInfo.getExecutionType() == FunctionPattern.FLOW_SYNC)
- .ifPresent(executionInfo -> {
- try {
- String parentDAGExecutionId = executionInfo.getExecutionId();
- String taskInfoName = executionInfo.getTaskInfoName();
- TaskStatus taskStatus = calculateSubFlowTaskStatus(dagInfoRet);
- TaskInvokeMsg taskInvokeMsg = Optional.ofNullable(dagInfoRet.getDagInvokeMsg())
- .map(it -> TaskInvokeMsg.builder().code(it.getCode()).msg(it.getMsg()).ext(it.getExt()).build())
- .orElse(null);
- NotifyInfo notifyInfo = NotifyInfo.builder().taskInfoName(taskInfoName).taskStatus(taskStatus).taskInvokeMsg(taskInvokeMsg).build();
- finishTaskSync(parentDAGExecutionId, TaskCategory.FUNCTION.getValue(), notifyInfo, context);
- } catch (Exception e) {
- log.warn("finishDAG fails to finish task, executionInfo:{}", executionInfo, e);
- }
- });
-
- trialClose(executionId, dagStatus, dagInfoRet, context);
+ return supplier.get();
}
private TaskStatus calculateSubFlowTaskStatus(DAGInfo dagInfoRet) {
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java
index 73b16e4db..ffabb2f46 100644
--- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java
@@ -18,6 +18,8 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
+import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
+import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.concurrent.ExecutionRunnable;
import com.weibo.rill.flow.olympicene.core.constant.SystemConfig;
import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper;
@@ -26,14 +28,15 @@
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus;
import com.weibo.rill.flow.olympicene.core.model.task.ForeachTask;
-import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
-import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@@ -50,7 +53,7 @@
@Slf4j
public class DAGTraversal {
private final ContextHelper contextHelper = ContextHelper.getInstance();
-
+ private final TracerHelper tracerHelper;
private final DAGContextStorage dagContextStorage;
private final DAGInfoStorage dagInfoStorage;
private final DAGStorageProcedure dagStorageProcedure;
@@ -61,26 +64,35 @@ public class DAGTraversal {
private Stasher stasher;
public DAGTraversal(DAGContextStorage dagStorage, DAGInfoStorage dagInfoStorage, DAGStorageProcedure dagStorageProcedure,
- ExecutorService traversalExecutor) {
+ ExecutorService traversalExecutor, TracerHelper tracerHelper) {
this.dagContextStorage = dagStorage;
this.dagInfoStorage = dagInfoStorage;
this.dagStorageProcedure = dagStorageProcedure;
this.traversalExecutor = traversalExecutor;
+ this.tracerHelper = tracerHelper;
}
public void submitTraversal(String executionId, String completedTaskName) {
+ // 获取 execution context
+ Context executionContext = tracerHelper.loadExecutionContext(executionId);
+ if (executionContext == null) {
+ executionContext = Context.current();
+ }
+ Context finalContext = executionContext; // 为了在 lambda 中使用
+
traversalExecutor.execute(new ExecutionRunnable(executionId,() -> {
try {
log.info("submitTraversal begin lock executionId:{}, completedTaskName:{}", executionId, completedTaskName);
-
- Map params = Maps.newHashMap();
- params.put("executionId", executionId);
- params.put("completedTaskName", completedTaskName);
-
- Runnable basicActions = () -> dagStorageProcedure.lockAndRun(
- LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName));
- Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS);
- DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
+ try (Scope ignored = finalContext.makeCurrent()) { // 在新线程中恢复 context
+ Map params = Maps.newHashMap();
+ params.put("executionId", executionId);
+ params.put("completedTaskName", completedTaskName);
+
+ Runnable basicActions = () -> dagStorageProcedure.lockAndRun(
+ LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName));
+ Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS);
+ DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
+ }
} catch (Exception e) {
log.error("executionId:{} traversal exception with completedTaskName:{}. ", executionId, completedTaskName, e);
}
@@ -88,18 +100,27 @@ public void submitTraversal(String executionId, String completedTaskName) {
}
public void submitTasks(String executionId, Set taskInfos, Map groupedContext) {
+ // 获取 execution context
+ Context executionContext = tracerHelper.loadExecutionContext(executionId);
+ if (executionContext == null) {
+ executionContext = Context.current();
+ }
+ Context finalContext = executionContext;
+
traversalExecutor.execute(new ExecutionRunnable(executionId, () -> {
try {
log.info("submitTasks begin get lock executionId:{}", executionId);
- Runnable runnable = () -> dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> {
- log.info("submitTasks begin execute task executionId:{}", executionId);
- Set readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(taskInfos);
- if (CollectionUtils.isNotEmpty(readyToRunTasks)) {
- List>> taskToContexts = contextHelper.getContext(readyToRunTasks, groupedContext);
- runTasks(executionId, taskToContexts);
- }
- });
- DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
+ try (Scope ignored = finalContext.makeCurrent()) { // 在新线程中恢复 context
+ Runnable runnable = () -> dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> {
+ log.info("submitTasks begin execute task executionId:{}", executionId);
+ Set readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(taskInfos);
+ if (CollectionUtils.isNotEmpty(readyToRunTasks)) {
+ List>> taskToContexts = contextHelper.getContext(readyToRunTasks, groupedContext);
+ runTasks(executionId, taskToContexts);
+ }
+ });
+ DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes());
+ }
} catch (Exception e) {
log.error("dag {} traversal exception with tasks {}. ", executionId, Joiner.on(",").join(taskInfos.stream().map(TaskInfo::getName).collect(Collectors.toList())), e);
}
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java
index d5f1f8c92..90ff5405d 100644
--- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java
@@ -33,6 +33,7 @@
import com.weibo.rill.flow.olympicene.traversal.helper.DefaultStasher;
import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPath;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping;
@@ -46,15 +47,15 @@ public class OlympiceneFacade {
public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage,
Callback callback, DAGDispatcher dagDispatcher,
DAGStorageProcedure dagStorageProcedure, TimeChecker timeChecker,
- SwitcherManager switcherManager) {
+ SwitcherManager switcherManager, TracerHelper tracerHelper) {
ExecutorService executor = SameThreadExecutorService.INSTANCE;
return build(dagInfoStorage, dagContextStorage, dagStorageProcedure, callback, null,
- dagDispatcher, timeChecker, executor, switcherManager);
+ dagDispatcher, timeChecker, executor, switcherManager, tracerHelper);
}
public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, DAGStorageProcedure dagStorageProcedure,
Callback callback, DAGResultHandler dagResultHandler, DAGDispatcher dagDispatcher,
- TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager) {
+ TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager, TracerHelper tracerHelper) {
JSONPathInputOutputMapping jsonPathInputOutputMapping = new JSONPathInputOutputMapping();
DefaultStasher stasher = new DefaultStasher();
@@ -64,8 +65,8 @@ public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage
Map taskRunners = buildTaskRunners(dagInfoStorage, dagContextStorage, dagDispatcher,
jsonPathInputOutputMapping, jsonPathInputOutputMapping, dagStorageProcedure, stasher, switcherManager);
- DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor);
- DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler);
+ DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor, tracerHelper);
+ DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler, tracerHelper);
dagTraversal.setDagOperations(dagOperations);
dagTraversal.setStasher(stasher);
timeCheckRunner.setDagOperations(dagOperations);
diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java
new file mode 100644
index 000000000..dd74ed068
--- /dev/null
+++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java
@@ -0,0 +1,139 @@
+package com.weibo.rill.flow.olympicene.traversal.helper;
+
+import com.alibaba.fastjson.JSONObject;
+import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
+import io.opentelemetry.api.trace.*;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NoArgsConstructor
+public class TracerHelper {
+ private RedisClient redisClient;
+ @Getter
+ private Tracer tracer;
+
+ public TracerHelper(RedisClient redisClient, Tracer tracer) {
+ this.redisClient = redisClient;
+ this.tracer = tracer;
+ }
+
+ // Redis key prefix for per-task span context entries
+ private static final String TRACE_KEY_PREFIX = "rill_flow_trace_";
+ // TTL for stored trace entries: 2 hours (value is 2 * 60 * 60 seconds, not 24h)
+ private static final int TRACE_EXPIRE_SECONDS = 2 * 60 * 60;
+ private static final String EXECUTION_TRACE_KEY_PREFIX = "rill_flow_execution_trace_";
+
+ public void removeSpanContext(String executionId, String taskId) {
+ try {
+ String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
+ redisClient.del(key.getBytes());
+ } catch (Exception e) {
+ log.error("Failed to remove span context from Redis for task: {}", taskId, e);
+ }
+ }
+
+ public void saveSpan(String executionId, String taskId, Context parentContext, Span currentSpan) {
+ try {
+ String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
+ JSONObject contextInfo = new JSONObject();
+ SpanContext spanContext = currentSpan.getSpanContext();
+ SpanContext parentSpanContext = Span.fromContext(parentContext).getSpanContext();
+
+ contextInfo.put("traceId", spanContext.getTraceId());
+ contextInfo.put("spanId", spanContext.getSpanId());
+ contextInfo.put("parentSpanId", parentSpanContext.getSpanId());
+ contextInfo.put("traceFlags", spanContext.getTraceFlags().asHex());
+ contextInfo.put("startTime", System.currentTimeMillis()); // record start time so loadSpan can restore it
+
+ redisClient.set(key, contextInfo.toJSONString());
+ redisClient.expire(key, TRACE_EXPIRE_SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to save context to Redis for task: {}", taskId, e);
+ }
+ }
+
+ public Span loadSpan(String executionId, String taskId) {
+ try {
+ String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
+ String contextInfoString = redisClient.get(key);
+
+ if (contextInfoString == null || contextInfoString.isEmpty()) {
+ return null;
+ }
+
+ JSONObject contextInfo = JSONObject.parseObject(contextInfoString);
+ String traceId = contextInfo.getString("traceId");
+ String spanId = contextInfo.getString("spanId");
+ String parentSpanId = contextInfo.getString("parentSpanId");
+ String traceFlags = contextInfo.getString("traceFlags");
+ long startTime = Long.parseLong(contextInfo.getString("startTime"));
+
+ SpanContext parentContext = SpanContext.create(
+ traceId,
+ parentSpanId,
+ TraceFlags.fromHex(traceFlags, 0),
+ TraceState.getDefault()
+ );
+
+ return tracer.spanBuilder("runTask " + taskId)
+ .setParent(Context.current().with(Span.wrap(parentContext)))
+ .setAttribute("original.span.id", spanId)
+ .setStartTimestamp(startTime, java.util.concurrent.TimeUnit.MILLISECONDS) // restore the original start timestamp saved by saveSpan
+ .startSpan();
+ } catch (Exception e) {
+ log.error("Failed to load span from Redis for task: {}", taskId, e);
+ return null;
+ } finally {
+ removeSpanContext(executionId, taskId);
+ }
+ }
+
+ public void saveExecutionContext(String executionId, Context context) {
+ try {
+ String key = EXECUTION_TRACE_KEY_PREFIX + executionId;
+ Span span = Span.fromContext(context);
+ SpanContext spanContext = span.getSpanContext();
+ JSONObject contextInfo = new JSONObject();
+ contextInfo.put("traceId", spanContext.getTraceId());
+ contextInfo.put("spanId", spanContext.getSpanId());
+ contextInfo.put("traceFlags", spanContext.getTraceFlags().asHex());
+
+ redisClient.set(key, contextInfo.toJSONString());
+ redisClient.expire(key, TRACE_EXPIRE_SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to save execution context to Redis for execution: {}", executionId, e);
+ }
+ }
+
+ public Context loadExecutionContext(String executionId) {
+ try {
+ String key = EXECUTION_TRACE_KEY_PREFIX + executionId;
+ String contextInfoString = redisClient.get(key);
+
+ if (contextInfoString == null || contextInfoString.isEmpty()) {
+ return null;
+ }
+
+ JSONObject contextInfo = JSONObject.parseObject(contextInfoString);
+ String traceId = contextInfo.getString("traceId");
+ String spanId = contextInfo.getString("spanId");
+ String traceFlags = contextInfo.getString("traceFlags");
+
+ SpanContext spanContext = SpanContext.create(
+ traceId,
+ spanId,
+ TraceFlags.fromHex(traceFlags, 0),
+ TraceState.getDefault()
+ );
+
+ return Context.current().with(Span.wrap(spanContext));
+ } catch (Exception e) {
+ log.error("Failed to load execution context from Redis for execution: {}", executionId, e);
+ return null;
+ }
+ }
+}
diff --git a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java
index 834832838..fdf265d3f 100644
--- a/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java
+++ b/rill-flow-service/src/main/java/com/weibo/rill/flow/service/facade/DAGDescriptorFacade.java
@@ -266,7 +266,8 @@ public JSONObject getDescriptor(String descriptorId) {
*/
private void generateResourceProtocol(JSONObject task) {
try {
- if (task == null || StringUtils.isNotEmpty(task.getString(RESOURCE_PROTOCOL))) {
+ if (task == null || StringUtils.isNotEmpty(task.getString(RESOURCE_PROTOCOL))
+ || task.getString("resourceName") == null) {
return;
}
String resourceName = task.getString("resourceName");
diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java
index f07398834..3293fc1ad 100644
--- a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java
+++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java
@@ -31,6 +31,7 @@
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo;
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher;
import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService;
+import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping;
import com.weibo.rill.flow.service.component.OlympiceneCallback;
import com.weibo.rill.flow.service.component.RuntimeExecutorServiceProxy;
@@ -47,6 +48,7 @@
import com.weibo.rill.flow.service.storage.RuntimeRedisClients;
import com.weibo.rill.flow.service.storage.RuntimeStorage;
import com.weibo.rill.flow.service.util.IpUtils;
+import io.opentelemetry.api.trace.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
@@ -122,6 +124,13 @@ public BusinessTimeChecker timeChecker(
return new BusinessTimeChecker(redisClient);
}
+ @Bean
+ public TracerHelper tracerHelper(
+ @Autowired @Qualifier("runtimeRedisClients") RedisClient redisClient,
+ @Autowired Tracer tracer) {
+ return new TracerHelper(redisClient, tracer);
+ }
+
@Bean(destroyMethod = "shutdown")
public ExecutorService notifyExecutor(@Autowired BizDConfs bizDConfs,
diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java
new file mode 100644
index 000000000..381489007
--- /dev/null
+++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java
@@ -0,0 +1,63 @@
+package com.weibo.rill.flow.configuration;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@Slf4j
+public class OpenTelemetryConfig {
+ @Value("${otel.service.name:rill-flow}")
+ private String serviceName;
+
+ @Value("${otel.exporter.otlp.endpoint:http://jaeger:4317}")
+ private String endpoint;
+
+ @Value("${otel.traces.sampler.probability:1.0}")
+ private double samplerProbability;
+
+ @Bean
+ public OpenTelemetry openTelemetry() {
+ log.info("Initializing OpenTelemetry with endpoint: {}, service name: {}", endpoint, serviceName);
+
+ Resource resource = Resource.getDefault()
+ .merge(Resource.create(Attributes.of(
+ ResourceAttributes.SERVICE_NAME, serviceName,
+ ResourceAttributes.SERVICE_VERSION, "1.0.0"
+ )));
+
+ OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
+ .setEndpoint(endpoint)
+ .build();
+
+ SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
+ .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(samplerProbability))) // honor an upstream parent's sampling decision; ratio applies only to root spans
+ .addSpanProcessor(BatchSpanProcessor.builder(spanExporter)
+ .build())
+ .setResource(resource)
+ .build();
+
+ return OpenTelemetrySdk.builder()
+ .setTracerProvider(sdkTracerProvider)
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .buildAndRegisterGlobal();
+ }
+
+ @Bean
+ public Tracer tracer(OpenTelemetry openTelemetry) {
+ return openTelemetry.getTracer("rill-flow", "1.0.0");
+ }
+}
diff --git a/rill-flow-web/src/main/resources/application.properties b/rill-flow-web/src/main/resources/application.properties
index 30ae7c15a..f4c629bb3 100644
--- a/rill-flow-web/src/main/resources/application.properties
+++ b/rill-flow-web/src/main/resources/application.properties
@@ -24,4 +24,8 @@ spring.mvc.pathmatch.matching-strategy=ant_path_matcher
rill.flow.task.template.datasource.jdbc.master.url=jdbc:mysql://rill-flow-mysql:3306/rill_flow?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
rill.flow.task.template.datasource.master.user=root
-rill.flow.task.template.datasource.master.password=secret
\ No newline at end of file
+rill.flow.task.template.datasource.master.password=secret
+
+otel.service.name=rill-flow
+otel.exporter.otlp.endpoint=http://jaeger:4317
+otel.traces.sampler.probability=1.0
\ No newline at end of file