From 618f3578d15c64b10e17105bb84c676c3d19fb9a Mon Sep 17 00:00:00 2001 From: vaibsing Date: Fri, 15 Nov 2024 10:44:05 +0530 Subject: [PATCH 1/9] [GOBBLIN-2173] Avoid Adhoc flow spec addition for non leasable entity --- .../restli/FlowConfigsV2ResourceHandler.java | 6 ++- .../api/LeaseUnavailableException.java | 7 +++ .../DagManagementStateStore.java | 5 ++ .../InstrumentedLeaseArbiter.java | 5 ++ .../MultiActiveLeaseArbiter.java | 3 ++ .../MySqlDagManagementStateStore.java | 9 +++- .../MysqlMultiActiveLeaseArbiter.java | 6 +++ .../modules/orchestration/Orchestrator.java | 25 ++++++++++ .../MySqlDagManagementStateStoreTest.java | 17 ++++++- .../MysqlMultiActiveLeaseArbiterTest.java | 38 +++++++++++++++ .../orchestration/OrchestratorTest.java | 46 ++++++++++++++++++- 11 files changed, 162 insertions(+), 5 deletions(-) create mode 100644 gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 927909e57dd..ff57745c7e0 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -60,6 +60,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.FlowSpecSearchObject; +import org.apache.gobblin.runtime.api.LeaseUnavailableException; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; @@ -256,7 +257,10 @@ public CreateKVResponse, FlowConfig> cr responseMap = this.flowCatalog.put(flowSpec, true); } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } catch (Throwable e) { + } catch(LeaseUnavailableException e){ + throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage()); + } + catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java new file mode 100644 index 00000000000..8bdf92b8f63 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java @@ -0,0 +1,7 @@ +package org.apache.gobblin.runtime.api; + +public class LeaseUnavailableException extends RuntimeException { + public LeaseUnavailableException(String message) { + super(message); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index fb7b23fdf0a..8fe0eac9f01 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -103,6 +103,11 @@ public interface DagManagementStateStore { */ void updateDagNode(Dag.DagNode dagNode) throws IOException; + /** + * Returns true if lease can be acquired on entity, else returns false + */ + boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException; + /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. * Both params are returned as optional and are empty if not present in the store. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java index 9e1c270c493..67042f75955 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java @@ -90,6 +90,11 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus (%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams)); } + @Override + public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + return decoratedMultiActiveLeaseArbiter.existsLeasableEntity(leaseParams); + } + @Override public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status) throws IOException { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index c9a3b152bf8..2da76f29249 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -61,6 +61,9 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) + throws IOException; + /** * This method is used to indicate the owner of the lease has successfully completed required actions while holding * the lease of the dag action event. It marks the lease as "no longer leasing", if the eventTimeMillis and diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 29e652cce8e..cef538570a7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { // todo - these two stores should merge private DagStateStoreWithDagNodes dagStateStore; private DagStateStoreWithDagNodes failedDagStateStore; + private MultiActiveLeaseArbiter multiActiveLeaseArbiter; private final JobStatusRetriever jobStatusRetriever; private boolean dagStoresInitialized = false; private final UserQuotaManager quotaManager; @@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { @Inject public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager, - JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) { + JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore, MultiActiveLeaseArbiter multiActiveLeaseArbiter) { this.quotaManager = userQuotaManager; this.config = config; this.flowCatalog = flowCatalog; this.jobStatusRetriever = jobStatusRetriever; this.dagManagerMetrics.activate(); this.dagActionStore = dagActionStore; + this.multiActiveLeaseArbiter = multiActiveLeaseArbiter; } // It should be called after topology spec map is set @@ -168,6 +170,11 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) this.dagStateStore.updateDagNode(dagNode); } + @Override + public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + return multiActiveLeaseArbiter.existsLeasableEntity(leaseParams); + } + @Override public Optional> getDag(Dag.DagId dagId) throws IOException { return Optional.ofNullable(this.dagStateStore.getDag(dagId)); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index 48112790481..8cb930f1dd4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) { } } + @Override + public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + Optional infoResult = getExistingEventInfo(leaseParams); + return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true; + } + /** * Checks leaseArbiterTable for an existing entry for this dag action and event time */ diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index ae053ab51b4..5dbc6e8f3a6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -17,6 +17,8 @@ package org.apache.gobblin.service.modules.orchestration; +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.RestLiServiceException; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -24,6 +26,7 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.gobblin.runtime.api.LeaseUnavailableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final SpecCompiler specCompiler; protected final TopologyCatalog topologyCatalog; private final JobStatusRetriever jobStatusRetriever; + private final DagManagementStateStore dagManagementStateStore; protected final MetricContext metricContext; @@ -100,6 +104,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional(null); } + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + try { + if (!dagManagementStateStore.existsLeasableEntity(leaseParams)) { + throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); + } + } catch (IOException exception) { + _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception); + throw new RuntimeException("Error querying leaseArbiterTable", exception); + } + } + } + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties()); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index c14a7b62386..cba69630957 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -59,6 +60,7 @@ public class MySqlDagManagementStateStoreTest { private ITestMetastoreDatabase testDb; + private static MultiActiveLeaseArbiter leaseArbiter; private MySqlDagManagementStateStore dagManagementStateStore; private static final String TEST_USER = "testUser"; public static final String TEST_PASSWORD = "testPassword"; @@ -68,6 +70,7 @@ public class MySqlDagManagementStateStoreTest { @BeforeClass public void setUp() throws Exception { // Setting up mock DB + this.leaseArbiter = mock(MultiActiveLeaseArbiter.class); this.testDb = TestMetastoreDatabaseFactory.get(); this.dagManagementStateStore = getDummyDMSS(this.testDb); } @@ -92,6 +95,16 @@ public static boolean compareLists(List list1, List list2) { return true; } + @Test + public void testExistsLeasableEntity() throws Exception{ + Mockito.when(leaseArbiter.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + String flowName = "testFlow"; + String flowGroup = "testGroup"; + DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); + Assert.assertTrue(dagManagementStateStore.existsLeasableEntity(leaseParams)); + } + @Test public void testAddDag() throws Exception { Dag dag = DagTestUtils.buildDag("test", 12345L); @@ -150,9 +163,11 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t TopologySpec topologySpec = LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI); URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI); topologySpecMap.put(specExecURI, topologySpec); + MultiActiveLeaseArbiter multiActiveLeaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); + leaseArbiter = multiActiveLeaseArbiter; MySqlDagManagementStateStore dagManagementStateStore = new MySqlDagManagementStateStore(config, null, null, jobStatusRetriever, - MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase)); + MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase), multiActiveLeaseArbiter); dagManagementStateStore.setTopologySpecMap(topologySpecMap); return dagManagementStateStore; } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 9b132fe0d9b..0553f985b57 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -44,6 +44,7 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final long EPSILON = 10000L; private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1); + private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.90); // NOTE: `sleep`ing this long SIGNIFICANTLY slows tests, but we need a large enough value that exec. variability won't cause spurious failure private static final long LINGER = 20000L; private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1); @@ -53,6 +54,8 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final String CONSTANTS_TABLE = "constants_store"; private static final String flowGroup = "testFlowGroup"; private static final String flowGroup2 = "testFlowGroup2"; + private static final String flowGroup3 = "testFlowGroup3"; + private static final String flowGroup4 = "testFlowGroup4"; private static final String flowName = "testFlowName"; private static final String jobName = "testJobName"; private static final long flowExecutionId = 12345677L; @@ -70,6 +73,14 @@ public class MysqlMultiActiveLeaseArbiterTest { new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); private static final DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, false, eventTimeMillis); + private static final DagActionStore.DagAction launchDagAction3 = + new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + private static final DagActionStore.LeaseParams + launchLeaseParams3 = new DagActionStore.LeaseParams(launchDagAction3, false, eventTimeMillis); + private static final DagActionStore.DagAction launchDagAction4 = + new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + private static final DagActionStore.LeaseParams + launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis); private static final Timestamp dummyTimestamp = new Timestamp(99999); private ITestMetastoreDatabase testDb; private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter; @@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception { <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } + /* + test to verify if leasable entity is unavailable before epsilon time + to account for clock drift + */ + @Test + public void testWhenLeasableEntityUnavailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams3); + Thread.sleep(LESS_THAN_EPSILON); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams3)); + } + + /* + test to verify if leasable entity exists post epsilon time + */ + @Test + public void testWhenLeasableEntityAvailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams4); + Thread.sleep(MORE_THAN_EPSILON); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams4)); + } + /* Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row matches the primary key in the table. If such a row does exist, the method should disregard the resulting SQL error and return 0 rows updated, indicating diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index ee5f14cb873..72364d7d580 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -18,13 +18,19 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.File; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.runtime.api.LeaseUnavailableException; +import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; +import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -86,6 +92,8 @@ public class OrchestratorTest { private FlowSpec flowSpec; private ITestMetastoreDatabase testMetastoreDatabase; private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator; + private DagManagementStateStore dagManagementStateStore; + private SpecCompiler specCompiler; @BeforeClass public void setUpClass() throws Exception { @@ -107,7 +115,7 @@ public void setUp() throws Exception { flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR); this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties, "OrchestratorCatalogTest"); - + this.specCompiler = Mockito.mock(SpecCompiler.class); this.topologyCatalog = new TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties), Optional.of(logger)); this.serviceLauncher.addService(topologyCatalog); @@ -116,9 +124,10 @@ public void setUp() throws Exception { this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true); this.serviceLauncher.addService(flowCatalog); - + MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); + this.dagManagementStateStore=dagManagementStateStore; SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties)); @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + @Test + public void testOnAddSpec_withFlowSpec_Available() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + AddSpecResponse response = new AddSpecResponse<>(new Object()); + Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); + AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + } + @Test public void deleteFlowSpec() throws Throwable { // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` From 3fa68a18dce5883229661a0b95fad74842bdba12 Mon Sep 17 00:00:00 2001 From: vaibsing Date: Fri, 15 Nov 2024 11:50:57 +0530 Subject: [PATCH 2/9] Updating function documentation --- .../api/LeaseUnavailableException.java | 20 +++++++++++++++++++ .../DagManagementStateStore.java | 6 ++++-- .../InstrumentedLeaseArbiter.java | 4 ++-- .../MultiActiveLeaseArbiter.java | 9 ++++++++- .../MySqlDagManagementStateStore.java | 4 ++-- .../MysqlMultiActiveLeaseArbiter.java | 2 +- .../modules/orchestration/Orchestrator.java | 2 +- .../MySqlDagManagementStateStoreTest.java | 6 +++--- .../MysqlMultiActiveLeaseArbiterTest.java | 4 ++-- .../orchestration/OrchestratorTest.java | 4 ++-- 10 files changed, 45 insertions(+), 16 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java index 8bdf92b8f63..20a645c9e0e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java @@ -1,5 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gobblin.runtime.api; +/** + * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. + */ public class LeaseUnavailableException extends RuntimeException { public LeaseUnavailableException(String message) { super(message); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 8fe0eac9f01..fe455785bb5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -104,9 +104,11 @@ public interface DagManagementStateStore { void updateDagNode(Dag.DagNode dagNode) throws IOException; /** - * Returns true if lease can be acquired on entity, else returns false + * Returns true if lease can be acquired on entity provided in leaseParams. + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event */ - boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException; + boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException; /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java index 67042f75955..b25af7b1367 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java @@ -91,8 +91,8 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams } @Override - public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { - return decoratedMultiActiveLeaseArbiter.existsLeasableEntity(leaseParams); + public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + return decoratedMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(leaseParams); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index 2da76f29249..a957965c974 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -61,7 +61,14 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; - boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + */ + boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException; /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index cef538570a7..3bf0b02943e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -171,8 +171,8 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) } @Override - public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { - return multiActiveLeaseArbiter.existsLeasableEntity(leaseParams); + public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + return multiActiveLeaseArbiter.canAcquireLeaseOnEntity(leaseParams); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index 8cb930f1dd4..b27c7d92701 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -363,7 +363,7 @@ else if (leaseValidityStatus == 2) { } @Override - public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { Optional infoResult = getExistingEventInfo(leaseParams); return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 5dbc6e8f3a6..dae20f710ab 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -148,7 +148,7 @@ private void validateAdhocFlowLeasability(FlowSpec flowSpec) { FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); try { - if (!dagManagementStateStore.existsLeasableEntity(leaseParams)) { + if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) { throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); } } catch (IOException exception) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index cba69630957..a78668f1927 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -96,13 +96,13 @@ public static boolean compareLists(List list1, List list2) { } @Test - public void testExistsLeasableEntity() throws Exception{ - Mockito.when(leaseArbiter.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + public void testcanAcquireLeaseOnEntity() throws Exception{ + Mockito.when(leaseArbiter.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); String flowName = "testFlow"; String flowGroup = "testGroup"; DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); - Assert.assertTrue(dagManagementStateStore.existsLeasableEntity(leaseParams)); + Assert.assertTrue(dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)); } @Test diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 0553f985b57..d82e9bdd10c 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -223,7 +223,7 @@ public void testWhenLeasableEntityUnavailable() throws Exception{ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); completeLeaseHelper(launchLeaseParams3); Thread.sleep(LESS_THAN_EPSILON); - Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams3)); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(launchLeaseParams3)); } /* @@ -236,7 +236,7 @@ public void testWhenLeasableEntityAvailable() throws Exception{ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); completeLeaseHelper(launchLeaseParams4); Thread.sleep(MORE_THAN_EPSILON); - Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams4)); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(launchLeaseParams4)); } /* diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 72364d7d580..e75c73b7701 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -332,7 +332,7 @@ public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); } @@ -346,7 +346,7 @@ public void testOnAddSpec_withFlowSpec_Available() throws IOException { .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); AddSpecResponse response = new AddSpecResponse<>(new Object()); Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); From 30929f8a5d79b6ee568c743045b0769cf4d51537 Mon Sep 17 00:00:00 2001 From: vaibsing Date: Sat, 16 Nov 2024 00:56:48 +0530 Subject: [PATCH 3/9] Update unit tests --- .../DagManagementStateStore.java | 6 ++-- .../InstrumentedLeaseArbiter.java | 4 +-- .../MultiActiveLeaseArbiter.java | 5 ++-- .../MySqlDagManagementStateStore.java | 4 +-- .../MysqlMultiActiveLeaseArbiter.java | 6 +++- .../modules/orchestration/Orchestrator.java | 10 +++++-- .../MySqlDagManagementStateStoreTest.java | 4 +-- .../MysqlMultiActiveLeaseArbiterTest.java | 4 +-- .../orchestration/OrchestratorTest.java | 29 ++++++++++++++++--- 9 files changed, 51 insertions(+), 21 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index fe455785bb5..6df84aa0426 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -105,10 +105,10 @@ public interface DagManagementStateStore { /** * Returns true if lease can be acquired on entity provided in leaseParams. - * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action - * was triggered, and if the dag action event we're checking on is a reminder event + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered, + * and if the dag action event we're checking on is a reminder event */ - boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException; + boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException; /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java index b25af7b1367..0c4b59b34a3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java @@ -91,8 +91,8 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams } @Override - public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { - return decoratedMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(leaseParams); + public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { + return decoratedMultiActiveLeaseArbiter.isLeaseAcquirable(leaseParams); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index a957965c974..c0d51629db7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -64,11 +64,12 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole /** * This method checks if lease can be acquired on provided flow in lease params * returns true if entry for the same flow does not exists within epsilon time - * in leaseArbiterStore + * in leaseArbiterStore, else returns false * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action * was triggered, and if the dag action event we're checking on is a reminder event + * @return true if lease can be acquired on the flow passed in the lease params, false otherwise */ - boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) + boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException; /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 3bf0b02943e..f84da0e7888 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -171,8 +171,8 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) } @Override - public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { - return multiActiveLeaseArbiter.canAcquireLeaseOnEntity(leaseParams); + public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { + return multiActiveLeaseArbiter.isLeaseAcquirable(leaseParams); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index b27c7d92701..8ba1ce3a4ef 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -362,8 +362,12 @@ else if (leaseValidityStatus == 2) { } } + /* + Determines if a lease can be acquired for the given flow. A lease is acquirable if + no existing lease record exists in arbiter table or the record is older then epsilon time + */ @Override - public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { Optional infoResult = getExistingEventInfo(leaseParams); return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index dae20f710ab..16f260f17d5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -17,8 +17,6 @@ package org.apache.gobblin.service.modules.orchestration; -import com.linkedin.restli.common.HttpStatus; -import com.linkedin.restli.server.RestLiServiceException; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -139,16 +137,22 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + /* + validates if lease can be acquired on the provided flowSpec, + else throw LeaseUnavailableException + */ private void validateAdhocFlowLeasability(FlowSpec flowSpec) { if (!flowSpec.isScheduled()) { Config flowConfig = flowSpec.getConfig(); String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams); try { - if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) { + if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) { throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); } } catch (IOException exception) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index a78668f1927..a6e3b604607 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -97,12 +97,12 @@ public static boolean compareLists(List list1, List list2) { @Test public void testcanAcquireLeaseOnEntity() throws Exception{ - Mockito.when(leaseArbiter.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); String flowName = "testFlow"; String flowGroup = "testGroup"; DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); - Assert.assertTrue(dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)); + Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams)); } @Test diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index d82e9bdd10c..735de145f76 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -223,7 +223,7 @@ public void testWhenLeasableEntityUnavailable() throws Exception{ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); completeLeaseHelper(launchLeaseParams3); Thread.sleep(LESS_THAN_EPSILON); - Assert.assertFalse(mysqlMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(launchLeaseParams3)); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3)); } /* @@ -236,7 +236,7 @@ public void testWhenLeasableEntityAvailable() throws Exception{ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); completeLeaseHelper(launchLeaseParams4); Thread.sleep(MORE_THAN_EPSILON); - Assert.assertTrue(mysqlMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(launchLeaseParams4)); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4)); } /* diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index e75c73b7701..57da46a833d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -326,18 +326,40 @@ public void createFlowSpec() throws Throwable { and throw LeaseUnavailableException */ @Test(expectedExceptions = LeaseUnavailableException.class) - public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException { ConfigBuilder configBuilder = ConfigBuilder.create() .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); } + /* + If no other flow has acquired lease within the epsilon time, then flow + compilation and addition to the store occurs normally + */ + @Test + public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + } + + /* + For Scheduled flow lease acquirable check does not occur, + and flow compilation occurs successfully + */ @Test - public void testOnAddSpec_withFlowSpec_Available() throws IOException { + public void onAddSpecForScheduledFlow() throws IOException { ConfigBuilder configBuilder = ConfigBuilder.create() .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") @@ -346,7 +368,6 @@ public void testOnAddSpec_withFlowSpec_Available() throws IOException { .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); AddSpecResponse response = new AddSpecResponse<>(new Object()); Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); From a7822025b98c0168ff0748e4c65d83b2a4b83558 Mon Sep 17 00:00:00 2001 From: vaibsing Date: Tue, 19 Nov 2024 06:35:48 +0530 Subject: [PATCH 4/9] Refactoring and addressed comments --- .../restli/FlowConfigsV2ResourceHandler.java | 10 ++-- ...a => TooSoonToRerunSameFlowException.java} | 11 +++- .../DagManagementStateStore.java | 9 ++-- .../InstrumentedLeaseArbiter.java | 4 +- .../MultiActiveLeaseArbiter.java | 4 +- .../MySqlDagManagementStateStore.java | 8 ++- .../MysqlMultiActiveLeaseArbiter.java | 4 +- .../modules/orchestration/Orchestrator.java | 23 ++++----- .../MySqlDagManagementStateStoreTest.java | 19 ++++--- .../MysqlMultiActiveLeaseArbiterTest.java | 17 +++++-- .../orchestration/OrchestratorTest.java | 50 +++++++++++-------- 11 files changed, 96 insertions(+), 63 deletions(-) rename gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/{LeaseUnavailableException.java => TooSoonToRerunSameFlowException.java} (77%) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index ff57745c7e0..9ec198c633d 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -60,7 +60,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.FlowSpecSearchObject; -import org.apache.gobblin.runtime.api.LeaseUnavailableException; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; @@ -257,10 +257,10 @@ public CreateKVResponse, FlowConfig> cr responseMap = this.flowCatalog.put(flowSpec, true); } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } catch(LeaseUnavailableException e){ - throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage()); - } - catch (Throwable e) { + } catch(TooSoonToRerunSameFlowException e) { + return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, + "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken")); + } catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java similarity index 77% rename from gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java rename to gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java index 20a645c9e0e..5aa49030aea 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java @@ -20,8 +20,15 @@ /** * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. */ -public class LeaseUnavailableException extends RuntimeException { - public LeaseUnavailableException(String message) { +public class TooSoonToRerunSameFlowException extends RuntimeException { + private final FlowSpec flowSpec; + + public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) { super(message); + this.flowSpec = flowSpec; + } + + public FlowSpec getFlowSpec() { + return flowSpec; } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 6df84aa0426..4b546ebfa55 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -105,10 +105,13 @@ public interface DagManagementStateStore { /** * Returns true if lease can be acquired on entity provided in leaseParams. - * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered, - * and if the dag action event we're checking on is a reminder event + * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name. + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @throws IOException */ - boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException; + boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException; /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java index 0c4b59b34a3..746ab662377 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java @@ -91,8 +91,8 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams } @Override - public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { - return decoratedMultiActiveLeaseArbiter.isLeaseAcquirable(leaseParams); + public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException { + return decoratedMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index c0d51629db7..38a7d5d8b37 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole /** * This method checks if lease can be acquired on provided flow in lease params - * returns true if entry for the same flow does not exists within epsilon time + * returns true if entry for the same flow does not exists within Lease Consolidation Period * in leaseArbiterStore, else returns false * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action * was triggered, and if the dag action event we're checking on is a reminder event * @return true if lease can be acquired on the flow passed in the lease params, false otherwise */ - boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) + boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException; /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index f84da0e7888..c3c33027bfa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -41,6 +41,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.service.modules.flow.FlowUtils; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -171,8 +172,11 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) } @Override - public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { - return multiActiveLeaseArbiter.isLeaseAcquirable(leaseParams); + public boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException { + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + flowExecutionId, DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + return multiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index 8ba1ce3a4ef..bea6394bfe7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -367,9 +367,9 @@ else if (leaseValidityStatus == 2) { no existing lease record exists in arbiter table or the record is older then epsilon time */ @Override - public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { + public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException { Optional infoResult = getExistingEventInfo(leaseParams); - return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true; + return infoResult.isPresent() ? infoResult.get().isWithinEpsilon() : false; } /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 16f260f17d5..fbf47f0fcd5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -24,7 +24,7 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.gobblin.runtime.api.LeaseUnavailableException; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { _log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec); this.specCompiler.onAddSpec(addedSpec); } else if (addedSpec instanceof FlowSpec) { - validateAdhocFlowLeasability((FlowSpec) addedSpec); + enforceSimilarAdhocFlowExistence((FlowSpec) addedSpec); _log.info("Orchestrator - onAdd[Flow]Spec: " + addedSpec); return this.specCompiler.onAddSpec(addedSpec); } else { @@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { } /* - validates if lease can be acquired on the provided flowSpec, - else throw LeaseUnavailableException + enforces that a similar flow is not launching, + else throw TooSoonToRerunSameFlowException */ - private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) { if (!flowSpec.isScheduled()) { Config flowConfig = flowSpec.getConfig(); String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); - DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, - FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); - DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); - _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams); + _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName); try { - if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) { - throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); + if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) { + throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec); } } catch (IOException exception) { - _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception); - throw new RuntimeException("Error querying leaseArbiterTable", exception); + _log.error("unable to check whether similar flow exists " + flowGroup + "." + flowName); + throw new RuntimeException("unable to check whether similar flow exists " + flowGroup + "." + flowName, exception); } } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index a6e3b604607..58c97bdd71b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -48,8 +48,7 @@ import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.util.CompletedFuture; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -96,13 +95,19 @@ public static boolean compareLists(List list1, List list2) { } @Test - public void testcanAcquireLeaseOnEntity() throws Exception{ - Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{ + Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); String flowName = "testFlow"; String flowGroup = "testGroup"; - DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); - DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); - Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams)); + Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class))); + } + + @Test + public void testExistsCurrentlyLaunchingSimilarFlowGivesFalse() throws Exception{ + Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + String flowName = "testFlow"; + String flowGroup = "testGroup"; + Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class))); } @Test diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 735de145f76..7cd9c664ac6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final String flowName = "testFlowName"; private static final String jobName = "testJobName"; private static final long flowExecutionId = 12345677L; + private static final long flowExecutionId1 = 12345996L; private static final long eventTimeMillis = 1710451837L; // Dag actions with the same flow info but different flow action types are considered unique private static final DagActionStore.DagAction launchDagAction = @@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest { new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); private static final DagActionStore.LeaseParams launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis); + private static final DagActionStore.DagAction launchDagAction3_similar = + new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH); + private static final DagActionStore.LeaseParams + launchLeaseParams3_similar = new DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis); + private static final DagActionStore.DagAction launchDagAction4_similar = + new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH); + private static final DagActionStore.LeaseParams + launchLeaseParams4_similar = new DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis); private static final Timestamp dummyTimestamp = new Timestamp(99999); private ITestMetastoreDatabase testDb; private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter; @@ -217,26 +226,26 @@ public void testAcquireLeaseSingleParticipant() throws Exception { to account for clock drift */ @Test - public void testWhenLeasableEntityUnavailable() throws Exception{ + public void testExistsSimilarLeaseWithinConsolidationPeriod() throws Exception{ LeaseAttemptStatus firstLaunchStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true); Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); completeLeaseHelper(launchLeaseParams3); Thread.sleep(LESS_THAN_EPSILON); - Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3)); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams3_similar)); } /* test to verify if leasable entity exists post epsilon time */ @Test - public void testWhenLeasableEntityAvailable() throws Exception{ + public void testDoesNotExistsSimilarLeaseWithinConsolidationPeriod() throws Exception{ LeaseAttemptStatus firstLaunchStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true); Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); completeLeaseHelper(launchLeaseParams4); Thread.sleep(MORE_THAN_EPSILON); - Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4)); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams4_similar)); } /* diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 57da46a833d..53c768176d6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -26,8 +26,9 @@ import org.apache.commons.io.FileUtils; import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.runtime.api.LeaseUnavailableException; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; +import org.apache.gobblin.service.modules.flow.FlowUtils; import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.hadoop.fs.Path; import org.mockito.Mockito; @@ -69,6 +70,7 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -91,7 +93,7 @@ public class OrchestratorTest { private FlowCatalog flowCatalog; private FlowSpec flowSpec; private ITestMetastoreDatabase testMetastoreDatabase; - private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator; + private Orchestrator orchestrator; private DagManagementStateStore dagManagementStateStore; private SpecCompiler specCompiler; @@ -127,18 +129,18 @@ public void setUp() throws Exception { MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); - this.dagManagementStateStore=dagManagementStateStore; + this.dagManagementStateStore = dagManagementStateStore; SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties)); FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(ConfigFactory.empty(), sharedFlowMetricsSingleton, mock(UserQuotaManager.class), dagManagementStateStore); - this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), + this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), this.topologyCatalog, Optional.of(logger), mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton, dagManagementStateStore, flowCompilationValidationHelper, mock(JobStatusRetriever.class)); - this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator); - this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator); + this.topologyCatalog.addListener(orchestrator); + this.flowCatalog.addListener(orchestrator); // Start application this.serviceLauncher.start(); // Create Spec to play with @@ -242,7 +244,7 @@ public URI computeTopologySpecURI(String parent, String current) { // TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere - move it, then rework into `@BeforeMethod` init (since others depend on this) @Test public void createTopologySpec() { - IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler(); + IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); // List Current Specs Collection specs = topologyCatalog.getSpecs(); @@ -281,7 +283,7 @@ public void createFlowSpec() throws Throwable { // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` createTopologySpec(); // make 1 Topology with 1 SpecProducer available and responsible for our new FlowSpec - IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler(); + IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor(); // List Current Specs @@ -325,15 +327,18 @@ public void createFlowSpec() throws Throwable { epsilon time, then we do not execute this flow, hence do not process and store the spec and throw LeaseUnavailableException */ - @Test(expectedExceptions = LeaseUnavailableException.class) - public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException { + @Test(expectedExceptions = TooSoonToRerunSameFlowException.class) + public void onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() throws IOException { ConfigBuilder configBuilder = ConfigBuilder.create() .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") - .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()) + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); - dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(true); + orchestrator.onAddSpec(flowSpec); } /* @@ -341,16 +346,17 @@ public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException { compilation and addition to the store occurs normally */ @Test - public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException { + public void onAddSpecForAdhocFlowWhenNoExistingFlowIsCurrentlyLaunching() throws IOException { ConfigBuilder configBuilder = ConfigBuilder.create() .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()) .addPrimitive("gobblin.flow.sourceIdentifier", "source") .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); - AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(false); + AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec); Assert.assertNotNull(addSpecResponse); } @@ -370,8 +376,10 @@ public void onAddSpecForScheduledFlow() throws IOException { FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); AddSpecResponse response = new AddSpecResponse<>(new Object()); Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); - AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec); Assert.assertNotNull(addSpecResponse); + // Verifying that for scheduled flow isLeaseAcquirable is not called + Mockito.verify(dagManagementStateStore, Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(), anyLong()); } @Test @@ -379,7 +387,7 @@ public void deleteFlowSpec() throws Throwable { // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` createFlowSpec(); // make 1 Flow available (for deletion herein) - IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler(); + IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor(); // List Current Specs @@ -422,19 +430,19 @@ public void doNotRegisterMetricsAdhocFlows() throws Throwable { createTopologySpec(); // for flow compilation to pass FlowId flowId = GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME); - MetricContext metricContext = this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSharedFlowMetricsSingleton().getMetricContext(); + MetricContext metricContext = this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext(); String metricName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(), flowId.getFlowName(), ServiceMetricNames.COMPILED); this.topologyCatalog.getInitComplete().countDown(); // unblock orchestration FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId); - this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(adhocSpec, new Properties(), 0, false); + this.orchestrator.orchestrate(adhocSpec, new Properties(), 0, false); Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName)); Properties scheduledProps = new Properties(); scheduledProps.setProperty("job.schedule", "0/2 * * * * ?"); FlowSpec scheduledSpec = createBasicFlowSpecForFlowId(flowId, scheduledProps); - this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(scheduledSpec, new Properties(), 0, false); + this.orchestrator.orchestrate(scheduledSpec, new Properties(), 0, false); Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName)); } From 0981b82dcbdf8d1c30a1088ad24da71b206ba30e Mon Sep 17 00:00:00 2001 From: vaibsing Date: Tue, 19 Nov 2024 08:30:47 +0530 Subject: [PATCH 5/9] Extract TooSoonToRerunSameFlowException from Throwable in Flow Catalog listeners --- .../apache/gobblin/runtime/spec_catalog/FlowCatalog.java | 8 +++++++- .../orchestration/MySqlDagManagementStateStore.java | 1 - 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 24732cc862c..3b1e8868f1a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -29,6 +29,7 @@ import java.util.Properties; import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -392,7 +393,12 @@ private Map updateOrAddSpecHelper(Spec spec, boolean tr // If flow fails compilation, the result will have a non-empty string with the error if (!response.getValue().getFailures().isEmpty()) { for (Map.Entry> entry : response.getValue().getFailures().entrySet()) { - throw entry.getValue().getError().getCause(); + Throwable error = entry.getValue().getError(); + if (error instanceof TooSoonToRerunSameFlowException) { + throw (TooSoonToRerunSameFlowException) error; + } else { + throw error.getCause(); + } } } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c3c33027bfa..8307f46fdf3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -41,7 +41,6 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.service.modules.flow.FlowUtils; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; From 7d2b49f16a840a9a4a99e9274a915ab189ababff Mon Sep 17 00:00:00 2001 From: vaibsing Date: Tue, 19 Nov 2024 15:04:29 +0530 Subject: [PATCH 6/9] Address PR comments --- .../restli/FlowConfigsV2ResourceHandler.java | 2 +- .../api/TooSoonToRerunSameFlowException.java | 12 +++++----- .../runtime/spec_catalog/FlowCatalog.java | 6 ++--- .../DagManagementStateStore.java | 11 ++++----- .../MultiActiveLeaseArbiter.java | 8 +++---- .../MySqlDagManagementStateStore.java | 2 +- .../modules/orchestration/Orchestrator.java | 17 ++++++------- .../MySqlDagManagementStateStoreTest.java | 4 ++-- .../MysqlMultiActiveLeaseArbiterTest.java | 24 ++++++++----------- .../orchestration/OrchestratorTest.java | 15 ++++++------ 10 files changed, 48 insertions(+), 53 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 9ec198c633d..055724d83d6 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -259,7 +259,7 @@ public CreateKVResponse, FlowConfig> cr throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); } catch(TooSoonToRerunSameFlowException e) { return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, - "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken")); + "FlowSpec with URI " + flowSpec.getUri() + " was previously launched within the lease consolidation period, no action will be taken")); } catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java index 5aa49030aea..9688b3b19a9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java @@ -17,18 +17,18 @@ package org.apache.gobblin.runtime.api; +import lombok.Getter; + + /** * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. */ public class TooSoonToRerunSameFlowException extends RuntimeException { + @Getter private final FlowSpec flowSpec; - public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) { - super(message); + public TooSoonToRerunSameFlowException(FlowSpec flowSpec) { + super("Lease already occupied by another recent execution of this flow: " + flowSpec); this.flowSpec = flowSpec; } - - public FlowSpec getFlowSpec() { - return flowSpec; - } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 3b1e8868f1a..bebe9417acb 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -29,7 +29,6 @@ import java.util.Properties; import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +57,7 @@ import org.apache.gobblin.runtime.api.SpecSearchObject; import org.apache.gobblin.runtime.api.SpecSerDe; import org.apache.gobblin.runtime.api.SpecStore; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe; import org.apache.gobblin.runtime.spec_store.FSSpecStore; import org.apache.gobblin.service.ServiceConfigKeys; @@ -394,8 +394,8 @@ private Map updateOrAddSpecHelper(Spec spec, boolean tr if (!response.getValue().getFailures().isEmpty()) { for (Map.Entry> entry : response.getValue().getFailures().entrySet()) { Throwable error = entry.getValue().getError(); - if (error instanceof TooSoonToRerunSameFlowException) { - throw (TooSoonToRerunSameFlowException) error; + if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) { + throw (TooSoonToRerunSameFlowException) error.getCause(); } else { throw error.getCause(); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 4b546ebfa55..8059eab4e1c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -104,14 +104,13 @@ public interface DagManagementStateStore { void updateDagNode(Dag.DagNode dagNode) throws IOException; /** - * Returns true if lease can be acquired on entity provided in leaseParams. - * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name. - * @param flowGroup flow group for the dag action - * @param flowName flow name for the dag action - * @param flowExecutionId flow execution for the dag action + * Returns true if a flow has been launched recently with same flow name and flow group. + * @param flowGroup flow group for the flow + * @param flowName flow name for the flow + * @param flowExecutionId flow execution for the flow * @throws IOException */ - boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException; + boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException; /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index 38a7d5d8b37..f580e936a51 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -62,12 +62,12 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole throws IOException; /** - * This method checks if lease can be acquired on provided flow in lease params - * returns true if entry for the same flow does not exists within Lease Consolidation Period - * in leaseArbiterStore, else returns false + * This method checks if entry for same flow name and flow group exists within the lease consolidation period + * returns true if entry for the same flow exists within Lease Consolidation Period (aka. epsilon) + * else returns false * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action * was triggered, and if the dag action event we're checking on is a reminder event - * @return true if lease can be acquired on the flow passed in the lease params, false otherwise + * @return true if lease for a recently launched flow already exists for the flow details in leaseParams */ boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 8307f46fdf3..b14d6bc85c2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -171,7 +171,7 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) } @Override - public boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException { + public boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException { DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionType.LAUNCH); DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index fbf47f0fcd5..b8d390580d4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -128,7 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { _log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec); this.specCompiler.onAddSpec(addedSpec); } else if (addedSpec instanceof FlowSpec) { - enforceSimilarAdhocFlowExistence((FlowSpec) addedSpec); + enforceNoRecentAdhocExecOfSameFlow((FlowSpec) addedSpec); _log.info("Orchestrator - onAdd[Flow]Spec: " + addedSpec); return this.specCompiler.onAddSpec(addedSpec); } else { @@ -138,23 +138,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { } /* - enforces that a similar flow is not launching, + enforces that a similar adhoc flow is not launching, else throw TooSoonToRerunSameFlowException */ - private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) { + private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) { if (!flowSpec.isScheduled()) { Config flowConfig = flowSpec.getConfig(); String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); - _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName); + _log.info("Checking existing adhoc flow entry for " + flowGroup + "." + flowName); try { - if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) { - throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec); + if (dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) { + _log.warn("Another recent adhoc flow execution found for " + flowGroup + "." + flowName); + throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec)); } } catch (IOException exception) { - _log.error("unable to check whether similar flow exists " + flowGroup + "." + flowName); - throw new RuntimeException("unable to check whether similar flow exists " + flowGroup + "." + flowName, exception); + _log.error("Unable to check whether similar flow exists " + flowGroup + "." + flowName); + throw new RuntimeException("Unable to check whether similar flow exists " + flowGroup + "." + flowName, exception); } } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index 58c97bdd71b..6dbbd0ba8b5 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -99,7 +99,7 @@ public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{ Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); String flowName = "testFlow"; String flowGroup = "testGroup"; - Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class))); + Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, System.currentTimeMillis())); } @Test @@ -107,7 +107,7 @@ public void testExistsCurrentlyLaunchingSimilarFlowGivesFalse() throws Exception Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); String flowName = "testFlow"; String flowGroup = "testGroup"; - Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class))); + Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, System.currentTimeMillis())); } @Test diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 7cd9c664ac6..9ba8f40c6b8 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -58,8 +58,8 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final String flowGroup4 = "testFlowGroup4"; private static final String flowName = "testFlowName"; private static final String jobName = "testJobName"; - private static final long flowExecutionId = 12345677L; - private static final long flowExecutionId1 = 12345996L; + private static final long flowExecutionId = 12345677213L; + private static final long flowExecutionId1 = 12345996546L; private static final long eventTimeMillis = 1710451837L; // Dag actions with the same flow info but different flow action types are considered unique private static final DagActionStore.DagAction launchDagAction = @@ -74,22 +74,18 @@ public class MysqlMultiActiveLeaseArbiterTest { new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); private static final DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, false, eventTimeMillis); - private static final DagActionStore.DagAction launchDagAction3 = - new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); private static final DagActionStore.LeaseParams - launchLeaseParams3 = new DagActionStore.LeaseParams(launchDagAction3, false, eventTimeMillis); - private static final DagActionStore.DagAction launchDagAction4 = - new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + launchLeaseParams3 = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); private static final DagActionStore.LeaseParams - launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis); - private static final DagActionStore.DagAction launchDagAction3_similar = - new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH); + launchLeaseParams3_similar = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); private static final DagActionStore.LeaseParams - launchLeaseParams3_similar = new DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis); - private static final DagActionStore.DagAction launchDagAction4_similar = - new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH); + launchLeaseParams4 = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); private static final DagActionStore.LeaseParams - launchLeaseParams4_similar = new DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis); + launchLeaseParams4_similar = new DagActionStore.LeaseParams(new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, + DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis); private static final Timestamp dummyTimestamp = new Timestamp(99999); private ITestMetastoreDatabase testDb; private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 53c768176d6..2a8d4f35a15 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -26,7 +26,6 @@ import org.apache.commons.io.FileUtils; import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.service.modules.flow.FlowUtils; import org.apache.gobblin.service.modules.flow.SpecCompiler; @@ -324,10 +323,10 @@ public void createFlowSpec() throws Throwable { /* If another flow has already acquired lease for this flowspec details within - epsilon time, then we do not execute this flow, hence do not process and store the spec - and throw LeaseUnavailableException + lease consolidation time, then we do not execute this flow, hence do not process and store the spec + and throw RuntimeException */ - @Test(expectedExceptions = TooSoonToRerunSameFlowException.class) + @Test(expectedExceptions = RuntimeException.class) public void onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() throws IOException { ConfigBuilder configBuilder = ConfigBuilder.create() .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") @@ -337,7 +336,7 @@ public void onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() t .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(true); + Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(true); orchestrator.onAddSpec(flowSpec); } @@ -355,7 +354,7 @@ public void onAddSpecForAdhocFlowWhenNoExistingFlowIsCurrentlyLaunching() throws .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); Config config = configBuilder.build(); FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); - Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(false); + Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName", FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(false); AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec); Assert.assertNotNull(addSpecResponse); } @@ -378,8 +377,8 @@ public void onAddSpecForScheduledFlow() throws IOException { Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec); Assert.assertNotNull(addSpecResponse); - // Verifying that for scheduled flow isLeaseAcquirable is not called - Mockito.verify(dagManagementStateStore, Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(), anyLong()); + // Verifying that for scheduled flow existsCurrentlyLaunchingExecOfSameFlow is not called + Mockito.verify(dagManagementStateStore, Mockito.never()).existsCurrentlyLaunchingExecOfSameFlow(anyString(), anyString(), anyLong()); } @Test From 01cac22813d9c2e47f5fc9936de47349385f939e Mon Sep 17 00:00:00 2001 From: vaibsing Date: Wed, 20 Nov 2024 00:11:29 +0530 Subject: [PATCH 7/9] Edit TooSoonToRerunSameFlowException exception handling --- .../api/TooSoonToRerunSameFlowException.java | 14 ++++++++++++++ .../gobblin/runtime/spec_catalog/FlowCatalog.java | 8 +------- .../MysqlMultiActiveLeaseArbiter.java | 4 ---- .../modules/orchestration/Orchestrator.java | 6 +++--- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java index 9688b3b19a9..2c28de9a4d2 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java @@ -27,8 +27,22 @@ public class TooSoonToRerunSameFlowException extends RuntimeException { @Getter private final FlowSpec flowSpec; + /** + * Account for unwrapping within @{link FlowCatalog#updateOrAddSpecHelper}`s `CallbackResult` error handling for `SpecCatalogListener`s + * @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException + */ + public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) { + return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec)); + } + public TooSoonToRerunSameFlowException(FlowSpec flowSpec) { super("Lease already occupied by another recent execution of this flow: " + flowSpec); this.flowSpec = flowSpec; } + + /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */ + private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) { + super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause); + this.flowSpec = flowSpec; + } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index bebe9417acb..24732cc862c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -57,7 +57,6 @@ import org.apache.gobblin.runtime.api.SpecSearchObject; import org.apache.gobblin.runtime.api.SpecSerDe; import org.apache.gobblin.runtime.api.SpecStore; -import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe; import org.apache.gobblin.runtime.spec_store.FSSpecStore; import org.apache.gobblin.service.ServiceConfigKeys; @@ -393,12 +392,7 @@ private Map updateOrAddSpecHelper(Spec spec, boolean tr // If flow fails compilation, the result will have a non-empty string with the error if (!response.getValue().getFailures().isEmpty()) { for (Map.Entry> entry : response.getValue().getFailures().entrySet()) { - Throwable error = entry.getValue().getError(); - if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) { - throw (TooSoonToRerunSameFlowException) error.getCause(); - } else { - throw error.getCause(); - } + throw entry.getValue().getError().getCause(); } } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index bea6394bfe7..fed800c8384 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -362,10 +362,6 @@ else if (leaseValidityStatus == 2) { } } - /* - Determines if a lease can be acquired for the given flow. A lease is acquirable if - no existing lease record exists in arbiter table or the record is older then epsilon time - */ @Override public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException { Optional infoResult = getExistingEventInfo(leaseParams); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index b8d390580d4..f0a9fdd43d9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -24,7 +24,6 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +53,7 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.service.modules.flow.FlowUtils; @@ -139,7 +139,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { /* enforces that a similar adhoc flow is not launching, - else throw TooSoonToRerunSameFlowException + else throw {@link TooSoonToRerunSameFlowException} */ private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) { if (!flowSpec.isScheduled()) { @@ -151,7 +151,7 @@ private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) { try { if (dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) { _log.warn("Another recent adhoc flow execution found for " + flowGroup + "." + flowName); - throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec)); + throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec); } } catch (IOException exception) { _log.error("Unable to check whether similar flow exists " + flowGroup + "." + flowName); From b59694803569520030be7ace6b58bac7bdd27c72 Mon Sep 17 00:00:00 2001 From: vaibsing Date: Wed, 20 Nov 2024 00:45:27 +0530 Subject: [PATCH 8/9] Update javadoc of tests --- .../gobblin/runtime/api/TooSoonToRerunSameFlowException.java | 3 ++- .../service/modules/orchestration/OrchestratorTest.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java index 2c28de9a4d2..f718ec4a985 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java @@ -21,7 +21,8 @@ /** - * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. + * An exception thrown when another {@link FlowSpec} with same flow name and flow group + * is submitted within lease consolidation time. */ public class TooSoonToRerunSameFlowException extends RuntimeException { @Getter diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 2a8d4f35a15..acfa6c51cad 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils; import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.service.modules.flow.FlowUtils; import org.apache.gobblin.service.modules.flow.SpecCompiler; @@ -324,9 +325,9 @@ public void createFlowSpec() throws Throwable { /* If another flow has already acquired lease for this flowspec details within lease consolidation time, then we do not execute this flow, hence do not process and store the spec - and throw RuntimeException + and throw TooSoonToRerunSameFlowException */ - @Test(expectedExceptions = RuntimeException.class) + @Test(expectedExceptions = TooSoonToRerunSameFlowException.class) public void onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() throws IOException { ConfigBuilder configBuilder = ConfigBuilder.create() .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") From 57752a71fb5a9860eccc4cd16798dd4851beec33 Mon Sep 17 00:00:00 2001 From: vaibsing Date: Wed, 11 Dec 2024 15:07:01 +0530 Subject: [PATCH 9/9] Handling for Non transient exceptions --- .../orchestration/DagProcessingEngine.java | 16 +++ .../modules/orchestration/proc/DagProc.java | 16 --- .../orchestration/proc/DagProcUtils.java | 17 ++- .../DagProcessingEngineTest.java | 15 ++- .../orchestration/proc/DagProcUtilsTest.java | 124 ++++++++++++++++++ 5 files changed, 160 insertions(+), 28 deletions(-) create mode 100644 gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java index 9ee21395b25..7dd5cdbf73d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java @@ -17,6 +17,8 @@ package org.apache.gobblin.service.modules.orchestration; +import java.util.Collections; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -39,6 +41,7 @@ import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; import org.apache.gobblin.service.modules.orchestration.task.DagTask; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExceptionUtils; import org.apache.gobblin.util.ExecutorsUtils; @@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService { public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis"; @Getter static long defaultJobStartDeadlineTimeMillis; public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name(); + // Todo Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined + public static List> retryableExceptions = Collections.EMPTY_LIST; @Inject public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory, @@ -85,6 +90,10 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) { defaultJobStartDeadlineTimeMillis = deadlineTimeMs; } + public static boolean isTransientException(Exception e) { + return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions); + } + @Override protected void startUp() { Integer numThreads = ConfigUtils.getInt @@ -149,6 +158,13 @@ public void run() { dagTask.conclude(); log.info(dagProc.contextualizeStatus("concluded dagTask")); } catch (Exception e) { + if(!DagProcessingEngine.isTransientException(e)){ + log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ", + dagTask, e); + dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark(); + dagTask.conclude(); + } + // Todo add the else block for transient exceptions and add conclude task only if retry limit is not breached log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e); dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark(); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java index 6c9694c80f7..252861830e1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java @@ -33,7 +33,6 @@ import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.util.ExceptionUtils; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.orchestration.DagActionStore; @@ -92,20 +91,9 @@ public final void process(DagManagementStateStore dagManagementStateStore, dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false); throw e; } - try { logContextualizedInfo("ready to process"); act(dagManagementStateStore, state, dagProcEngineMetrics); logContextualizedInfo("processed"); - } catch (Exception e) { - if (isNonTransientException(e)) { - log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ", - getDagTask(), e); - dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark(); - dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark(); - } else { - throw e; - } - } } protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException; @@ -126,8 +114,4 @@ public String contextualizeStatus(String message) { public void logContextualizedInfo(String message) { log.info(contextualizeStatus(message)); } - - protected boolean isNonTransientException(Exception e) { - return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions); - } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 346037f6d2f..adc8cc52844 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -74,7 +74,6 @@ public class DagProcUtils { public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag dag, Dag.DagId dagId) throws IOException { Set> nextNodes = DagUtils.getNext(dag); - if (nextNodes.size() == 1) { Dag.DagNode dagNode = nextNodes.iterator().next(); DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId); @@ -98,6 +97,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat Dag.DagId dagId) { DagUtils.incrementJobAttempt(dagNode); JobExecutionPlan jobExecutionPlan = DagUtils.getJobExecutionPlan(dagNode); + JobSpec jobSpec = DagUtils.getJobSpec(dagNode); Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); @@ -139,12 +139,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat dagManagementStateStore.updateDagNode(dagNode); sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); } catch (Exception e) { - TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); - String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri; - log.error(message, e); - jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); - if (jobFailedTimer != null) { - jobFailedTimer.stop(jobMetadata); + // Only mark the job as failed in case of non transient exceptions + if(!DagProcessingEngine.isTransientException(e)){ + TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); + String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri; + log.error(message, e); + jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); + if (jobFailedTimer != null) { + jobFailedTimer.stop(jobMetadata); + } } try { // when there is no exception, quota will be released in job status monitor or re-evaluate dag proc diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java index 8fea5303aeb..929bfdb1ed9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java @@ -45,9 +45,8 @@ import org.apache.gobblin.testing.AssertWithBackoff; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; + @Slf4j public class DagProcessingEngineTest { @@ -196,9 +195,15 @@ public void dagProcessingTest() 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. " + "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(), log, 1, 1000L); - + // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions); - Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions); + Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions); + } + + @Test + public void isNonTransientExceptionTest(){ + Assert.assertTrue(!DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!"))); + Assert.assertTrue(!DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!"))); } private enum ExceptionType { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java new file mode 100644 index 00000000000..5fdd5f81ab2 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java @@ -0,0 +1,124 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(EventSubmitter.class) +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeClass + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List> dagNodeList = new ArrayList<>(); + List jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } + Dag dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + } + + @Test + public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException, ExecutionException, InterruptedException { + Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680); + List> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0); + Dag.DagNode dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + } + + @Test(expectedExceptions = RuntimeException.class) + public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{ + Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678); + List> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2); + Dag.DagNode dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + SpecProducer mockedSpecProducer = mockSpecExecutor.getProducer().get(); + Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class)); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + } + + private List getJobExecutionPlans() throws URISyntaxException { + Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build(); + Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build(); + Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build(); + List flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3); + + Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build(); + Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build(); + Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build(); + List jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3); + List jobExecutionPlans = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Config jobConfig = jobConfigs.get(i); + FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build(); + if(i==2){ + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, + ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty())); + } + else{ + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, + ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty())); + } + } + return jobExecutionPlans; + } +} \ No newline at end of file