diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala index b69ed4365c..1ee9b17608 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala @@ -48,6 +48,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils import java.{lang, util} import java.text.MessageFormat +import java.time.Instant +import java.time.format.DateTimeFormatter import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit @@ -331,19 +333,30 @@ abstract class EntranceServer extends Logging { ).toLong undoneTask .filter { job => - val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels) - val jobMetrics = Option(job.jobRequest.getMetrics) - val startTime = - if (jobMetrics.exists(_.containsKey(TaskConstant.JOB_RUNNING_TIME))) { - jobMetrics.get.get(TaskConstant.JOB_RUNNING_TIME).toString.toLong - } else { - 0L - } - engineType.contains( - EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE - ) && startTime != 0 && startTime < diagnosisTime && !diagnosedJobs.containsKey( - job.getJobRequest.getId.toString - ) + try { + val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels) + val jobMetrics = + Option(JobHistoryHelper.getTaskByTaskID(job.getJobRequest.getId).getMetrics) + val startTime = + if (jobMetrics.exists(_.containsKey(TaskConstant.JOB_RUNNING_TIME))) { + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ") + val instant = Instant.from( + formatter.parse(jobMetrics.get.get(TaskConstant.JOB_RUNNING_TIME).toString) + ) + instant.toEpochMilli + } else { + 0L + } + engineType.contains( + EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE + ) && startTime != 0 && startTime < diagnosisTime && !diagnosedJobs.containsKey( + job.getJobRequest.getId.toString + ) + } catch { + case t: Throwable => + logger.error(s"Failed to check task for diagnosis, reason: ${t.getMessage}", t) + false + } } .foreach { job => val jobId = job.getJobRequest.getId diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala index e7f69d3c84..c38eaf52ce 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala @@ -266,7 +266,7 @@ object JobHistoryHelper extends Logging { tasks } - private def getTaskByTaskID(taskID: Long): JobRequest = { + def getTaskByTaskID(taskID: Long): JobRequest = { val jobRequest = new JobRequest jobRequest.setId(taskID) jobRequest.setSource(null)