diff --git a/pom.xml b/pom.xml index c573d98ff..737690625 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ https://sonarcloud.io 4.12.0 0.8.11 + 1.29.0 @@ -506,42 +507,42 @@ io.opentelemetry.javaagent opentelemetry-javaagent - 1.29.0 + ${opentelemetry.version} io.opentelemetry.instrumentation opentelemetry-instrumentation-annotations - 1.29.0 + ${opentelemetry.version} io.opentelemetry opentelemetry-api - 1.29.0 + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + ${opentelemetry.version} io.opentelemetry opentelemetry-exporter-otlp - 1.29.0 + ${opentelemetry.version} io.opentelemetry opentelemetry-sdk - 1.29.0 + ${opentelemetry.version} io.opentelemetry opentelemetry-semconv 1.29.0-alpha - - io.opentelemetry - opentelemetry-sdk-extension-autoconfigure - 1.30.1 - io.opentelemetry opentelemetry-sdk-extension-autoconfigure-spi - 1.29.0 + ${opentelemetry.version} diff --git a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java index a6391dc31..96781cf8b 100644 --- a/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java +++ b/rill-flow-dag/olympicene-spring-boot-starter/src/main/java/com/weibo/rill/flow/olympicene/spring/boot/OlympiceneAutoConfiguration.java @@ -146,9 +146,10 @@ public DAGTraversal dagTraversal( @Autowired @Qualifier("dagContextStorage") DAGContextStorage dagContextStorage, @Autowired @Qualifier("dagInfoStorage") DAGInfoStorage dagInfoStorage, @Autowired @Qualifier("dagStorageProcedure") DAGStorageProcedure dagStorageProcedure, - @Autowired @Qualifier("traversalExecutor") ExecutorService traversalExecutor) { + @Autowired @Qualifier("traversalExecutor") ExecutorService traversalExecutor, + @Autowired 
TracerHelper tracerHelper) { log.info("begin to init default DAGTraversal bean"); - DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, traversalExecutor); + DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, traversalExecutor, tracerHelper); dagTraversal.setStasher(stasher); return dagTraversal; } @@ -299,10 +300,11 @@ public DAGOperations dagOperations( @Autowired @Qualifier("dagCallback") Callback dagCallback, @Autowired @Qualifier("timeCheckRunner") TimeCheckRunner timeCheckRunner, @Autowired @Qualifier("runnerExecutor") ExecutorService runnerExecutor, - @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler) { + @Autowired(required = false) @Qualifier("dagResultHandler") DAGResultHandler dagResultHandler, + @Autowired @Qualifier("tracerHelper") TracerHelper tracerHelper) { log.info("begin to init default DAGOperations bean"); DAGOperations dagOperations = new DAGOperations(runnerExecutor, taskRunners, dagRunner, - timeCheckRunner, dagTraversal, dagCallback, dagResultHandler); + timeCheckRunner, dagTraversal, dagCallback, dagResultHandler, tracerHelper); dagTraversal.setDagOperations(dagOperations); timeCheckRunner.setDagOperations(dagOperations); return dagOperations; diff --git a/rill-flow-dag/olympicene-traversal/pom.xml b/rill-flow-dag/olympicene-traversal/pom.xml index c13f365bf..eae63ac02 100644 --- a/rill-flow-dag/olympicene-traversal/pom.xml +++ b/rill-flow-dag/olympicene-traversal/pom.xml @@ -81,6 +81,10 @@ io.opentelemetry opentelemetry-sdk-extension-autoconfigure-spi + + com.weibo + rill-flow-common + diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java index 81c43f237..897e5373b 100644 --- 
a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.weibo.rill.flow.common.constant.ReservedConstant; import com.weibo.rill.flow.interfaces.model.mapping.Mapping; import com.weibo.rill.flow.interfaces.model.strategy.Timeline; import com.weibo.rill.flow.interfaces.model.task.*; @@ -37,9 +38,13 @@ import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode; import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException; import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; import com.weibo.rill.flow.olympicene.traversal.runners.DAGRunner; import com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner; import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -61,6 +66,7 @@ public class DAGOperations { private final DAGTraversal dagTraversal; private final Callback callback; private final DAGResultHandler dagResultHandler; + private final TracerHelper tracerHelper; public static final BiConsumer OPERATE_WITH_RETRY = (operation, retryTimes) -> { @@ -79,7 +85,7 @@ public class DAGOperations { public DAGOperations(ExecutorService runnerExecutor, Map taskRunners, DAGRunner dagRunner, TimeCheckRunner timeCheckRunner, DAGTraversal dagTraversal, Callback callback, - DAGResultHandler dagResultHandler) { + DAGResultHandler dagResultHandler, TracerHelper tracerHelper) { this.runnerExecutor = runnerExecutor; 
this.taskRunners = taskRunners; this.dagRunner = dagRunner; @@ -87,15 +93,20 @@ public DAGOperations(ExecutorService runnerExecutor, Map tas this.dagTraversal = dagTraversal; this.callback = callback; this.dagResultHandler = dagResultHandler; + this.tracerHelper = tracerHelper; } public void runTasks(String executionId, Collection>> taskInfoToContexts) { log.info("runTasks begin submit task executionId:{}", executionId); + Context parentContext = Context.current(); // 捕获当前的 context + taskInfoToContexts.forEach(taskInfoToContext -> runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { TaskInfo taskInfo = taskInfoToContext.getLeft(); try { log.info("runTasks task begin to execute executionId:{} taskInfoName:{}", executionId, taskInfo.getName()); - runTask(executionId, taskInfo, taskInfoToContext.getRight()); + try (Scope ignored = parentContext.makeCurrent()) { // 在新线程中恢复 context + runTask(executionId, taskInfo, taskInfoToContext.getRight()); + } } catch (Exception e) { log.error("runTasks fails, executionId:{}, taskName:{}", executionId, taskInfo.getName(), e); } @@ -104,49 +115,73 @@ public void runTasks(String executionId, Collection context) { - Map params = Maps.newHashMap(); - params.put(EXECUTION_ID, executionId); - params.put("taskInfo", taskInfo); - params.put("context", context); - - TaskRunner runner = selectRunner(taskInfo); - Supplier basicActions = () -> runner.run(executionId, taskInfo, context); - - Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS); - ExecutionResult executionResult = supplier.get(); - - /* - 任务执行后结果类型 - 1. 任务执行完成 如 return/pass - 1.1 任务执行完成 需要寻找下一个能执行的任务 - 1.2 任务执行中 需要外部系统调finish触发下一个任务执行 - 1.3 任务需要重试 - 2. 
流程控制类任务 foreach/choice - 2.1 触发能够执行的子任务 - */ - // 对应1.1 - if (isTaskCompleted(executionResult)) { - dagTraversal.submitTraversal(executionId, taskInfo.getName()); - invokeTaskCallback(executionId, taskInfo, context); + // 恢复 execution context + Context executionContext = tracerHelper.loadExecutionContext(executionId); + if (executionContext == null) { + executionContext = Context.current(); } - // 对应1.2 - if (executionResult.getTaskStatus() == TaskStatus.RUNNING) { - Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null); - Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline)) - .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds)); - } - // 对应1.3 - if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { - runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), - executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); - } - // 对应2.1 - if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) { - executionResult.getSubTaskInfosAndContext() - .forEach(subTaskInfosAndContext -> { - dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight()); - safeSleep(10); - }); + + Span span = tracerHelper.getTracer().spanBuilder("runTask " + taskInfo.getName()) + .setAttribute("execution.id", executionId) + .setAttribute("task.name", taskInfo.getName()) + .setAttribute("task.category", taskInfo.getTask().getCategory()) + .setParent(executionContext) + .startSpan(); + + TaskStatus executionResultStatus = null; + try (Scope ignored = Context.current().with(span).makeCurrent()) { + TaskRunner runner = selectRunner(taskInfo); + Supplier basicActions = () -> runner.run(executionId, taskInfo, context); + + Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, Maps.newHashMap(), 
SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS); + ExecutionResult executionResult = supplier.get(); + + /* + 任务执行后结果类型 + 1. 任务执行完成 如 return/pass + 1.1 任务执行完成 需要寻找下一个能执行的任务 + 1.2 任务执行中 需要外部系统调finish触发下一个任务执行 + 1.3 任务需要重试 + 2. 流程控制类任务 foreach/choice + 2.1 触发能够执行的子任务 + */ + // 对应1.1 + if (isTaskCompleted(executionResult)) { + Context currentContext = Context.current(); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignore = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, taskInfo.getName()); + invokeTaskCallback(executionId, taskInfo, context); + } + })); + } + executionResultStatus = executionResult.getTaskStatus(); + // 对应1.2 + if (executionResult.getTaskStatus() == TaskStatus.RUNNING) { + Timeline timeline = Optional.ofNullable(taskInfo.getTask()).map(BaseTask::getTimeline).orElse(null); + Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap<>(), timeline)) + .ifPresent(timeoutSeconds -> timeCheckRunner.addTaskToTimeoutCheck(executionId, taskInfo, timeoutSeconds)); + tracerHelper.saveSpan(executionId, taskInfo.getName(), executionContext, span); + return; + } + // 对应1.3 + if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { + runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), + executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); + } + // 对应2.1 + if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) { + executionResult.getSubTaskInfosAndContext() + .forEach(subTaskInfosAndContext -> { + dagTraversal.submitTasks(executionId, subTaskInfosAndContext.getLeft(), subTaskInfosAndContext.getRight()); + safeSleep(10); + }); + } + } finally { + if (executionResultStatus != null && executionResultStatus.isCompleted()) { + span.setAttribute("status", executionResultStatus.toString()); + span.end(); + } } } @@ -204,24 +239,51 @@ public void finishTaskSync(String executionId, String taskCategory, 
NotifyInfo n Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TASK_FINISH_CUSTOMIZED_PLUGINS); ExecutionResult executionResult = supplier.get(); - if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { - timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); - runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), - executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); - } - if (isTaskCompleted(executionResult)) { - timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); - dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); - invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext()); - } - if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) { - dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal()); - } + // 尝试恢复之前的 span + Span span = tracerHelper.loadSpan(executionId, executionResult.getTaskInfo().getName()); + Context currentContext = span != null ? 
Context.current().with(span) : Context.current(); + + try (Scope scope = currentContext.makeCurrent()) { + if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) { + timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + runTaskWithTimeInterval(executionId, executionResult.getTaskInfo(), + executionResult.getContext(), executionResult.getRetryIntervalInSeconds()); + } + })); + } + if (isTaskCompleted(executionResult)) { + timeCheckRunner.remTaskFromTimeoutCheck(executionId, executionResult.getTaskInfo()); + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + invokeTaskCallback(executionId, executionResult.getTaskInfo(), executionResult.getContext()); + } + })); + } + if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) { + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskNameNeedToTraversal()); + } + })); + } - // key finished - if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex()) - || isSubFlowTaskKeyCompleted(executionResult)) { - dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + // key finished + if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex()) + || isSubFlowTaskKeyCompleted(executionResult)) { + runnerExecutor.execute(new ExecutionRunnable(executionId, () -> { + try (Scope ignored = currentContext.makeCurrent()) { + dagTraversal.submitTraversal(executionId, executionResult.getTaskInfo().getName()); + } + })); + } + } finally { + if (span != null && 
executionResult.getTaskStatus().isCompleted()) { + span.setAttribute("status", executionResult.getTaskStatus().toString()); + span.end(); + } } } @@ -232,15 +294,84 @@ public void redoTask(String executionId, List taskNames, Map data, NotifyInfo notifyInfo) { - log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo); - ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo); - Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline())) - .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds)); - dagTraversal.submitTraversal(executionId, null); + String[] executionIdInfos = executionId.split(ReservedConstant.EXECUTION_ID_CONNECTOR); + String descriptorId = executionIdInfos[0]; + + // 创建 DAG 的根 span + Span dagSpan = tracerHelper.getTracer().spanBuilder("submitDAG " + descriptorId) + .setAttribute("execution.id", executionId) + .setAttribute("dag.name", dag.getDagName()) + .setParent(Context.current()) // 显式设置父 context + .startSpan(); + + Context dagContext = Context.current().with(dagSpan); + try (Scope ignored = dagContext.makeCurrent()) { + // 保存 execution context + tracerHelper.saveExecutionContext(executionId, dagContext); + + log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", executionId, notifyInfo); + ExecutionResult executionResult = dagRunner.submitDAG(executionId, dag, settings, data, notifyInfo); + Optional.ofNullable(getTimeoutSeconds(new HashMap<>(), executionResult.getContext(), dag.getTimeline())) + .ifPresent(timeoutSeconds -> timeCheckRunner.addDAGToTimeoutCheck(executionId, timeoutSeconds)); + dagTraversal.submitTraversal(executionId, null); + } } public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) { log.info("finishDAG task begin to execute executionId:{} dagStatus:{}", executionId, dagStatus); + + // 恢复 
execution context 以获取 DAG span + Context executionContext = tracerHelper.loadExecutionContext(executionId); + if (executionContext != null) { + Span dagSpan = Span.fromContext(executionContext); + try (Scope ignored = executionContext.makeCurrent()) { + ExecutionResult executionResult = finishDAGInternal(executionId, dagInfo, dagStatus, dagInvokeMsg); + + DAGInfo dagInfoRet = executionResult.getDagInfo(); + Map context = executionResult.getContext(); + + timeCheckRunner.remDAGFromTimeoutCheck(executionId, dagInfoRet.getDag()); + + List executionRoutes = Optional.ofNullable(dagInfoRet.getDagInvokeMsg()) + .map(DAGInvokeMsg::getExecutionRoutes) + .orElse(new ArrayList<>()); + executionRoutes.stream() + .max(Comparator.comparingInt(ExecutionInfo::getIndex)) + .filter(executionInfo -> executionInfo.getExecutionType() == FunctionPattern.FLOW_SYNC) + .ifPresent(executionInfo -> { + try { + String parentDAGExecutionId = executionInfo.getExecutionId(); + String taskInfoName = executionInfo.getTaskInfoName(); + TaskStatus taskStatus = calculateSubFlowTaskStatus(dagInfoRet); + TaskInvokeMsg taskInvokeMsg = Optional.ofNullable(dagInfoRet.getDagInvokeMsg()) + .map(it -> TaskInvokeMsg.builder().code(it.getCode()).msg(it.getMsg()).ext(it.getExt()).build()) + .orElse(null); + NotifyInfo notifyInfo = NotifyInfo.builder().taskInfoName(taskInfoName).taskStatus(taskStatus).taskInvokeMsg(taskInvokeMsg).build(); + finishTaskSync(parentDAGExecutionId, TaskCategory.FUNCTION.getValue(), notifyInfo, context); + } catch (Exception e) { + log.warn("finishDAG fails to finish task, executionInfo:{}", executionInfo, e); + } + }); + + trialClose(executionId, dagStatus, dagInfoRet, context); + + // 在 DAG 完成时结束 span + dagSpan.setAttribute("status", dagStatus.name()); + dagSpan.end(); + } + } else { + ExecutionResult executionResult = finishDAGInternal(executionId, dagInfo, dagStatus, dagInvokeMsg); + + DAGInfo dagInfoRet = executionResult.getDagInfo(); + Map context = executionResult.getContext(); 
+ + timeCheckRunner.remDAGFromTimeoutCheck(executionId, dagInfoRet.getDag()); + + trialClose(executionId, dagStatus, dagInfoRet, context); + } + } + + private ExecutionResult finishDAGInternal(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg) { Map params = Maps.newHashMap(); params.put(EXECUTION_ID, executionId); params.put("dagInfo", dagInfo); @@ -249,35 +380,7 @@ public void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, Supplier basicActions = () -> dagRunner.finishDAG(executionId, dagInfo, dagStatus, dagInvokeMsg); Supplier supplier = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.DAG_FINISH_CUSTOMIZED_PLUGINS); - ExecutionResult executionResult = supplier.get(); - - DAGInfo dagInfoRet = executionResult.getDagInfo(); - Map context = executionResult.getContext(); - - timeCheckRunner.remDAGFromTimeoutCheck(executionId, dagInfoRet.getDag()); - - List executionRoutes = Optional.ofNullable(dagInfoRet.getDagInvokeMsg()) - .map(DAGInvokeMsg::getExecutionRoutes) - .orElse(new ArrayList<>()); - executionRoutes.stream() - .max(Comparator.comparingInt(ExecutionInfo::getIndex)) - .filter(executionInfo -> executionInfo.getExecutionType() == FunctionPattern.FLOW_SYNC) - .ifPresent(executionInfo -> { - try { - String parentDAGExecutionId = executionInfo.getExecutionId(); - String taskInfoName = executionInfo.getTaskInfoName(); - TaskStatus taskStatus = calculateSubFlowTaskStatus(dagInfoRet); - TaskInvokeMsg taskInvokeMsg = Optional.ofNullable(dagInfoRet.getDagInvokeMsg()) - .map(it -> TaskInvokeMsg.builder().code(it.getCode()).msg(it.getMsg()).ext(it.getExt()).build()) - .orElse(null); - NotifyInfo notifyInfo = NotifyInfo.builder().taskInfoName(taskInfoName).taskStatus(taskStatus).taskInvokeMsg(taskInvokeMsg).build(); - finishTaskSync(parentDAGExecutionId, TaskCategory.FUNCTION.getValue(), notifyInfo, context); - } catch (Exception e) { - log.warn("finishDAG fails to finish task, executionInfo:{}", 
executionInfo, e); - } - }); - - trialClose(executionId, dagStatus, dagInfoRet, context); + return supplier.get(); } private TaskStatus calculateSubFlowTaskStatus(DAGInfo dagInfoRet) { diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java index 73b16e4db..ffabb2f46 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java @@ -18,6 +18,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.Maps; +import com.weibo.rill.flow.interfaces.model.task.TaskInfo; +import com.weibo.rill.flow.interfaces.model.task.TaskStatus; import com.weibo.rill.flow.olympicene.core.concurrent.ExecutionRunnable; import com.weibo.rill.flow.olympicene.core.constant.SystemConfig; import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper; @@ -26,14 +28,15 @@ import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo; import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus; import com.weibo.rill.flow.olympicene.core.model.task.ForeachTask; -import com.weibo.rill.flow.interfaces.model.task.TaskInfo; -import com.weibo.rill.flow.interfaces.model.task.TaskStatus; import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage; import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage; import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure; import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper; import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper; import com.weibo.rill.flow.olympicene.traversal.helper.Stasher; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; 
import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -50,7 +53,7 @@ @Slf4j public class DAGTraversal { private final ContextHelper contextHelper = ContextHelper.getInstance(); - + private final TracerHelper tracerHelper; private final DAGContextStorage dagContextStorage; private final DAGInfoStorage dagInfoStorage; private final DAGStorageProcedure dagStorageProcedure; @@ -61,26 +64,35 @@ public class DAGTraversal { private Stasher stasher; public DAGTraversal(DAGContextStorage dagStorage, DAGInfoStorage dagInfoStorage, DAGStorageProcedure dagStorageProcedure, - ExecutorService traversalExecutor) { + ExecutorService traversalExecutor, TracerHelper tracerHelper) { this.dagContextStorage = dagStorage; this.dagInfoStorage = dagInfoStorage; this.dagStorageProcedure = dagStorageProcedure; this.traversalExecutor = traversalExecutor; + this.tracerHelper = tracerHelper; } public void submitTraversal(String executionId, String completedTaskName) { + // 获取 execution context + Context executionContext = tracerHelper.loadExecutionContext(executionId); + if (executionContext == null) { + executionContext = Context.current(); + } + Context finalContext = executionContext; // 为了在 lambda 中使用 + traversalExecutor.execute(new ExecutionRunnable(executionId,() -> { try { log.info("submitTraversal begin lock executionId:{}, completedTaskName:{}", executionId, completedTaskName); - - Map params = Maps.newHashMap(); - params.put("executionId", executionId); - params.put("completedTaskName", completedTaskName); - - Runnable basicActions = () -> dagStorageProcedure.lockAndRun( - LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName)); - Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS); - DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); + try (Scope ignored = finalContext.makeCurrent()) 
{ // 在新线程中恢复 context + Map params = Maps.newHashMap(); + params.put("executionId", executionId); + params.put("completedTaskName", completedTaskName); + + Runnable basicActions = () -> dagStorageProcedure.lockAndRun( + LockerKey.buildDagInfoLockName(executionId), () -> doTraversal(executionId, completedTaskName)); + Runnable runnable = PluginHelper.pluginInvokeChain(basicActions, params, SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS); + DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); + } } catch (Exception e) { log.error("executionId:{} traversal exception with completedTaskName:{}. ", executionId, completedTaskName, e); } @@ -88,18 +100,27 @@ public void submitTraversal(String executionId, String completedTaskName) { } public void submitTasks(String executionId, Set taskInfos, Map groupedContext) { + // 获取 execution context + Context executionContext = tracerHelper.loadExecutionContext(executionId); + if (executionContext == null) { + executionContext = Context.current(); + } + Context finalContext = executionContext; + traversalExecutor.execute(new ExecutionRunnable(executionId, () -> { try { log.info("submitTasks begin get lock executionId:{}", executionId); - Runnable runnable = () -> dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> { - log.info("submitTasks begin execute task executionId:{}", executionId); - Set readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(taskInfos); - if (CollectionUtils.isNotEmpty(readyToRunTasks)) { - List>> taskToContexts = contextHelper.getContext(readyToRunTasks, groupedContext); - runTasks(executionId, taskToContexts); - } - }); - DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); + try (Scope ignored = finalContext.makeCurrent()) { // 在新线程中恢复 context + Runnable runnable = () -> dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(executionId), () -> { + log.info("submitTasks begin execute task 
executionId:{}", executionId); + Set readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(taskInfos); + if (CollectionUtils.isNotEmpty(readyToRunTasks)) { + List>> taskToContexts = contextHelper.getContext(readyToRunTasks, groupedContext); + runTasks(executionId, taskToContexts); + } + }); + DAGOperations.OPERATE_WITH_RETRY.accept(runnable, SystemConfig.getTraversalRetryTimes()); + } } catch (Exception e) { log.error("dag {} traversal exception with tasks {}. ", executionId, Joiner.on(",").join(taskInfos.stream().map(TaskInfo::getName).collect(Collectors.toList())), e); } diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java index d5f1f8c92..90ff5405d 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/config/OlympiceneFacade.java @@ -33,6 +33,7 @@ import com.weibo.rill.flow.olympicene.traversal.helper.DefaultStasher; import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService; import com.weibo.rill.flow.olympicene.traversal.helper.Stasher; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping; import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPath; import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping; @@ -46,15 +47,15 @@ public class OlympiceneFacade { public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, Callback callback, DAGDispatcher dagDispatcher, DAGStorageProcedure dagStorageProcedure, TimeChecker timeChecker, - SwitcherManager switcherManager) { + SwitcherManager switcherManager, 
TracerHelper tracerHelper) { ExecutorService executor = SameThreadExecutorService.INSTANCE; return build(dagInfoStorage, dagContextStorage, dagStorageProcedure, callback, null, - dagDispatcher, timeChecker, executor, switcherManager); + dagDispatcher, timeChecker, executor, switcherManager, tracerHelper); } public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, DAGStorageProcedure dagStorageProcedure, Callback callback, DAGResultHandler dagResultHandler, DAGDispatcher dagDispatcher, - TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager) { + TimeChecker timeChecker, ExecutorService executor, SwitcherManager switcherManager, TracerHelper tracerHelper) { JSONPathInputOutputMapping jsonPathInputOutputMapping = new JSONPathInputOutputMapping(); DefaultStasher stasher = new DefaultStasher(); @@ -64,8 +65,8 @@ public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage Map taskRunners = buildTaskRunners(dagInfoStorage, dagContextStorage, dagDispatcher, jsonPathInputOutputMapping, jsonPathInputOutputMapping, dagStorageProcedure, stasher, switcherManager); - DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor); - DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler); + DAGTraversal dagTraversal = new DAGTraversal(dagContextStorage, dagInfoStorage, dagStorageProcedure, executor, tracerHelper); + DAGOperations dagOperations = new DAGOperations(executor, taskRunners, dagRunner, timeCheckRunner, dagTraversal, callback, dagResultHandler, tracerHelper); dagTraversal.setDagOperations(dagOperations); dagTraversal.setStasher(stasher); timeCheckRunner.setDagOperations(dagOperations); diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/helper/TracerHelper.java 
package com.weibo.rill.flow.olympicene.traversal.helper;

import com.alibaba.fastjson.JSONObject;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Persists OpenTelemetry span/trace context in Redis so that a trace started in one
 * process can be resumed in another (e.g. when an async task callback arrives on a
 * different node). Two kinds of entries are stored:
 * <ul>
 *   <li>per-task span context ({@code rill_flow_trace_<executionId>_<taskId>}), written by
 *       {@link #saveSpan} and consumed exactly once by {@link #loadSpan};</li>
 *   <li>per-execution trace context ({@code rill_flow_execution_trace_<executionId>}),
 *       written by {@link #saveExecutionContext} and read by {@link #loadExecutionContext}.</li>
 * </ul>
 * All Redis failures are logged and swallowed: tracing is best-effort and must never
 * break DAG execution. Entries expire after {@link #TRACE_EXPIRE_SECONDS}.
 */
@Slf4j
@NoArgsConstructor
public class TracerHelper {
    // Key prefix for per-task span context entries.
    private static final String TRACE_KEY_PREFIX = "rill_flow_trace_";
    // Key prefix for per-execution trace context entries.
    private static final String EXECUTION_TRACE_KEY_PREFIX = "rill_flow_execution_trace_";
    // Expire saved contexts after 2 hours so abandoned executions do not leak keys.
    private static final int TRACE_EXPIRE_SECONDS = 2 * 60 * 60;

    private RedisClient redisClient;
    @Getter
    private Tracer tracer;

    public TracerHelper(RedisClient redisClient, Tracer tracer) {
        this.redisClient = redisClient;
        this.tracer = tracer;
    }

    /**
     * Deletes the saved span context for the given task. Best-effort: failures are logged only.
     */
    public void removeSpanContext(String executionId, String taskId) {
        try {
            String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
            redisClient.del(key.getBytes());
        } catch (Exception e) {
            log.error("Failed to remove span context from Redis for task: {}", taskId, e);
        }
    }

    /**
     * Saves the current span's context (trace id, span id, parent span id, trace flags and
     * wall-clock start time) to Redis so {@link #loadSpan} can later rebuild an equivalent span.
     *
     * @param executionId   DAG execution id, part of the Redis key
     * @param taskId        task id, part of the Redis key
     * @param parentContext context whose current span becomes the recorded parent
     * @param currentSpan   span whose context is being persisted
     */
    public void saveSpan(String executionId, String taskId, Context parentContext, Span currentSpan) {
        try {
            String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
            JSONObject contextInfo = new JSONObject();
            SpanContext spanContext = currentSpan.getSpanContext();
            SpanContext parentSpanContext = Span.fromContext(parentContext).getSpanContext();

            contextInfo.put("traceId", spanContext.getTraceId());
            contextInfo.put("spanId", spanContext.getSpanId());
            contextInfo.put("parentSpanId", parentSpanContext.getSpanId());
            contextInfo.put("traceFlags", spanContext.getTraceFlags().asHex());
            // Record the wall-clock start so the restored span keeps the original start time.
            contextInfo.put("startTime", System.currentTimeMillis());

            redisClient.set(key, contextInfo.toJSONString());
            redisClient.expire(key, TRACE_EXPIRE_SECONDS);
        } catch (Exception e) {
            log.error("Failed to save context to Redis for task: {}", taskId, e);
        }
    }

    /**
     * Rebuilds a span for the given task from the context previously stored by
     * {@link #saveSpan}. The new span is parented to the recorded parent span, carries the
     * original span id as the {@code original.span.id} attribute, and reuses the recorded
     * start timestamp. The Redis entry is always removed afterwards (single-use), even on
     * failure, so a corrupt entry cannot be retried forever.
     *
     * @return the restored span, or {@code null} when nothing was saved or the entry is invalid
     */
    public Span loadSpan(String executionId, String taskId) {
        try {
            String key = TRACE_KEY_PREFIX + executionId + "_" + taskId;
            String contextInfoString = redisClient.get(key);

            if (contextInfoString == null || contextInfoString.isEmpty()) {
                return null;
            }

            JSONObject contextInfo = JSONObject.parseObject(contextInfoString);
            String traceId = contextInfo.getString("traceId");
            String spanId = contextInfo.getString("spanId");
            String parentSpanId = contextInfo.getString("parentSpanId");
            String traceFlags = contextInfo.getString("traceFlags");
            // Explicit null check instead of Long.parseLong(getString(...)), which threw an
            // NPE for a missing field and masked the problem behind the generic catch below.
            Long startTime = contextInfo.getLong("startTime");
            if (startTime == null) {
                log.warn("Saved span context missing startTime for task: {}", taskId);
                return null;
            }

            SpanContext parentContext = SpanContext.create(
                    traceId,
                    parentSpanId,
                    TraceFlags.fromHex(traceFlags, 0),
                    TraceState.getDefault()
            );

            return tracer.spanBuilder("runTask " + taskId)
                    .setParent(Context.current().with(Span.wrap(parentContext)))
                    .setAttribute("original.span.id", spanId)
                    // Restore the original start time so the span's duration stays accurate.
                    .setStartTimestamp(startTime, TimeUnit.MILLISECONDS)
                    .startSpan();
        } catch (Exception e) {
            log.error("Failed to load span from Redis for task: {}", taskId, e);
            return null;
        } finally {
            // Single-use entry: always clean up, even when loading failed.
            removeSpanContext(executionId, taskId);
        }
    }

    /**
     * Saves the trace context of the given execution (trace id, span id, trace flags) so other
     * processes can attach child spans to the same trace via {@link #loadExecutionContext}.
     */
    public void saveExecutionContext(String executionId, Context context) {
        try {
            String key = EXECUTION_TRACE_KEY_PREFIX + executionId;
            Span span = Span.fromContext(context);
            SpanContext spanContext = span.getSpanContext();
            JSONObject contextInfo = new JSONObject();
            contextInfo.put("traceId", spanContext.getTraceId());
            contextInfo.put("spanId", spanContext.getSpanId());
            contextInfo.put("traceFlags", spanContext.getTraceFlags().asHex());

            redisClient.set(key, contextInfo.toJSONString());
            redisClient.expire(key, TRACE_EXPIRE_SECONDS);
        } catch (Exception e) {
            log.error("Failed to save execution context to Redis for execution: {}", executionId, e);
        }
    }

    /**
     * Loads the execution-level trace context saved by {@link #saveExecutionContext}. Unlike
     * {@link #loadSpan}, the entry is not removed — multiple tasks of the same execution may
     * read it until it expires.
     *
     * @return a context wrapping the remote span context, or {@code null} when absent/invalid
     */
    public Context loadExecutionContext(String executionId) {
        try {
            String key = EXECUTION_TRACE_KEY_PREFIX + executionId;
            String contextInfoString = redisClient.get(key);

            if (contextInfoString == null || contextInfoString.isEmpty()) {
                return null;
            }

            JSONObject contextInfo = JSONObject.parseObject(contextInfoString);
            String traceId = contextInfo.getString("traceId");
            String spanId = contextInfo.getString("spanId");
            String traceFlags = contextInfo.getString("traceFlags");

            SpanContext spanContext = SpanContext.create(
                    traceId,
                    spanId,
                    TraceFlags.fromHex(traceFlags, 0),
                    TraceState.getDefault()
            );

            return Context.current().with(Span.wrap(spanContext));
        } catch (Exception e) {
            log.error("Failed to load execution context from Redis for execution: {}", executionId, e);
            return null;
        }
    }
}
task.getString("resourceName"); diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java index f07398834..3293fc1ad 100644 --- a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java +++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OlympiceneConfiguration.java @@ -31,6 +31,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo; import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher; import com.weibo.rill.flow.olympicene.traversal.helper.SameThreadExecutorService; +import com.weibo.rill.flow.olympicene.traversal.helper.TracerHelper; import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping; import com.weibo.rill.flow.service.component.OlympiceneCallback; import com.weibo.rill.flow.service.component.RuntimeExecutorServiceProxy; @@ -47,6 +48,7 @@ import com.weibo.rill.flow.service.storage.RuntimeRedisClients; import com.weibo.rill.flow.service.storage.RuntimeStorage; import com.weibo.rill.flow.service.util.IpUtils; +import io.opentelemetry.api.trace.Tracer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfigureOrder; @@ -122,6 +124,13 @@ public BusinessTimeChecker timeChecker( return new BusinessTimeChecker(redisClient); } + @Bean + public TracerHelper tracerHelper( + @Autowired @Qualifier("runtimeRedisClients") RedisClient redisClient, + @Autowired Tracer tracer) { + return new TracerHelper(redisClient, tracer); + } + @Bean(destroyMethod = "shutdown") public ExecutorService notifyExecutor(@Autowired BizDConfs bizDConfs, diff --git a/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java 
b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java new file mode 100644 index 000000000..381489007 --- /dev/null +++ b/rill-flow-web/src/main/java/com/weibo/rill/flow/configuration/OpenTelemetryConfig.java @@ -0,0 +1,63 @@ +package com.weibo.rill.flow.configuration; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class OpenTelemetryConfig { + @Value("${otel.service.name:rill-flow}") + private String serviceName; + + @Value("${otel.exporter.otlp.endpoint:http://jaeger:4317}") + private String endpoint; + + @Value("${otel.traces.sampler.probability:1.0}") + private double samplerProbability; + + @Bean + public OpenTelemetry openTelemetry() { + log.info("Initializing OpenTelemetry with endpoint: {}, service name: {}", endpoint, serviceName); + + Resource resource = Resource.getDefault() + .merge(Resource.create(Attributes.of( + ResourceAttributes.SERVICE_NAME, serviceName, + ResourceAttributes.SERVICE_VERSION, "1.0.0" + ))); + + OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint(endpoint) + .build(); + + SdkTracerProvider sdkTracerProvider 
= SdkTracerProvider.builder() + .setSampler(Sampler.traceIdRatioBased(samplerProbability)) + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter) + .build()) + .setResource(resource) + .build(); + + return OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); + } + + @Bean + public Tracer tracer(OpenTelemetry openTelemetry) { + return openTelemetry.getTracer("rill-flow", "1.0.0"); + } +} diff --git a/rill-flow-web/src/main/resources/application.properties b/rill-flow-web/src/main/resources/application.properties index 30ae7c15a..f4c629bb3 100644 --- a/rill-flow-web/src/main/resources/application.properties +++ b/rill-flow-web/src/main/resources/application.properties @@ -24,4 +24,8 @@ spring.mvc.pathmatch.matching-strategy=ant_path_matcher rill.flow.task.template.datasource.jdbc.master.url=jdbc:mysql://rill-flow-mysql:3306/rill_flow?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8 rill.flow.task.template.datasource.master.user=root -rill.flow.task.template.datasource.master.password=secret \ No newline at end of file +rill.flow.task.template.datasource.master.password=secret + +otel.service.name=rill-flow +otel.exporter.otlp.endpoint=http://jaeger:4317 +otel.traces.sampler.probability=1.0 \ No newline at end of file