diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 9ee21395b25..7dd5cdbf73d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@
 import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExceptionUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
 
@@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
   public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
   @Getter static long defaultJobStartDeadlineTimeMillis;
   public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
+  // TODO: fetch this list from config once transient exception handling is implemented and retryable exceptions are defined
+  public static List<Class<? extends Exception>> retryableExceptions = Collections.emptyList();
 
   @Inject
   public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
@@ -85,6 +90,14 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
     defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
   }
 
+  /**
+   * Tells whether {@code e} matches one of the {@link #retryableExceptions}.
+   * The list is currently empty, so every exception is expected to be reported as non-transient.
+   */
+  public static boolean isTransientException(Exception e) {
+    return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
+  }
+
   @Override
   protected void startUp() {
     Integer numThreads = ConfigUtils.getInt
@@ -149,6 +162,13 @@ public void run() {
           dagTask.conclude();
           log.info(dagProc.contextualizeStatus("concluded dagTask"));
         } catch (Exception e) {
+          if (!DagProcessingEngine.isTransientException(e)) {
+            log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried.",
+                dagTask, e);
+            dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+            dagTask.conclude();
+          }
+          // TODO: add the else block for transient exceptions and add conclude task only if retry limit is not breached
           log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
           dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
         }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 6c9694c80f7..252861830e1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -33,7 +33,6 @@
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.util.ExceptionUtils;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -92,20 +91,9 @@ public final void process(DagManagementStateStore dagManagementStateStore,
       dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
       throw e;
     }
-    try {
-      logContextualizedInfo("ready to process");
-      act(dagManagementStateStore, state, dagProcEngineMetrics);
-      logContextualizedInfo("processed");
-    } catch (Exception e) {
-      if (isNonTransientException(e)) {
-        log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
-            getDagTask(), e);
-        dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
-        dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
-      } else {
-        throw e;
-      }
-    }
+    logContextualizedInfo("ready to process");
+    act(dagManagementStateStore, state, dagProcEngineMetrics);
+    logContextualizedInfo("processed");
   }
 
   protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
@@ -126,8 +114,4 @@ public String contextualizeStatus(String message) {
   public void logContextualizedInfo(String message) {
     log.info(contextualizeStatus(message));
   }
-
-  protected boolean isNonTransientException(Exception e) {
-    return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions);
-  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 346037f6d2f..adc8cc52844 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -139,12 +139,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
       dagManagementStateStore.updateDagNode(dagNode);
       sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
     } catch (Exception e) {
-      TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
-      String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
-      log.error(message, e);
-      jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
-      if (jobFailedTimer != null) {
-        jobFailedTimer.stop(jobMetadata);
+      // Only mark the job as failed in case of non transient exceptions
+      if (!DagProcessingEngine.isTransientException(e)) {
+        TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+        String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
+        log.error(message, e);
+        jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
+        if (jobFailedTimer != null) {
+          jobFailedTimer.stop(jobMetadata);
+        }
       }
       try {
         // when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 8fea5303aeb..929bfdb1ed9 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -196,9 +196,15 @@ public void dagProcessingTest()
         10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
             + "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
         log, 1, 1000L);
-
+    // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions
     Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
-    Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions);
+    Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions);
+  }
+
+  @Test
+  public void isNonTransientExceptionTest() {
+    Assert.assertFalse(DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!")));
+    Assert.assertFalse(DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!")));
   }
 
   private enum ExceptionType {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
new file mode 100644
index 00000000000..5fdd5f81ab2
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the job submission helpers in {@link DagProcUtils}.
+ */
+public class DagProcUtilsTest {
+
+  DagManagementStateStore dagManagementStateStore;
+  SpecExecutor mockSpecExecutor;
+
+  @BeforeClass
+  public void setUp() {
+    dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+    mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
+  }
+
+  @Test
+  public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+      dagNodeList.add(dagNode);
+    }
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+  }
+
+  @Test
+  public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
+    Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+    dagNodeList.add(dagNode);
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+    Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+  }
+
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
+    Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
+    Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+    dagNodeList.add(dagNode);
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
+    Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
+    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+    Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+  }
+
+  private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
+    Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
+    Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
+    Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
+    List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3);
+
+    Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build();
+    Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build();
+    Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build();
+    List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3);
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      Config jobConfig = jobConfigs.get(i);
+      FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+      if (i == 2) {
+        jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+            ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty()));
+      } else {
+        jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+            ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty()));
+      }
+    }
+    return jobExecutionPlans;
+  }
+}