From 3950d10f080c0ce5b662d49448fd84b2eb798bf6 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Tue, 24 Sep 2024 14:45:11 +0530 Subject: [PATCH 1/5] added some unit tests for gobblin temporal module --- gobblin-temporal/build.gradle | 5 +- .../AbstractNestingExecWorkflowImpl.java | 14 +- .../impl/GenerateWorkUnitsImplTest.java | 2 +- .../temporal/ddm/utils/JobStateUtilTest.java | 141 ++++++++++++++++++ .../AbstractNestingExecWorkflowImplTest.java | 139 +++++++++++++++++ 5 files changed, 293 insertions(+), 8 deletions(-) create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index 1832ac909de..5e67f156030 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -67,11 +67,14 @@ dependencies { testCompile project(":gobblin-example") testCompile externalDependency.testng - testCompile externalDependency.mockito + testCompile externalDependency.mockitoInline + testCompile externalDependency.powerMockApi + testCompile externalDependency.powerMockModule testCompile externalDependency.hadoopYarnMiniCluster testCompile externalDependency.curatorFramework testCompile externalDependency.curatorTest + testCompile ('com.google.inject:guice:3.0') { force = true } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 6bef7a609cb..e07c0d18d2d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -34,6 +34,9 @@ import io.temporal.workflow.Workflow; import lombok.extern.slf4j.Slf4j; +import com.google.common.annotations.VisibleForTesting; + + import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; @@ -115,8 +118,9 @@ protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); } + @VisibleForTesting /** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */ - protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { + public Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { // (only pause when an appreciable number of leaves) // TODO: use a configuration value, for simpler adjustment, rather than hard-code return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT @@ -130,11 +134,9 @@ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChi * List naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren); * @return each sub-tree's desired size, in ascending sub-tree order */ - protected static List consolidateSubTreeGrandChildren( - final int numSubTreesPerSubTree, - final int numChildrenTotal, - final int numSubTreeChildren - ) { + @VisibleForTesting + public static List consolidateSubTreeGrandChildren(final int numSubTreesPerSubTree, + final int numChildrenTotal, final int numSubTreeChildren) { if (numSubTreesPerSubTree <= 0) { return Lists.newArrayList(); } else if (isSqrt(numSubTreeChildren, numChildrenTotal)) { diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java index 86c5ac12de1..57f0d18c80f 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java @@ -94,4 +94,4 @@ public void testFetchesUniqueWorkDirsFromMultiWorkUnits() { Set output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream); Assert.assertEquals(output.size(), 11); } -} +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java new file mode 100644 index 00000000000..62e17bcea40 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.temporal.ddm.utils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.source.Source; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; + + +public class JobStateUtilTest { + + private JobState jobState; + private FileSystem fileSystem; + + @BeforeMethod + public void setUp() { + jobState = Mockito.mock(JobState.class); + fileSystem = Mockito.mock(FileSystem.class); + } + + @Test + public void testOpenFileSystem() throws IOException { + + Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test"); + Mockito.when(jobState.getProperties()).thenReturn(new Properties()); + + FileSystem fs = JobStateUtils.openFileSystem(jobState); + + Assert.assertNotNull(fs); + Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString()); + } + + @Test + public void testCreateSource() throws ReflectiveOperationException { + Mockito.when(jobState.getProp(Mockito.anyString())) + .thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource"); + Source source = JobStateUtils.createSource(jobState); + Assert.assertNotNull(source); + } + + @Test + public void testOpenTaskStateStoreUncached() throws URISyntaxException { + Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test"); + Mockito.when(jobState.getJobId()).thenReturn("testJobId"); + Mockito.when(jobState.getJobName()).thenReturn("testJobName"); + Mockito.when(fileSystem.makeQualified(Mockito.any())) + .thenReturn(new Path("file:///test/testJobName/testJobId/output")); + Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output")); + + StateStore stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem); + + Assert.assertNotNull(stateStore); + } + + @Test + public void testGetFileSystemUri() { + Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test"); + URI fsUri = JobStateUtils.getFileSystemUri(jobState); + Assert.assertEquals(URI.create("file:///test"), fsUri); + Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString()); + } + + @Test + public void testGetWorkDirRoot() { + Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); + Mockito.when(jobState.getJobName()).thenReturn("testJob"); + Mockito.when(jobState.getJobId()).thenReturn("jobId123"); + Path rootPath = JobStateUtils.getWorkDirRoot(jobState); + Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath); + Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString()); + } + + @Test + public void testGetWorkUnitsPath() { + Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); + Mockito.when(jobState.getJobName()).thenReturn("testJob"); + Mockito.when(jobState.getJobId()).thenReturn("jobId123"); + Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState); + Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath); + } + + @Test + public void testGetTaskStateStorePath() throws IOException { + Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path")); + Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); + Mockito.when(jobState.getJobName()).thenReturn("testJob"); + Mockito.when(jobState.getJobId()).thenReturn("jobId123"); + Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem); + Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath); + } + + @Test + public void testWriteJobState() throws IOException { + Path workDirRootPath = new Path("/tmp"); + FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class); + Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos); + + JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem); + + Mockito.verify(fileSystem).create(Mockito.any(Path.class)); + Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean()); + } + + @Test + public void testGetSharedResourcesBroker() { + Mockito.when(jobState.getProperties()).thenReturn(System.getProperties()); + Mockito.when(jobState.getJobName()).thenReturn("testJob"); + Mockito.when(jobState.getJobId()).thenReturn("jobId123"); + Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState)); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java new file mode 100644 index 00000000000..4d0ae66f0be --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow.impl; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; + +import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; +import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest(Workflow.class) +public class AbstractNestingExecWorkflowImplTest { + + @Mock + private Workload mockWorkload; + + @Mock + private WorkflowAddr mockWorkflowAddr; + + @Mock + private Workload.WorkSpan mockWorkSpan; + + @Mock + private Promise mockPromise; + + private AbstractNestingExecWorkflowImpl workflow; + + @BeforeClass + public void setup() { + // PowerMockito is required to mock static methods in the Workflow class + Mockito.mockStatic(Workflow.class); + Mockito.mockStatic(Async.class); + Mockito.mockStatic(Promise.class); + this.mockWorkload = Mockito.mock(Workload.class); + this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class); + this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class); + this.mockPromise = Mockito.mock(Promise.class); + + workflow = new AbstractNestingExecWorkflowImpl() { + @Override + protected Promise launchAsyncActivity(String task) { + return mockPromise; + } + }; + } + + @Test + public void testPerformWorkload_NoWorkSpan() { + // Arrange + Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty()); + + // Act + int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty()); + + // Assert + Assert.assertEquals(0, result); + Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5); + } + + @Test + public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() { + // Act + Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50); + + // Assert + Assert.assertEquals(Duration.ZERO, result); + } + + @Test + public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() { + // Act + Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150); + + // Assert + Assert.assertEquals( + Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT), + result); + } + + @Test + public void testConsolidateSubTreeGrandChildren() { + // Act + List result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2); + + // Assert + Assert.assertEquals(3, result.size()); + Assert.assertEquals(Integer.valueOf(0), result.get(0)); + Assert.assertEquals(Integer.valueOf(0), result.get(1)); + Assert.assertEquals(Integer.valueOf(6), result.get(2)); + } + + @Test(expectedExceptions = AssertionError.class) + public void testPerformWorkload_LaunchesChildWorkflows() { + // Arrange + Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan)); + Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5); + Mockito.when(mockWorkSpan.next()).thenReturn("task1"); + Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false); + + // Mock the child workflow + Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), + Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise); + Mockito.when(mockPromise.get()).thenReturn(5); + // Act + int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty()); + } +} From 949df4a85268b18a2637a151f8ac646c21b87924 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Mon, 7 Oct 2024 20:25:48 +0530 Subject: [PATCH 2/5] integrated with docker to prevent mocking temporal workflows --- .github/workflows/build_and_test.yaml | 4 + .../workflows/docker_compose_temporal.yaml | 81 +++++++ docker-compose | 1 + docker-hadoop | 1 + gobblin-temporal/build.gradle | 3 - .../src/test/files/SampleFile.txt | 1 + .../temporal/ddm/utils/JobStateUtilTest.java | 141 ----------- .../AbstractNestingExecWorkflowImplTest.java | 219 ++++++++++-------- 8 files changed, 205 insertions(+), 246 deletions(-) create mode 100644 .github/workflows/docker_compose_temporal.yaml create mode 160000 docker-compose create mode 160000 docker-hadoop create mode 100644 gobblin-temporal/src/test/files/SampleFile.txt delete mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 5149598ec15..e77cd3388e5 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -119,6 +119,10 @@ jobs: uses: actions/setup-java@v1 with: java-version: 1.8 + + - name: Set up Temporal Docker Compose + run: | + docker-compose -f .github/workflows/docker_compose_temporal.yaml up -d # Fix for bug where Github tests are failing port address binding. - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file run: | diff --git a/.github/workflows/docker_compose_temporal.yaml b/.github/workflows/docker_compose_temporal.yaml new file mode 100644 index 00000000000..e2906e3d2c4 --- /dev/null +++ b/.github/workflows/docker_compose_temporal.yaml @@ -0,0 +1,81 @@ +version: "3.5" +services: + elasticsearch: + container_name: temporal-elasticsearch + environment: + - cluster.routing.allocation.disk.threshold_enabled=true + - cluster.routing.allocation.disk.watermark.low=512mb + - cluster.routing.allocation.disk.watermark.high=256mb + - cluster.routing.allocation.disk.watermark.flood_stage=128mb + - discovery.type=single-node + - ES_JAVA_OPTS=-Xms256m -Xmx256m + - xpack.security.enabled=false + image: elasticsearch:${ELASTICSEARCH_VERSION} + networks: + - temporal-network + expose: + - 9200 + volumes: + - /var/lib/elasticsearch/data + postgresql: + container_name: temporal-postgresql + environment: + POSTGRES_PASSWORD: temporal + POSTGRES_USER: temporal + image: postgres:${POSTGRESQL_VERSION} + networks: + - temporal-network + expose: + - 5432 + volumes: + - /var/lib/postgresql/data + temporal: + container_name: temporal + depends_on: + - postgresql + - elasticsearch + environment: + - DB=postgres12 + - DB_PORT=5432 + - POSTGRES_USER=temporal + - POSTGRES_PWD=temporal + - POSTGRES_SEEDS=postgresql + - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml + - ENABLE_ES=true + - ES_SEEDS=elasticsearch + - ES_VERSION=v7 + image: temporalio/auto-setup:${TEMPORAL_VERSION} + networks: + - temporal-network + ports: + - 7233:7233 + volumes: + - ./dynamicconfig:/etc/temporal/config/dynamicconfig + temporal-admin-tools: + container_name: temporal-admin-tools + depends_on: + - temporal + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CLI_ADDRESS=temporal:7233 + image: temporalio/admin-tools:${TEMPORAL_ADMINTOOLS_VERSION} + networks: + - temporal-network + stdin_open: true + tty: true + temporal-ui: + container_name: temporal-ui + depends_on: + - temporal + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CORS_ORIGINS=http://localhost:3000 + image: temporalio/ui:${TEMPORAL_UI_VERSION} + networks: + - temporal-network + ports: + - 8080:8080 +networks: + temporal-network: + driver: bridge + name: temporal-network diff --git a/docker-compose b/docker-compose new file mode 160000 index 00000000000..d28eb97ac59 --- /dev/null +++ b/docker-compose @@ -0,0 +1 @@ +Subproject commit d28eb97ac59b572e78a92bea8d371b2be2a46a0f diff --git a/docker-hadoop b/docker-hadoop new file mode 160000 index 00000000000..8414e2b0512 --- /dev/null +++ b/docker-hadoop @@ -0,0 +1 @@ +Subproject commit 8414e2b05122f85392cce0d27bf3978ed056696e diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index 5e67f156030..d13c9d1d074 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -67,9 +67,6 @@ dependencies { testCompile project(":gobblin-example") testCompile externalDependency.testng - testCompile externalDependency.mockitoInline - testCompile externalDependency.powerMockApi - testCompile externalDependency.powerMockModule testCompile externalDependency.hadoopYarnMiniCluster testCompile externalDependency.curatorFramework testCompile externalDependency.curatorTest diff --git a/gobblin-temporal/src/test/files/SampleFile.txt b/gobblin-temporal/src/test/files/SampleFile.txt new file mode 100644 index 00000000000..3f79730e3ac --- /dev/null +++ b/gobblin-temporal/src/test/files/SampleFile.txt @@ -0,0 +1 @@ +Sample File \ No newline at end of file diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java deleted file mode 100644 index 62e17bcea40..00000000000 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gobblin.temporal.ddm.utils; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Properties; - -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.TaskState; -import org.apache.gobblin.source.Source; -import org.apache.gobblin.metastore.StateStore; -import org.apache.gobblin.temporal.ddm.util.JobStateUtils; - - -public class JobStateUtilTest { - - private JobState jobState; - private FileSystem fileSystem; - - @BeforeMethod - public void setUp() { - jobState = Mockito.mock(JobState.class); - fileSystem = Mockito.mock(FileSystem.class); - } - - @Test - public void testOpenFileSystem() throws IOException { - - Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test"); - Mockito.when(jobState.getProperties()).thenReturn(new Properties()); - - FileSystem fs = JobStateUtils.openFileSystem(jobState); - - Assert.assertNotNull(fs); - Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString()); - } - - @Test - public void testCreateSource() throws ReflectiveOperationException { - Mockito.when(jobState.getProp(Mockito.anyString())) - .thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource"); - Source source = JobStateUtils.createSource(jobState); - Assert.assertNotNull(source); - } - - @Test - public void testOpenTaskStateStoreUncached() throws URISyntaxException { - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test"); - Mockito.when(jobState.getJobId()).thenReturn("testJobId"); - Mockito.when(jobState.getJobName()).thenReturn("testJobName"); - Mockito.when(fileSystem.makeQualified(Mockito.any())) - .thenReturn(new Path("file:///test/testJobName/testJobId/output")); - Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output")); - - StateStore stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem); - - Assert.assertNotNull(stateStore); - } - - @Test - public void testGetFileSystemUri() { - Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test"); - URI fsUri = JobStateUtils.getFileSystemUri(jobState); - Assert.assertEquals(URI.create("file:///test"), fsUri); - Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString()); - } - - @Test - public void testGetWorkDirRoot() { - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Path rootPath = JobStateUtils.getWorkDirRoot(jobState); - Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath); - Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString()); - } - - @Test - public void testGetWorkUnitsPath() { - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState); - Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath); - } - - @Test - public void testGetTaskStateStorePath() throws IOException { - Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path")); - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem); - Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath); - } - - @Test - public void testWriteJobState() throws IOException { - Path workDirRootPath = new Path("/tmp"); - FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class); - Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos); - - JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem); - - Mockito.verify(fileSystem).create(Mockito.any(Path.class)); - Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean()); - } - - @Test - public void testGetSharedResourcesBroker() { - Mockito.when(jobState.getProperties()).thenReturn(System.getProperties()); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState)); - } -} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java index 4d0ae66f0be..2ccf49e00b2 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java @@ -1,139 +1,154 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.gobblin.temporal.ddm.workflow.impl; -import java.time.Duration; -import java.util.List; import java.util.Optional; +import java.util.UUID; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import io.temporal.workflow.Async; -import io.temporal.workflow.Promise; -import io.temporal.workflow.Workflow; - +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryReverseRequest; +import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryReverseResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.api.enums.v1.EventType; + + +import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity; +import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; +import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload; +import org.apache.gobblin.temporal.loadgen.workflow.impl.NestingExecOfIllustrationItemActivityWorkflowImpl; import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; -import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; +import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; -@RunWith(PowerMockRunner.class) -@PrepareForTest(Workflow.class) public class AbstractNestingExecWorkflowImplTest { - @Mock - private Workload mockWorkload; + private static final Logger logger = LoggerFactory.getLogger(AbstractNestingExecWorkflowImplTest.class); + + private WorkflowServiceStubs service; + private WorkflowClient client; + private WorkerFactory factory; + private Worker worker; + private final String TEMPORAL_TASK_QUEUE = "test-task-queue"; + + @BeforeMethod + public void setUp() throws Exception { + // Connect to the Temporal service running in Docker + service = WorkflowServiceStubs.newInstance(); + client = WorkflowClient.newInstance(service); + factory = WorkerFactory.newInstance(client); + worker = factory.newWorker(TEMPORAL_TASK_QUEUE); + worker.registerWorkflowImplementationTypes(NestingExecOfIllustrationItemActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new MockHandleItemActivityImpl()); + factory.start(); + } - @Mock - private WorkflowAddr mockWorkflowAddr; + @AfterMethod + public void tearDown() { + factory.shutdown(); + service.shutdown(); + } - @Mock - private Workload.WorkSpan mockWorkSpan; + @Test + public void testPerformWorkloadWithEmptyWorkload() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); - @Mock - private Promise mockPromise; + Workload workload = SimpleGeneratedWorkload.createAs(0); - private AbstractNestingExecWorkflowImpl workflow; + int result = workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 900, 30, Optional.empty()); + Assert.assertEquals(0, result); + Assert.assertEquals(0, getDepthLevelOfWorkFlowNesting(workFlowId)); + } - @BeforeClass - public void setup() { - // PowerMockito is required to mock static methods in the Workflow class - Mockito.mockStatic(Workflow.class); - Mockito.mockStatic(Async.class); - Mockito.mockStatic(Promise.class); - this.mockWorkload = Mockito.mock(Workload.class); - this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class); - this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class); - this.mockPromise = Mockito.mock(Promise.class); + @Test + public void testPerformWorkloadWithPartialSpan() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); - workflow = new AbstractNestingExecWorkflowImpl() { - @Override - protected Promise launchAsyncActivity(String task) { - return mockPromise; - } - }; + Workload workload = SimpleGeneratedWorkload.createAs(500); + + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 900, 30, Optional.empty()); + Assert.assertEquals(500, result); + Assert.assertEquals(0, getDepthLevelOfWorkFlowNesting(workFlowId)); } @Test - public void testPerformWorkload_NoWorkSpan() { - // Arrange - Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty()); + public void testPerformWorkloadWithTwoLevelsOfNesting() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); - // Act - int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty()); + Workload workload = SimpleGeneratedWorkload.createAs(2048); - // Assert - Assert.assertEquals(0, result); - Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5); + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 900, 30, Optional.empty()); + Assert.assertEquals(2048, result); + Assert.assertEquals(2, getDepthLevelOfWorkFlowNesting(workFlowId)); } @Test - public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() { - // Act - Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50); + public void testPerformWorkloadWithMaxBranchesExceeded_WithTreeDepth_1() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); + + Workload workload = SimpleGeneratedWorkload.createAs(1024); + + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 1025, 30, Optional.empty()); - // Assert - Assert.assertEquals(Duration.ZERO, result); + logger.info("PerformWorkload method returned"); + Assert.assertEquals(1024, result); + Assert.assertEquals(1, getDepthLevelOfWorkFlowNesting(workFlowId)); } @Test - public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() { - // Act - Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150); - - // Assert - Assert.assertEquals( - Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT), - result); + public void testPerformWorkloadWithMaxSubTreesOverride_WithTreeDepth_0() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); + + logger.info("Calling performWorkload method on workflow with max sub-trees override"); + Workload workload = SimpleGeneratedWorkload.createAs(1024); + + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 1024, 30, Optional.of(0)); + Assert.assertEquals(1024, result); + Assert.assertEquals(0, getDepthLevelOfWorkFlowNesting(workFlowId)); } - @Test - public void testConsolidateSubTreeGrandChildren() { - // Act - List result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2); - - // Assert - Assert.assertEquals(3, result.size()); - Assert.assertEquals(Integer.valueOf(0), result.get(0)); - Assert.assertEquals(Integer.valueOf(0), result.get(1)); - Assert.assertEquals(Integer.valueOf(6), result.get(2)); + private int getDepthLevelOfWorkFlowNesting(String workFlowId) { + final GetWorkflowExecutionHistoryReverseResponse workflowExecutionHistoryResponse = client.getWorkflowServiceStubs() + .blockingStub() + .getWorkflowExecutionHistoryReverse(GetWorkflowExecutionHistoryReverseRequest.newBuilder() + .setNamespace("default") + .setExecution(WorkflowExecution.newBuilder().setWorkflowId(workFlowId).build()) + .build()); + int depth = 0; + final History history = workflowExecutionHistoryResponse.getHistory(); + for (HistoryEvent historyEvent : history.getEventsList()) { + if (historyEvent.getEventType().equals(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)) { + depth++; + } + } + return depth; } - @Test(expectedExceptions = AssertionError.class) - public void testPerformWorkload_LaunchesChildWorkflows() { - // Arrange - Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan)); - Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5); - Mockito.when(mockWorkSpan.next()).thenReturn("task1"); - Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false); - - // Mock the child workflow - Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), - Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise); - Mockito.when(mockPromise.get()).thenReturn(5); - // Act - int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty()); + private class MockHandleItemActivityImpl implements IllustrationItemActivity { + @Override + public String handleItem(IllustrationItem item) { + return null; + } } } From f34caa9be7443ceaddda84adc5da7c6e7f8ef155 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Mon, 7 Oct 2024 20:25:48 +0530 Subject: [PATCH 3/5] integrated with docker to prevent mocking temporal workflows --- .github/workflows/build_and_test.yaml | 4 + .../workflows/docker_compose_temporal.yaml | 81 +++++++ gobblin-temporal/build.gradle | 5 +- .../temporal/ddm/utils/JobStateUtilTest.java | 141 ----------- .../AbstractNestingExecWorkflowImplTest.java | 219 ++++++++++-------- 5 files changed, 203 insertions(+), 247 deletions(-) create mode 100644 .github/workflows/docker_compose_temporal.yaml delete mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 5149598ec15..e77cd3388e5 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -119,6 +119,10 @@ jobs: uses: actions/setup-java@v1 with: java-version: 1.8 + + - name: Set up Temporal Docker Compose + run: | + docker-compose -f .github/workflows/docker_compose_temporal.yaml up -d # Fix for bug where Github tests are failing port address binding. - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file run: | diff --git a/.github/workflows/docker_compose_temporal.yaml b/.github/workflows/docker_compose_temporal.yaml new file mode 100644 index 00000000000..e2906e3d2c4 --- /dev/null +++ b/.github/workflows/docker_compose_temporal.yaml @@ -0,0 +1,81 @@ +version: "3.5" +services: + elasticsearch: + container_name: temporal-elasticsearch + environment: + - cluster.routing.allocation.disk.threshold_enabled=true + - cluster.routing.allocation.disk.watermark.low=512mb + - cluster.routing.allocation.disk.watermark.high=256mb + - cluster.routing.allocation.disk.watermark.flood_stage=128mb + - discovery.type=single-node + - ES_JAVA_OPTS=-Xms256m -Xmx256m + - xpack.security.enabled=false + image: elasticsearch:${ELASTICSEARCH_VERSION} + networks: + - temporal-network + expose: + - 9200 + volumes: + - /var/lib/elasticsearch/data + postgresql: + container_name: temporal-postgresql + environment: + POSTGRES_PASSWORD: temporal + POSTGRES_USER: temporal + image: postgres:${POSTGRESQL_VERSION} + networks: + - temporal-network + expose: + - 5432 + volumes: + - /var/lib/postgresql/data + temporal: + container_name: temporal + depends_on: + - postgresql + - elasticsearch + environment: + - DB=postgres12 + - DB_PORT=5432 + - POSTGRES_USER=temporal + - POSTGRES_PWD=temporal + - POSTGRES_SEEDS=postgresql + - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml + - ENABLE_ES=true + - ES_SEEDS=elasticsearch + - ES_VERSION=v7 + image: temporalio/auto-setup:${TEMPORAL_VERSION} + networks: + - temporal-network + ports: + - 7233:7233 + volumes: + - ./dynamicconfig:/etc/temporal/config/dynamicconfig + temporal-admin-tools: + container_name: temporal-admin-tools + depends_on: + - temporal + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CLI_ADDRESS=temporal:7233 + image: temporalio/admin-tools:${TEMPORAL_ADMINTOOLS_VERSION} + networks: + - temporal-network + stdin_open: true + tty: true + temporal-ui: + container_name: temporal-ui + depends_on: + - temporal + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CORS_ORIGINS=http://localhost:3000 + image: temporalio/ui:${TEMPORAL_UI_VERSION} + networks: + - temporal-network + ports: + - 8080:8080 +networks: + temporal-network: + driver: bridge + name: temporal-network diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index 5e67f156030..1832ac909de 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -67,14 +67,11 @@ dependencies { testCompile project(":gobblin-example") testCompile externalDependency.testng - testCompile externalDependency.mockitoInline - testCompile externalDependency.powerMockApi - testCompile externalDependency.powerMockModule + testCompile externalDependency.mockito testCompile externalDependency.hadoopYarnMiniCluster testCompile externalDependency.curatorFramework testCompile externalDependency.curatorTest - testCompile ('com.google.inject:guice:3.0') { force = true } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java deleted file mode 100644 index 62e17bcea40..00000000000 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gobblin.temporal.ddm.utils; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Properties; - -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.TaskState; -import org.apache.gobblin.source.Source; -import org.apache.gobblin.metastore.StateStore; -import org.apache.gobblin.temporal.ddm.util.JobStateUtils; - - -public class JobStateUtilTest { - - private JobState jobState; - private FileSystem fileSystem; - - @BeforeMethod - public void setUp() { - jobState = Mockito.mock(JobState.class); - fileSystem = Mockito.mock(FileSystem.class); - } - - @Test - public void testOpenFileSystem() throws IOException { - - Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test"); - Mockito.when(jobState.getProperties()).thenReturn(new Properties()); - - FileSystem fs = JobStateUtils.openFileSystem(jobState); - - Assert.assertNotNull(fs); - Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString()); - } - - @Test - public void testCreateSource() throws ReflectiveOperationException { - Mockito.when(jobState.getProp(Mockito.anyString())) - .thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource"); - Source source = JobStateUtils.createSource(jobState); - Assert.assertNotNull(source); - } - - @Test - public void testOpenTaskStateStoreUncached() throws URISyntaxException { - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test"); - Mockito.when(jobState.getJobId()).thenReturn("testJobId"); - Mockito.when(jobState.getJobName()).thenReturn("testJobName"); - Mockito.when(fileSystem.makeQualified(Mockito.any())) - .thenReturn(new Path("file:///test/testJobName/testJobId/output")); - Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output")); - - StateStore stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem); - - Assert.assertNotNull(stateStore); - } - - @Test - public void testGetFileSystemUri() { - Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test"); - URI fsUri = JobStateUtils.getFileSystemUri(jobState); - Assert.assertEquals(URI.create("file:///test"), fsUri); - Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString()); - } - - @Test - public void testGetWorkDirRoot() { - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Path rootPath = JobStateUtils.getWorkDirRoot(jobState); - Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath); - Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString()); - } - - @Test - public void testGetWorkUnitsPath() { - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState); - Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath); - } - - @Test - public void testGetTaskStateStorePath() throws IOException { - Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path")); - Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp"); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem); - Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath); - } - - @Test - public void testWriteJobState() throws IOException { - Path workDirRootPath = new Path("/tmp"); - FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class); - Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos); - - JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem); - - Mockito.verify(fileSystem).create(Mockito.any(Path.class)); - Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean()); - } - - @Test - public void testGetSharedResourcesBroker() { - Mockito.when(jobState.getProperties()).thenReturn(System.getProperties()); - Mockito.when(jobState.getJobName()).thenReturn("testJob"); - Mockito.when(jobState.getJobId()).thenReturn("jobId123"); - Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState)); - } -} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java index 4d0ae66f0be..2ccf49e00b2 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java @@ -1,139 +1,154 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.gobblin.temporal.ddm.workflow.impl; -import java.time.Duration; -import java.util.List; import java.util.Optional; +import java.util.UUID; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import io.temporal.workflow.Async; -import io.temporal.workflow.Promise; -import io.temporal.workflow.Workflow; - +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryReverseRequest; +import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryReverseResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.api.enums.v1.EventType; + + +import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity; +import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; +import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload; +import org.apache.gobblin.temporal.loadgen.workflow.impl.NestingExecOfIllustrationItemActivityWorkflowImpl; import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; -import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; +import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; -@RunWith(PowerMockRunner.class) -@PrepareForTest(Workflow.class) public class AbstractNestingExecWorkflowImplTest { - @Mock - private Workload mockWorkload; + private static final Logger logger = LoggerFactory.getLogger(AbstractNestingExecWorkflowImplTest.class); + + private WorkflowServiceStubs service; + private WorkflowClient client; + private WorkerFactory factory; + private Worker worker; + private final String TEMPORAL_TASK_QUEUE = "test-task-queue"; + + @BeforeMethod + public void setUp() throws Exception { + // Connect to the Temporal service running in Docker + service = WorkflowServiceStubs.newInstance(); + client = WorkflowClient.newInstance(service); + factory = WorkerFactory.newInstance(client); + worker = factory.newWorker(TEMPORAL_TASK_QUEUE); + worker.registerWorkflowImplementationTypes(NestingExecOfIllustrationItemActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new MockHandleItemActivityImpl()); + factory.start(); + } - @Mock - private WorkflowAddr mockWorkflowAddr; + @AfterMethod + public void tearDown() { + factory.shutdown(); + service.shutdown(); + } - @Mock - private Workload.WorkSpan mockWorkSpan; + @Test + public void testPerformWorkloadWithEmptyWorkload() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); - @Mock - private Promise mockPromise; + Workload workload = SimpleGeneratedWorkload.createAs(0); - private AbstractNestingExecWorkflowImpl workflow; + int result = workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 900, 30, Optional.empty()); + Assert.assertEquals(0, result); + Assert.assertEquals(0, getDepthLevelOfWorkFlowNesting(workFlowId)); + } - @BeforeClass - public void setup() { - // PowerMockito is required to mock static methods in the Workflow class - Mockito.mockStatic(Workflow.class); - Mockito.mockStatic(Async.class); - Mockito.mockStatic(Promise.class); - this.mockWorkload = Mockito.mock(Workload.class); - this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class); - this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class); - this.mockPromise = Mockito.mock(Promise.class); + @Test + public void testPerformWorkloadWithPartialSpan() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); - workflow = new AbstractNestingExecWorkflowImpl() { - @Override - protected Promise launchAsyncActivity(String task) { - return mockPromise; - } - }; + Workload workload = SimpleGeneratedWorkload.createAs(500); + + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 900, 30, Optional.empty()); + Assert.assertEquals(500, result); + Assert.assertEquals(0, getDepthLevelOfWorkFlowNesting(workFlowId)); } @Test - public void testPerformWorkload_NoWorkSpan() { - // Arrange - Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty()); + public void testPerformWorkloadWithTwoLevelsOfNesting() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); - // Act - int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty()); + Workload workload = SimpleGeneratedWorkload.createAs(2048); - // Assert - Assert.assertEquals(0, result); - Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5); + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 900, 30, Optional.empty()); + Assert.assertEquals(2048, result); + Assert.assertEquals(2, getDepthLevelOfWorkFlowNesting(workFlowId)); } @Test - public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() { - // Act - Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50); + public void testPerformWorkloadWithMaxBranchesExceeded_WithTreeDepth_1() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); + + Workload workload = SimpleGeneratedWorkload.createAs(1024); + + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 1025, 30, Optional.empty()); - // Assert - Assert.assertEquals(Duration.ZERO, result); + logger.info("PerformWorkload method returned"); + Assert.assertEquals(1024, result); + Assert.assertEquals(1, getDepthLevelOfWorkFlowNesting(workFlowId)); } @Test - public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() { - // Act - Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150); - - // Assert - Assert.assertEquals( - Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT), - result); + public void testPerformWorkloadWithMaxSubTreesOverride_WithTreeDepth_0() { + final String workFlowId = UUID.randomUUID().toString(); + final NestingExecWorkflow workflow = client.newWorkflowStub(NestingExecWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TEMPORAL_TASK_QUEUE).setWorkflowId(workFlowId).build()); + + logger.info("Calling performWorkload method on workflow with max sub-trees override"); + Workload workload = SimpleGeneratedWorkload.createAs(1024); + + int result = workflow.performWorkload(new WorkflowAddr(0), workload, 0, 1024, 30, Optional.of(0)); + Assert.assertEquals(1024, result); + Assert.assertEquals(0, getDepthLevelOfWorkFlowNesting(workFlowId)); } - @Test - public void testConsolidateSubTreeGrandChildren() { - // Act - List result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2); - - // Assert - Assert.assertEquals(3, result.size()); - Assert.assertEquals(Integer.valueOf(0), result.get(0)); - Assert.assertEquals(Integer.valueOf(0), result.get(1)); - Assert.assertEquals(Integer.valueOf(6), result.get(2)); + private int getDepthLevelOfWorkFlowNesting(String workFlowId) { + final GetWorkflowExecutionHistoryReverseResponse workflowExecutionHistoryResponse = client.getWorkflowServiceStubs() + .blockingStub() + .getWorkflowExecutionHistoryReverse(GetWorkflowExecutionHistoryReverseRequest.newBuilder() + .setNamespace("default") + .setExecution(WorkflowExecution.newBuilder().setWorkflowId(workFlowId).build()) + .build()); + int depth = 0; + final History history = workflowExecutionHistoryResponse.getHistory(); + for (HistoryEvent historyEvent : history.getEventsList()) { + if (historyEvent.getEventType().equals(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)) { + depth++; + } + } + return depth; } - @Test(expectedExceptions = AssertionError.class) - public void testPerformWorkload_LaunchesChildWorkflows() { - // Arrange - Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan)); - Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5); - Mockito.when(mockWorkSpan.next()).thenReturn("task1"); - Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false); - - // Mock the child workflow - Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), - Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise); - Mockito.when(mockPromise.get()).thenReturn(5); - // Act - int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty()); + private class MockHandleItemActivityImpl implements IllustrationItemActivity { + @Override + public String handleItem(IllustrationItem item) { + return null; + } } } From 4cd7e089f8a98741508ee70ff41227d007241f17 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Tue, 8 Oct 2024 09:51:01 +0530 Subject: [PATCH 4/5] removed unnecessary files --- .github/workflows/build_and_test.yaml | 83 ++++++++++++++++++- docker-compose | 1 - docker-hadoop | 1 - .../src/test/files/SampleFile.txt | 1 - 4 files changed, 79 insertions(+), 7 deletions(-) delete mode 160000 docker-compose delete mode 160000 docker-hadoop delete mode 100644 gobblin-temporal/src/test/files/SampleFile.txt diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index e77cd3388e5..96ab47eeda9 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -110,6 +110,85 @@ jobs: ports: - 3306:3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=5 + elasticsearch: + container_name: temporal-elasticsearch + environment: + - cluster.routing.allocation.disk.threshold_enabled=true + - cluster.routing.allocation.disk.watermark.low=512mb + - cluster.routing.allocation.disk.watermark.high=256mb + - cluster.routing.allocation.disk.watermark.flood_stage=128mb + - discovery.type=single-node + - ES_JAVA_OPTS=-Xms256m -Xmx256m + - xpack.security.enabled=false + image: elasticsearch:${ELASTICSEARCH_VERSION} + networks: + - temporal-network + expose: + - 9200 + volumes: + - /var/lib/elasticsearch/data + postgresql: + container_name: temporal-postgresql + environment: + POSTGRES_PASSWORD: temporal + POSTGRES_USER: temporal + image: postgres:${POSTGRESQL_VERSION} + networks: + - temporal-network + expose: + - 5432 + volumes: + - /var/lib/postgresql/data + temporal: + container_name: temporal + depends_on: + - postgresql + - elasticsearch + environment: + - DB=postgres12 + - DB_PORT=5432 + - POSTGRES_USER=temporal + - POSTGRES_PWD=temporal + - POSTGRES_SEEDS=postgresql + - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml + - ENABLE_ES=true + - ES_SEEDS=elasticsearch + - ES_VERSION=v7 + image: temporalio/auto-setup:${TEMPORAL_VERSION} + networks: + - temporal-network + ports: + - 7233:7233 + volumes: + - ./dynamicconfig:/etc/temporal/config/dynamicconfig + temporal-admin-tools: + container_name: temporal-admin-tools + depends_on: + - temporal + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CLI_ADDRESS=temporal:7233 + image: temporalio/admin-tools:${TEMPORAL_ADMINTOOLS_VERSION} + networks: + - temporal-network + stdin_open: true + tty: true + temporal-ui: + container_name: temporal-ui + depends_on: + - temporal + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CORS_ORIGINS=http://localhost:3000 + image: temporalio/ui:${TEMPORAL_UI_VERSION} + networks: + - temporal-network + ports: + - 8080:8080 + networks: + temporal-network: + driver: bridge + name: temporal-network steps: - name: Check out the repo uses: actions/checkout@v2 @@ -119,10 +198,6 @@ jobs: uses: actions/setup-java@v1 with: java-version: 1.8 - - - name: Set up Temporal Docker Compose - run: | - docker-compose -f .github/workflows/docker_compose_temporal.yaml up -d # Fix for bug where Github tests are failing port address binding. - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file run: | diff --git a/docker-compose b/docker-compose deleted file mode 160000 index d28eb97ac59..00000000000 --- a/docker-compose +++ /dev/null @@ -1 +0,0 @@ -Subproject commit d28eb97ac59b572e78a92bea8d371b2be2a46a0f diff --git a/docker-hadoop b/docker-hadoop deleted file mode 160000 index 8414e2b0512..00000000000 --- a/docker-hadoop +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8414e2b05122f85392cce0d27bf3978ed056696e diff --git a/gobblin-temporal/src/test/files/SampleFile.txt b/gobblin-temporal/src/test/files/SampleFile.txt deleted file mode 100644 index 3f79730e3ac..00000000000 --- a/gobblin-temporal/src/test/files/SampleFile.txt +++ /dev/null @@ -1 +0,0 @@ -Sample File \ No newline at end of file From 857a70f86fa91cd58094295ef1f8d2d410c92c13 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Tue, 8 Oct 2024 12:08:13 +0530 Subject: [PATCH 5/5] added docker compose for temporal --- .github/workflows/build_and_test.yaml | 111 ++++-------------- .../workflows/docker_compose_temporal.yaml | 81 ------------- .../AbstractNestingExecWorkflowImplTest.java | 16 +++ 3 files changed, 36 insertions(+), 172 deletions(-) delete mode 100644 .github/workflows/docker_compose_temporal.yaml diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 96ab47eeda9..ed5db8f9282 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -110,85 +110,6 @@ jobs: ports: - 3306:3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=5 - elasticsearch: - container_name: temporal-elasticsearch - environment: - - cluster.routing.allocation.disk.threshold_enabled=true - - cluster.routing.allocation.disk.watermark.low=512mb - - cluster.routing.allocation.disk.watermark.high=256mb - - cluster.routing.allocation.disk.watermark.flood_stage=128mb - - discovery.type=single-node - - ES_JAVA_OPTS=-Xms256m -Xmx256m - - xpack.security.enabled=false - image: elasticsearch:${ELASTICSEARCH_VERSION} - networks: - - temporal-network - expose: - - 9200 - volumes: - - /var/lib/elasticsearch/data - postgresql: - container_name: temporal-postgresql - environment: - POSTGRES_PASSWORD: temporal - POSTGRES_USER: temporal - image: postgres:${POSTGRESQL_VERSION} - networks: - - temporal-network - expose: - - 5432 - volumes: - - /var/lib/postgresql/data - temporal: - container_name: temporal - depends_on: - - postgresql - - elasticsearch - environment: - - DB=postgres12 - - DB_PORT=5432 - - POSTGRES_USER=temporal - - POSTGRES_PWD=temporal - - POSTGRES_SEEDS=postgresql - - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml - - ENABLE_ES=true - - ES_SEEDS=elasticsearch - - ES_VERSION=v7 - image: temporalio/auto-setup:${TEMPORAL_VERSION} - networks: - - temporal-network - ports: - - 7233:7233 - volumes: - - ./dynamicconfig:/etc/temporal/config/dynamicconfig - temporal-admin-tools: - container_name: temporal-admin-tools - depends_on: - - temporal - environment: - - TEMPORAL_ADDRESS=temporal:7233 - - TEMPORAL_CLI_ADDRESS=temporal:7233 - image: temporalio/admin-tools:${TEMPORAL_ADMINTOOLS_VERSION} - networks: - - temporal-network - stdin_open: true - tty: true - temporal-ui: - container_name: temporal-ui - depends_on: - - temporal - environment: - - TEMPORAL_ADDRESS=temporal:7233 - - TEMPORAL_CORS_ORIGINS=http://localhost:3000 - image: temporalio/ui:${TEMPORAL_UI_VERSION} - networks: - - temporal-network - ports: - - 8080:8080 - networks: - temporal-network: - driver: bridge - name: temporal-network steps: - name: Check out the repo uses: actions/checkout@v2 @@ -198,23 +119,31 @@ jobs: uses: actions/setup-java@v1 with: java-version: 1.8 + + - name: Build Temporal Docker + id: temporal_build + run: | + git clone https://github.com/temporalio/docker-compose.git + cd docker-compose + docker compose up -d + # Fix for bug where Github tests are failing port address binding. - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file run: | echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts - name: Verify mysql connection run: | - sudo apt-get clean - sudo apt-get --fix-missing update - sudo apt-get -f install -o Dpkg::Options::="--force-overwrite" - sudo apt-get purge mysql\* - sudo rm -rf /var/lib/mysql - sudo rm -rf /etc/mysql - sudo dpkg -l | grep -i mysql - sudo apt-get clean - sudo apt-get install -y mysql-client - mysql --host 127.0.0.1 --port 3306 -uroot -ptestPassword -e "SHOW DATABASES" - mysql --host 127.0.0.1 --port 3306 -uroot -ptestPassword -e "SET GLOBAL max_connections=2000" + sudo apt-get clean + sudo apt-get --fix-missing update + sudo apt-get -f install -o Dpkg::Options::="--force-overwrite" + sudo apt-get purge mysql\* + sudo rm -rf /var/lib/mysql + sudo rm -rf /etc/mysql + sudo dpkg -l | grep -i mysql + sudo apt-get clean + sudo apt-get install -y mysql-client + mysql --host 127.0.0.1 --port 3306 -uroot -ptestPassword -e "SHOW DATABASES" + mysql --host 127.0.0.1 --port 3306 -uroot -ptestPassword -e "SET GLOBAL max_connections=2000" - name: Cache Gradle Dependencies uses: actions/cache@v2 with: @@ -253,4 +182,4 @@ jobs: with: files: ${{ env.jacoco_reports }} fail_ci_if_error: false - verbose: true + verbose: true \ No newline at end of file diff --git a/.github/workflows/docker_compose_temporal.yaml b/.github/workflows/docker_compose_temporal.yaml deleted file mode 100644 index e2906e3d2c4..00000000000 --- a/.github/workflows/docker_compose_temporal.yaml +++ /dev/null @@ -1,81 +0,0 @@ -version: "3.5" -services: - elasticsearch: - container_name: temporal-elasticsearch - environment: - - cluster.routing.allocation.disk.threshold_enabled=true - - cluster.routing.allocation.disk.watermark.low=512mb - - cluster.routing.allocation.disk.watermark.high=256mb - - cluster.routing.allocation.disk.watermark.flood_stage=128mb - - discovery.type=single-node - - ES_JAVA_OPTS=-Xms256m -Xmx256m - - xpack.security.enabled=false - image: elasticsearch:${ELASTICSEARCH_VERSION} - networks: - - temporal-network - expose: - - 9200 - volumes: - - /var/lib/elasticsearch/data - postgresql: - container_name: temporal-postgresql - environment: - POSTGRES_PASSWORD: temporal - POSTGRES_USER: temporal - image: postgres:${POSTGRESQL_VERSION} - networks: - - temporal-network - expose: - - 5432 - volumes: - - /var/lib/postgresql/data - temporal: - container_name: temporal - depends_on: - - postgresql - - elasticsearch - environment: - - DB=postgres12 - - DB_PORT=5432 - - POSTGRES_USER=temporal - - POSTGRES_PWD=temporal - - POSTGRES_SEEDS=postgresql - - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml - - ENABLE_ES=true - - ES_SEEDS=elasticsearch - - ES_VERSION=v7 - image: temporalio/auto-setup:${TEMPORAL_VERSION} - networks: - - temporal-network - ports: - - 7233:7233 - volumes: - - ./dynamicconfig:/etc/temporal/config/dynamicconfig - temporal-admin-tools: - container_name: temporal-admin-tools - depends_on: - - temporal - environment: - - TEMPORAL_ADDRESS=temporal:7233 - - TEMPORAL_CLI_ADDRESS=temporal:7233 - image: temporalio/admin-tools:${TEMPORAL_ADMINTOOLS_VERSION} - networks: - - temporal-network - stdin_open: true - tty: true - temporal-ui: - container_name: temporal-ui - depends_on: - - temporal - environment: - - TEMPORAL_ADDRESS=temporal:7233 - - TEMPORAL_CORS_ORIGINS=http://localhost:3000 - image: temporalio/ui:${TEMPORAL_UI_VERSION} - networks: - - temporal-network - ports: - - 8080:8080 -networks: - temporal-network: - driver: bridge - name: temporal-network diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java index 2ccf49e00b2..d5e853c0452 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.gobblin.temporal.ddm.workflow.impl; import java.util.Optional;