diff --git a/conf/yarn/application.conf b/conf/yarn/application.conf index b31a731ac8e..97661ebd888 100644 --- a/conf/yarn/application.conf +++ b/conf/yarn/application.conf @@ -94,4 +94,4 @@ admin.server.enabled=false # job history store ( WARN [GobblinYarnAppLauncher] NOT starting the admin UI because the job execution info server is NOT enabled ) job.execinfo.server.enabled=false -job.history.store.enabled=false +job.history.store.enabled=false \ No newline at end of file diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index 0b95b78ef4c..524cc381226 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -18,6 +18,7 @@ package org.apache.gobblin.temporal; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.temporal.ddm.worker.ExecutionWorker; import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldJobLauncher; import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker; @@ -29,14 +30,20 @@ public interface GobblinTemporalConfigurationKeys { String PREFIX = "gobblin.temporal."; + String STAGE_SPECIFIC_PREFIX = PREFIX + "stage."; String WORKER_CLASS = PREFIX + "worker.class"; String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName(); + String EXECUTION_WORKER_CLASS = ExecutionWorker.class.getName(); String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace"; String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace"; String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name"; String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue"; + + // Execution task queue for work execution specialization + String EXECUTION_TASK_QUEUE = PREFIX + "execution.task.queue.name"; + String DEFAULT_EXECUTION_TASK_QUEUE = "GobblinTemporalExecutionQueue"; String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher."; String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class"; String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = HelloWorldJobLauncher.class.getName(); @@ -136,4 +143,8 @@ public interface GobblinTemporalConfigurationKeys { String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts"; int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4; + /** + * Memory allocation for execution worker containers. + */ + String WORK_EXECUTION_MEMORY_MB = STAGE_SPECIFIC_PREFIX + "workExecution.memory.mb"; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java index 8ab428c4189..b569417f0b8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java @@ -33,16 +33,12 @@ /** Basic boilerplate for a {@link TemporalWorker} to register its activity and workflow capabilities and listen on a particular queue */ public abstract class AbstractTemporalWorker implements TemporalWorker { private final WorkflowClient workflowClient; - private final String queueName; private final WorkerFactory workerFactory; - private final Config config; + protected final Config config; public AbstractTemporalWorker(Config cfg, WorkflowClient client) { config = cfg; workflowClient = client; - queueName = ConfigUtils.getString(cfg, - GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, - GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); // Create a Worker factory that can be used to create Workers that poll specific Task Queues. workerFactory = WorkerFactory.newInstance(workflowClient); @@ -52,7 +48,7 @@ public AbstractTemporalWorker(Config cfg, WorkflowClient client) { @Override public void start() { - Worker worker = workerFactory.newWorker(queueName, createWorkerOptions()); + Worker worker = workerFactory.newWorker(getTaskQueue(), createWorkerOptions()); // This Worker hosts both Workflow and Activity implementations. // Workflows are stateful, so you need to supply a type to create instances. worker.registerWorkflowImplementationTypes(getWorkflowImplClasses()); @@ -77,6 +73,12 @@ protected WorkerOptions createWorkerOptions() { /** @return activity instances; NOTE: activities must be stateless and thread-safe, so a shared instance is used. */ protected abstract Object[] getActivityImplInstances(); + protected String getTaskQueue() { + return ConfigUtils.getString(config, + GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, + GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); + } + private final void stashWorkerConfig(Config cfg) { // stash to associate with... WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java index e7cd4315f00..b7a609c51f3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java @@ -238,6 +238,7 @@ public void start() for (int i = 0; i < this.numTemporalWorkers; i++) { workers.add(initiateWorker()); } + initializeExecutionWorkers(); }catch (Exception e) { logger.info(e + " for initiate workers"); throw new RuntimeException(e); @@ -252,8 +253,8 @@ private TemporalWorker initiateWorker() throws Exception { WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance( managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); - String workerClassName = ConfigUtils.getString(clusterConfig, - GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); + String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, + GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); logger.info("Creating worker - class: '{}'", workerClassName); Config workerConfig = clusterConfig; TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor( @@ -263,6 +264,38 @@ private TemporalWorker initiateWorker() throws Exception { return worker; } + private void initializeExecutionWorkers() throws Exception { + boolean dynamicScalingEnabled = ConfigUtils.getBoolean(clusterConfig, + GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false); + + if (!dynamicScalingEnabled) { + return; + } + + String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, + GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); + boolean isExecutionWorkerContainer = GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS.equals(workerClassName); + + // only the initial container (WorkFulfillment worker) should start an additional ExecutionWorker worker + if (isExecutionWorkerContainer) { + return; + } + + logger.info("Starting additional ExecutionWorker in initial container"); + + String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE, + GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); + WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance( + managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); + + TemporalWorker executionWorker = GobblinConstructorUtils.invokeLongestConstructor( + (Class)Class.forName(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS), + clusterConfig, client); + executionWorker.start(); + workers.add(executionWorker); + logger.info("Worker started for class: {}", GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS); + } + private void initMetricReporter() { if (this.containerMetrics.isPresent()) { try { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java index a0d3fd11e55..aa86c6425c7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java @@ -25,13 +25,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits; import org.apache.gobblin.temporal.ddm.work.TimeBudget; import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.ddm.workflow.WorkflowStage; import org.apache.gobblin.temporal.dynamic.ProfileDerivation; import org.apache.gobblin.temporal.dynamic.ProfileOverlay; import org.apache.gobblin.temporal.dynamic.ScalingDirective; import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; /** @@ -62,8 +65,9 @@ public List recommendScaling(WorkUnitsSizeSummary remainingWor protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState); protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) { - // TODO: implement right-sizing!!! (for now just return unchanged) - return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged()); + // Create overlay with execution-specific memory and worker class + ProfileOverlay overlay = createExecutionWorkerOverlay(jobState); + return new ProfileDerivation(basisProfileName, overlay); } protected String calcProfileDerivationName(JobState jobState) { @@ -72,6 +76,28 @@ protected String calcProfileDerivationName(JobState jobState) { } protected String calcBasisProfileName(JobState jobState) { - return WorkforceProfiles.BASELINE_NAME; // always build upon baseline + // Always derive from the global baseline + return WorkforceProfiles.BASELINE_NAME; } + + private ProfileOverlay createExecutionWorkerOverlay(JobState jobState) { + List overlayPairs = new java.util.ArrayList<>(); + + // Add execution-specific memory if configured (overrides baseline memory) + if (jobState.contains(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)) { + overlayPairs.add(new ProfileOverlay.KVPair( + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, + jobState.getProp(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB) + )); + } + + // Add ExecutionWorker class to ensure correct task queue routing + overlayPairs.add(new ProfileOverlay.KVPair( + GobblinTemporalConfigurationKeys.WORKER_CLASS, + GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS + )); + + return new ProfileOverlay.Adding(overlayPairs); + } + } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java new file mode 100644 index 00000000000..1b36f76fef6 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.worker; + +import java.util.concurrent.TimeUnit; + +import com.typesafe.config.Config; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.WorkerOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Specialized worker for Work Execution stage. + * This worker only registers activities for: + * - ProcessWorkUnit (Work Execution) + * + * Runs on containers with stage-specific memory for work execution operations. + * Polls the execution task queue to ensure activities run on appropriately-sized containers. + */ +public class ExecutionWorker extends AbstractTemporalWorker { + public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; + private final int maxExecutionConcurrency; + + public ExecutionWorker(Config config, WorkflowClient workflowClient) { + super(config, workflowClient); + this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER); + } + + @Override + protected Class[] getWorkflowImplClasses() { + return new Class[] { + ProcessWorkUnitsWorkflowImpl.class, + NestingExecOfProcessWorkUnitWorkflowImpl.class + }; + } + + @Override + protected Object[] getActivityImplInstances() { + return new Object[] { + new ProcessWorkUnitImpl() + }; + } + + @Override + protected WorkerOptions createWorkerOptions() { + return WorkerOptions.newBuilder() + .setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS)) + .setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency) + .setMaxConcurrentLocalActivityExecutionSize(this.maxExecutionConcurrency) + .setMaxConcurrentWorkflowTaskExecutionSize(this.maxExecutionConcurrency) + .build(); + } + + @Override + protected String getTaskQueue() { + return ConfigUtils.getString( + config, + GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE, + GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE + ); + } + + /** + * Package-private for testing purposes. + */ + int getMaxExecutionConcurrency() { + return maxExecutionConcurrency; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStage.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStage.java new file mode 100644 index 00000000000..fcc17f1004d --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow; + +import com.typesafe.config.Config; +import lombok.Getter; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + +/** + * Represents the different stages of a Gobblin Temporal workflow. + * + *

Stages: + *

    + *
  • WORK_DISCOVERY: Discovers data sources, generates work units (uses default queue)
  • + *
  • WORK_EXECUTION: Processes work units to transform and load data (uses execution queue when dynamic scaling enabled)
  • + *
  • COMMIT: Commits work units (uses default queue)
  • + *
+ * + *

Queue routing: + *

    + *
  • Dynamic scaling OFF: All stages use default queue
  • + *
  • Dynamic scaling ON: WORK_EXECUTION uses dedicated execution queue, others use default queue
  • + *
+ */ +@Getter +public enum WorkflowStage { + WORK_DISCOVERY("workDiscovery", GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, + GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE), + WORK_EXECUTION("workExecution", GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE, + GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE), + COMMIT("commit", GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, + GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); + + private final String profileBaseName; + private final String taskQueueConfigKey; + private final String defaultTaskQueue; + + WorkflowStage(String profileBaseName, String taskQueueConfigKey, String defaultTaskQueue) { + this.profileBaseName = profileBaseName; + this.taskQueueConfigKey = taskQueueConfigKey; + this.defaultTaskQueue = defaultTaskQueue; + } + + /** + * Returns the task queue for this stage, reading from config or using default. + * Example: "GobblinTemporalDiscoveryCommitQueue", "GobblinTemporalExecutionQueue" + * + * @param config the configuration to read from + * @return the task queue name for this stage + */ + public String getTaskQueue(Config config) { + return config.hasPath(taskQueueConfigKey) + ? config.getString(taskQueueConfigKey) + : defaultTaskQueue; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 94e870c283b..3fb309a927c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import io.temporal.api.enums.v1.ParentClosePolicy; @@ -32,6 +33,7 @@ import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.source.extractor.JobCommitPolicy; import org.apache.gobblin.temporal.cluster.WorkerConfig; import org.apache.gobblin.temporal.ddm.activity.ActivityType; @@ -174,25 +176,35 @@ protected Workload createWorkload(WUProcessingSpec workSpec, protected NestingExecWorkflow createProcessingWorkflow(FileSystemJobStateful f, Map searchAttributes) { - ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() + Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty()); + boolean dynamicScalingEnabled = config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED) + && config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED); + + ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder() .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE) .setSearchAttributes(searchAttributes) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, - WorkerConfig.of(this).orElse(ConfigFactory.empty()))) - .build(); - // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!? - return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); + .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, config)); + + // Route NestingExecWorkflow (work execution) to execution + if (dynamicScalingEnabled) { + childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE) + ? config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE) + : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE); + } + + return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts.build()); } protected CommitStepWorkflow createCommitStepWorkflow(Map searchAttributes) { + Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty()); ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() // TODO: verify to instead use: Policy.PARENT_CLOSE_POLICY_TERMINATE) .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) .setSearchAttributes(searchAttributes) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, - WorkerConfig.of(this).orElse(ConfigFactory.empty()))) + .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, config)) .build(); + // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow parent return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts); } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java new file mode 100644 index 00000000000..69772116d2a --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.cluster; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs; + + +/** + * Tests for {@link GobblinTemporalTaskRunner} worker initialization logic. + */ +public class GobblinTemporalTaskRunnerTest { + + /** + * Tests that initializeExecutionWorkers does nothing when dynamic scaling is disabled. + */ + @Test + public void testInitializeExecutionWorkersWhenDynamicScalingDisabled() throws Exception { + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, ConfigValueFactory.fromAnyRef(false)) + .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS, + ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS)); + + GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config); + List workers = getWorkersField(taskRunner); + int initialWorkerCount = workers.size(); + + invokeInitializeExecutionWorkers(taskRunner); + + Assert.assertEquals(workers.size(), initialWorkerCount, + "No workers should be added when dynamic scaling is disabled"); + } + + /** + * Tests that initializeExecutionWorkers does nothing when container is already ExecutionWorker. + */ + @Test + public void testInitializeExecutionWorkersWhenAlreadyExecutionWorker() throws Exception { + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS, + ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS)); + + GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config); + List workers = getWorkersField(taskRunner); + int initialWorkerCount = workers.size(); + + invokeInitializeExecutionWorkers(taskRunner); + + Assert.assertEquals(workers.size(), initialWorkerCount, + "No workers should be added when container is already ExecutionWorker"); + } + + /** + * Tests that initializeExecutionWorkers does nothing when dynamic scaling config is missing. + */ + @Test + public void testInitializeExecutionWorkersWhenConfigMissing() throws Exception { + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS, + ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS)); + + GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config); + List workers = getWorkersField(taskRunner); + int initialWorkerCount = workers.size(); + + invokeInitializeExecutionWorkers(taskRunner); + + Assert.assertEquals(workers.size(), initialWorkerCount, + "No workers should be added when dynamic scaling config is missing"); + } + + /** + * Helper to create a mock GobblinTemporalTaskRunner with necessary fields set. + */ + private GobblinTemporalTaskRunner createMockTaskRunner(Config config) throws Exception { + GobblinTemporalTaskRunner taskRunner = Mockito.mock(GobblinTemporalTaskRunner.class, Mockito.CALLS_REAL_METHODS); + + // Set clusterConfig field + Field clusterConfigField = GobblinTemporalTaskRunner.class.getDeclaredField("clusterConfig"); + clusterConfigField.setAccessible(true); + clusterConfigField.set(taskRunner, config); + + // Set workers list + Field workersField = GobblinTemporalTaskRunner.class.getDeclaredField("workers"); + workersField.setAccessible(true); + workersField.set(taskRunner, new ArrayList()); + + // Mock managedWorkflowServiceStubs + ManagedWorkflowServiceStubs mockStubs = Mockito.mock(ManagedWorkflowServiceStubs.class); + Field stubsField = GobblinTemporalTaskRunner.class.getDeclaredField("managedWorkflowServiceStubs"); + stubsField.setAccessible(true); + stubsField.set(taskRunner, mockStubs); + + // Mock WorkflowClient + Mockito.when(mockStubs.getWorkflowServiceStubs()).thenReturn(null); + + return taskRunner; + } + + /** + * Helper to invoke the private initializeExecutionWorkers method using reflection. + */ + private void invokeInitializeExecutionWorkers(GobblinTemporalTaskRunner taskRunner) throws Exception { + Method method = GobblinTemporalTaskRunner.class.getDeclaredMethod("initializeExecutionWorkers"); + method.setAccessible(true); + method.invoke(taskRunner); + } + + /** + * Helper to get the workers list field using reflection. + */ + @SuppressWarnings("unchecked") + private List getWorkersField(GobblinTemporalTaskRunner taskRunner) throws Exception { + Field workersField = GobblinTemporalTaskRunner.class.getDeclaredField("workers"); + workersField.setAccessible(true); + return (List) workersField.get(taskRunner); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java new file mode 100644 index 00000000000..0ff6edaf74e --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import java.util.List; +import java.util.Properties; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + + +/** + * Tests for {@link AbstractRecommendScalingForWorkUnitsImpl} focusing on profile overlay creation + * and ExecutionWorker configuration for dynamic scaling. + */ +public class AbstractRecommendScalingForWorkUnitsImplTest { + + private TestableAbstractRecommendScalingForWorkUnitsImpl scalingImpl; + private Properties jobProps; + + @Mock + private WorkUnitsSizeSummary mockWorkSummary; + + @Mock + private TimeBudget mockTimeBudget; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + scalingImpl = new TestableAbstractRecommendScalingForWorkUnitsImpl(); + jobProps = new Properties(); + // JobState requires job.name and job.id + jobProps.setProperty("job.name", "TestJob"); + jobProps.setProperty("job.id", "TestJob_123"); + } + + /** + * Tests that ExecutionWorker class is always added to profile overlay. + */ + @Test + public void testProfileOverlayAlwaysIncludesExecutionWorkerClass() { + JobState jobState = new JobState(jobProps); + + ProfileDerivation derivation = scalingImpl.calcProfileDerivation( + WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState); + + Assert.assertNotNull(derivation); + Assert.assertEquals(derivation.getBasisProfileName(), WorkforceProfiles.BASELINE_NAME); + + ProfileOverlay overlay = derivation.getOverlay(); + Assert.assertTrue(overlay instanceof ProfileOverlay.Adding); + + // Verify ExecutionWorker class is in overlay + ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay; + boolean hasExecutionWorkerClass = addingOverlay.getAdditionPairs().stream() + .anyMatch(kv -> kv.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS) + && kv.getValue().equals(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS)); + + Assert.assertTrue(hasExecutionWorkerClass, + "Profile overlay should always include ExecutionWorker class"); + } + + /** + * Tests that execution-specific memory is added to overlay when configured. + */ + @Test + public void testProfileOverlayIncludesExecutionMemoryWhenConfigured() { + String executionMemory = "32768"; + jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, executionMemory); + JobState jobState = new JobState(jobProps); + + ProfileDerivation derivation = scalingImpl.calcProfileDerivation( + WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState); + + ProfileOverlay overlay = derivation.getOverlay(); + Assert.assertTrue(overlay instanceof ProfileOverlay.Adding); + + ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay; + + // Verify memory is in overlay + boolean hasMemoryConfig = addingOverlay.getAdditionPairs().stream() + .anyMatch(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY) + && kv.getValue().equals(executionMemory)); + + Assert.assertTrue(hasMemoryConfig, + "Profile overlay should include execution memory when configured"); + } + + /** + * Tests that memory is not added to overlay when not configured (falls back to baseline). + */ + @Test + public void testProfileOverlayOmitsMemoryWhenNotConfigured() { + JobState jobState = new JobState(jobProps); + + ProfileDerivation derivation = scalingImpl.calcProfileDerivation( + WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState); + + ProfileOverlay overlay = derivation.getOverlay(); + Assert.assertTrue(overlay instanceof ProfileOverlay.Adding); + + ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay; + + // Verify memory is NOT in overlay (will use baseline memory) + boolean hasMemoryConfig = addingOverlay.getAdditionPairs().stream() + .anyMatch(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)); + + Assert.assertFalse(hasMemoryConfig, + "Profile overlay should not include memory config when not set (uses baseline)"); + } + + /** + * Tests that overlay contains both ExecutionWorker class and memory when both configured. + */ + @Test + public void testProfileOverlayIncludesBothWorkerClassAndMemory() { + String executionMemory = "65536"; + jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, executionMemory); + JobState jobState = new JobState(jobProps); + + ProfileDerivation derivation = scalingImpl.calcProfileDerivation( + WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState); + + ProfileOverlay overlay = derivation.getOverlay(); + Assert.assertTrue(overlay instanceof ProfileOverlay.Adding); + + ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay; + List kvPairs = addingOverlay.getAdditionPairs(); + + // Should have exactly 2 entries: worker class + memory + Assert.assertEquals(kvPairs.size(), 2, + "Overlay should have 2 entries when memory is configured"); + + boolean hasWorkerClass = kvPairs.stream() + .anyMatch(kv -> kv.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS)); + boolean hasMemory = kvPairs.stream() + .anyMatch(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)); + + Assert.assertTrue(hasWorkerClass && hasMemory, + "Overlay should contain both worker class and memory"); + } + + /** + * Tests that profile derivation name uses default. + */ + @Test + public void testProfileDerivationNameUsesDefault() { + JobState jobState = new JobState(jobProps); + + String derivationName = scalingImpl.calcProfileDerivationName(jobState); + + Assert.assertEquals(derivationName, AbstractRecommendScalingForWorkUnitsImpl.DEFAULT_PROFILE_DERIVATION_NAME); + } + + /** + * Tests that basis profile name is always baseline. + */ + @Test + public void testBasisProfileNameIsAlwaysBaseline() { + JobState jobState = new JobState(jobProps); + + String basisProfileName = scalingImpl.calcBasisProfileName(jobState); + + Assert.assertEquals(basisProfileName, WorkforceProfiles.BASELINE_NAME); + } + + /** + * Tests that recommendScaling returns a ScalingDirective with correct profile derivation. + */ + @Test + public void testRecommendScalingReturnsDirectiveWithProfileDerivation() { + String executionMemory = "16384"; + jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, executionMemory); + + List directives = scalingImpl.recommendScaling( + mockWorkSummary, "TestSource", mockTimeBudget, jobProps); + + Assert.assertNotNull(directives); + Assert.assertEquals(directives.size(), 1); + + ScalingDirective directive = directives.get(0); + Assert.assertTrue(directive.getOptDerivedFrom().isPresent(), + "Directive should have profile derivation"); + + ProfileDerivation derivation = directive.getOptDerivedFrom().get(); + Assert.assertEquals(derivation.getBasisProfileName(), WorkforceProfiles.BASELINE_NAME); + + // Verify overlay has ExecutionWorker class + ProfileOverlay overlay = derivation.getOverlay(); + Assert.assertTrue(overlay instanceof ProfileOverlay.Adding); + + ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay; + boolean hasExecutionWorker = addingOverlay.getAdditionPairs().stream() + .anyMatch(kv -> kv.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS) + && kv.getValue().equals(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS)); + + Assert.assertTrue(hasExecutionWorker, + "Scaling directive should include ExecutionWorker in profile derivation"); + } + + /** + * Tests that different memory values create different overlays. + */ + @Test + public void testDifferentMemoryValuesCreateDifferentOverlays() { + // First config with 16GB + jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, "16384"); + JobState jobState1 = new JobState(jobProps); + ProfileDerivation derivation1 = scalingImpl.calcProfileDerivation( + WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState1); + + // Second config with 32GB + Properties jobProps2 = new Properties(); + jobProps2.setProperty("job.name", "TestJob"); + jobProps2.setProperty("job.id", "TestJob_456"); + jobProps2.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, "32768"); + JobState jobState2 = new JobState(jobProps2); + ProfileDerivation derivation2 = scalingImpl.calcProfileDerivation( + WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState2); + + // Extract memory values from overlays + ProfileOverlay.Adding overlay1 = (ProfileOverlay.Adding) derivation1.getOverlay(); + ProfileOverlay.Adding overlay2 = (ProfileOverlay.Adding) derivation2.getOverlay(); + + String memory1 = overlay1.getAdditionPairs().stream() + .filter(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)) + .map(kv -> kv.getValue()) + .findFirst() + .orElse(null); + + String memory2 = overlay2.getAdditionPairs().stream() + .filter(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)) + .map(kv -> kv.getValue()) + .findFirst() + .orElse(null); + + Assert.assertEquals(memory1, "16384"); + Assert.assertEquals(memory2, "32768"); + Assert.assertNotEquals(memory1, memory2, + "Different memory configs should produce different overlay values"); + } + + /** + * Testable concrete implementation of AbstractRecommendScalingForWorkUnitsImpl. + */ + private static class TestableAbstractRecommendScalingForWorkUnitsImpl + extends AbstractRecommendScalingForWorkUnitsImpl { + + @Override + protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, + TimeBudget timeBudget, JobState jobState) { + // Simple test implementation: return 5 containers + return 5; + } + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java new file mode 100644 index 00000000000..c3b6234c422 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.worker; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.WorkerOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Tests for {@link ExecutionWorker} verifying workflow/activity registration and configuration. + */ +public class ExecutionWorkerTest { + + /** + * Tests that ExecutionWorker registers only the workflows needed for work execution. + */ + @Test + public void testGetWorkflowImplClasses() throws Exception { + Config config = ConfigFactory.empty(); + ExecutionWorker worker = createMockWorker(config); + + Class[] workflows = invokeGetWorkflowImplClasses(worker); + + Assert.assertEquals(workflows.length, 2, + "ExecutionWorker should register exactly 2 workflow types"); + + List workflowNames = Arrays.stream(workflows) + .map(Class::getName) + .collect(Collectors.toList()); + + Assert.assertTrue(workflowNames.contains(ProcessWorkUnitsWorkflowImpl.class.getName()), + "ExecutionWorker should register ProcessWorkUnitsWorkflowImpl"); + Assert.assertTrue(workflowNames.contains(NestingExecOfProcessWorkUnitWorkflowImpl.class.getName()), + "ExecutionWorker should register NestingExecOfProcessWorkUnitWorkflowImpl"); + } + + /** + * Tests that ExecutionWorker registers only ProcessWorkUnit activity. + */ + @Test + public void testGetActivityImplInstances() throws Exception { + Config config = ConfigFactory.empty(); + ExecutionWorker worker = createMockWorker(config); + + Object[] activities = invokeGetActivityImplInstances(worker); + + Assert.assertEquals(activities.length, 1, + "ExecutionWorker should register exactly 1 activity type"); + Assert.assertTrue(activities[0] instanceof ProcessWorkUnitImpl, + "ExecutionWorker should register ProcessWorkUnitImpl activity"); + } + + /** + * Tests that ExecutionWorker uses execution task queue from config. + */ + @Test + public void testGetTaskQueueFromConfig() throws Exception { + String customQueue = "CustomExecutionQueue"; + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE, + ConfigValueFactory.fromAnyRef(customQueue)); + + ExecutionWorker worker = createMockWorker(config); + String taskQueue = invokeGetTaskQueue(worker); + + Assert.assertEquals(taskQueue, customQueue, + "ExecutionWorker should use execution task queue from config"); + } + + /** + * Tests that ExecutionWorker uses default execution task queue when not configured. + */ + @Test + public void testGetTaskQueueDefault() throws Exception { + Config config = ConfigFactory.empty(); + + ExecutionWorker worker = createMockWorker(config); + String taskQueue = invokeGetTaskQueue(worker); + + Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE, + "ExecutionWorker should use default execution task queue when not configured"); + } + + /** + * Tests that ExecutionWorker creates WorkerOptions with correct concurrency settings. + */ + @Test + public void testCreateWorkerOptionsWithCustomConcurrency() throws Exception { + int customConcurrency = 10; + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, + ConfigValueFactory.fromAnyRef(customConcurrency)); + + ExecutionWorker worker = createMockWorker(config); + WorkerOptions options = invokeCreateWorkerOptions(worker); + + Assert.assertEquals(options.getMaxConcurrentActivityExecutionSize(), customConcurrency, + "MaxConcurrentActivityExecutionSize should match configured value"); + Assert.assertEquals(options.getMaxConcurrentLocalActivityExecutionSize(), customConcurrency, + "MaxConcurrentLocalActivityExecutionSize should match configured value"); + Assert.assertEquals(options.getMaxConcurrentWorkflowTaskExecutionSize(), customConcurrency, + "MaxConcurrentWorkflowTaskExecutionSize should match configured value"); + } + + /** + * Tests that ExecutionWorker creates WorkerOptions with default concurrency. + */ + @Test + public void testCreateWorkerOptionsWithDefaultConcurrency() throws Exception { + Config config = ConfigFactory.empty(); + + ExecutionWorker worker = createMockWorker(config); + WorkerOptions options = invokeCreateWorkerOptions(worker); + + int defaultConcurrency = GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER; + Assert.assertEquals(options.getMaxConcurrentActivityExecutionSize(), defaultConcurrency, + "MaxConcurrentActivityExecutionSize should use default value"); + Assert.assertEquals(options.getMaxConcurrentLocalActivityExecutionSize(), defaultConcurrency, + "MaxConcurrentLocalActivityExecutionSize should use default value"); + Assert.assertEquals(options.getMaxConcurrentWorkflowTaskExecutionSize(), defaultConcurrency, + "MaxConcurrentWorkflowTaskExecutionSize should use default value"); + } + + /** + * Tests that ExecutionWorker sets deadlock detection timeout correctly. + */ + @Test + public void testCreateWorkerOptionsDeadlockTimeout() throws Exception { + Config config = ConfigFactory.empty(); + + ExecutionWorker worker = createMockWorker(config); + WorkerOptions options = invokeCreateWorkerOptions(worker); + + long expectedTimeoutMillis = TimeUnit.SECONDS.toMillis(ExecutionWorker.DEADLOCK_DETECTION_TIMEOUT_SECONDS); + Assert.assertEquals(options.getDefaultDeadlockDetectionTimeout(), expectedTimeoutMillis, + "Deadlock detection timeout should be set correctly"); + } + + /** + * Tests that maxExecutionConcurrency field is initialized from config. + */ + @Test + public void testMaxExecutionConcurrencyInitialization() throws Exception { + int customConcurrency = 15; + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, + ConfigValueFactory.fromAnyRef(customConcurrency)); + + ExecutionWorker worker = createMockWorker(config); + + Assert.assertEquals(worker.getMaxExecutionConcurrency(), customConcurrency, + "maxExecutionConcurrency should be initialized from config"); + } + + /** + * Tests that maxExecutionConcurrency uses default when not configured. + */ + @Test + public void testMaxExecutionConcurrencyDefault() throws Exception { + Config config = ConfigFactory.empty(); + + ExecutionWorker worker = createMockWorker(config); + + Assert.assertEquals(worker.getMaxExecutionConcurrency(), + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER, + "maxExecutionConcurrency should use default value when not configured"); + } + + /** + * Helper to create a mock ExecutionWorker without calling the constructor. + */ + private ExecutionWorker createMockWorker(Config config) throws Exception { + ExecutionWorker worker = Mockito.mock(ExecutionWorker.class, Mockito.CALLS_REAL_METHODS); + + // Set config field + Field configField = org.apache.gobblin.temporal.cluster.AbstractTemporalWorker.class.getDeclaredField("config"); + configField.setAccessible(true); + configField.set(worker, config); + + // Set maxExecutionConcurrency field + Field maxConcurrencyField = ExecutionWorker.class.getDeclaredField("maxExecutionConcurrency"); + maxConcurrencyField.setAccessible(true); + int concurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER); + maxConcurrencyField.set(worker, concurrency); + + return worker; + } + + /** + * Helper to invoke the protected getWorkflowImplClasses method using reflection. + */ + private Class[] invokeGetWorkflowImplClasses(ExecutionWorker worker) throws Exception { + Method method = ExecutionWorker.class.getDeclaredMethod("getWorkflowImplClasses"); + method.setAccessible(true); + return (Class[]) method.invoke(worker); + } + + /** + * Helper to invoke the protected getActivityImplInstances method using reflection. + */ + private Object[] invokeGetActivityImplInstances(ExecutionWorker worker) throws Exception { + Method method = ExecutionWorker.class.getDeclaredMethod("getActivityImplInstances"); + method.setAccessible(true); + return (Object[]) method.invoke(worker); + } + + /** + * Helper to invoke the protected getTaskQueue method using reflection. + */ + private String invokeGetTaskQueue(ExecutionWorker worker) throws Exception { + Method method = ExecutionWorker.class.getDeclaredMethod("getTaskQueue"); + method.setAccessible(true); + return (String) method.invoke(worker); + } + + /** + * Helper to invoke the protected createWorkerOptions method using reflection. + */ + private WorkerOptions invokeCreateWorkerOptions(ExecutionWorker worker) throws Exception { + Method method = ExecutionWorker.class.getDeclaredMethod("createWorkerOptions"); + method.setAccessible(true); + return (WorkerOptions) method.invoke(worker); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStageTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStageTest.java new file mode 100644 index 00000000000..1a56e051314 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStageTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + + +/** + * Tests for {@link WorkflowStage} to verify task queue configuration + * for different workflow stages. + */ +public class WorkflowStageTest { + + /** + * Tests that WORK_EXECUTION stage uses execution task queue from config. + */ + @Test + public void testWorkExecutionStageUsesExecutionQueue() { + // Setup + String customExecutionQueue = "CustomExecutionQueue"; + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE, + ConfigValueFactory.fromAnyRef(customExecutionQueue)); + + // Execute + String taskQueue = WorkflowStage.WORK_EXECUTION.getTaskQueue(config); + + // Verify + Assert.assertEquals(taskQueue, customExecutionQueue, + "WORK_EXECUTION should use configured execution queue"); + } + + /** + * Tests that WORK_EXECUTION stage falls back to default execution queue. + */ + @Test + public void testWorkExecutionStageUsesDefaultQueue() { + // Setup - empty config + Config config = ConfigFactory.empty(); + + // Execute + String taskQueue = WorkflowStage.WORK_EXECUTION.getTaskQueue(config); + + // Verify + Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE, + "WORK_EXECUTION should use default execution queue when not configured"); + } + + /** + * Tests that WORK_DISCOVERY stage uses default task queue. + */ + @Test + public void testWorkDiscoveryStageUsesDefaultQueue() { + // Setup + Config config = ConfigFactory.empty(); + + // Execute + String taskQueue = WorkflowStage.WORK_DISCOVERY.getTaskQueue(config); + + // Verify + Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE, + "WORK_DISCOVERY should use default task queue"); + } + + /** + * Tests that WORK_COMMIT stage uses default task queue. + */ + @Test + public void testWorkCommitStageUsesDefaultQueue() { + // Setup + Config config = ConfigFactory.empty(); + + // Execute + String taskQueue = WorkflowStage.COMMIT.getTaskQueue(config); + + // Verify + Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE, + "COMMIT should use default task queue"); + } + + /** + * Tests that different stages use different task queues in dynamic scaling mode. + */ + @Test + public void testDifferentStagesUseDifferentQueues() { + // Setup + Config config = ConfigFactory.empty() + .withValue(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE, + ConfigValueFactory.fromAnyRef("ExecutionQueue")) + .withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE, + ConfigValueFactory.fromAnyRef("DefaultQueue")); + + // Execute + String executionQueue = WorkflowStage.WORK_EXECUTION.getTaskQueue(config); + String discoveryQueue = WorkflowStage.WORK_DISCOVERY.getTaskQueue(config); + String commitQueue = WorkflowStage.COMMIT.getTaskQueue(config); + + // Verify + Assert.assertEquals(executionQueue, "ExecutionQueue"); + Assert.assertEquals(discoveryQueue, "DefaultQueue"); + Assert.assertEquals(commitQueue, "DefaultQueue"); + Assert.assertNotEquals(executionQueue, discoveryQueue, + "Execution and discovery should use different queues"); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java new file mode 100644 index 00000000000..272aca43fdf --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import io.temporal.workflow.ChildWorkflowOptions; +import io.temporal.workflow.Workflow; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.cluster.WorkerConfig; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow; +import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; + + +/** + * Tests for {@link ProcessWorkUnitsWorkflowImpl} focusing on task queue routing + * and child workflow creation. + */ +public class ProcessWorkUnitsWorkflowImplTest { + + private MockedStatic workflowMockedStatic; + private MockedStatic workerConfigMockedStatic; + private MockedStatic helpMockedStatic; + private ProcessWorkUnitsWorkflowImpl workflow; + + @BeforeMethod + public void setup() { + workflowMockedStatic = Mockito.mockStatic(Workflow.class); + workerConfigMockedStatic = Mockito.mockStatic(WorkerConfig.class); + helpMockedStatic = Mockito.mockStatic(Help.class); + + // Mock Help.qualifyNamePerExecWithFlowExecId to return a simple workflow ID + helpMockedStatic.when(() -> Help.qualifyNamePerExecWithFlowExecId(Mockito.anyString(), Mockito.any(), Mockito.any())) + .thenReturn("test-workflow-id"); + helpMockedStatic.when(() -> Help.qualifyNamePerExecWithFlowExecId(Mockito.anyString(), Mockito.any())) + .thenReturn("test-workflow-id"); + + workflow = new ProcessWorkUnitsWorkflowImpl(); + } + + @AfterMethod + public void tearDown() { + if (workflowMockedStatic != null) { + workflowMockedStatic.close(); + } + if (workerConfigMockedStatic != null) { + workerConfigMockedStatic.close(); + } + if (helpMockedStatic != null) { + helpMockedStatic.close(); + } + } + + /** + * Tests that CommitStepWorkflow child workflow is created without explicit task queue, + * allowing it to inherit the parent workflow's task queue (default queue). + * This ensures CommitStepWorkflow runs on WorkFulfillmentWorker, not ExecutionWorker. + */ + @Test + public void testCreateCommitStepWorkflowUsesDefaultQueue() { + // Setup + Map searchAttributes = new HashMap<>(); + searchAttributes.put("test", "value"); + + Config mockConfig = ConfigFactory.empty(); + workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any())) + .thenReturn(java.util.Optional.of(mockConfig)); + + CommitStepWorkflow mockCommitWorkflow = Mockito.mock(CommitStepWorkflow.class); + + // Capture the ChildWorkflowOptions passed to newChildWorkflowStub + workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub( + Mockito.eq(CommitStepWorkflow.class), + Mockito.any(ChildWorkflowOptions.class))) + .thenAnswer(invocation -> { + ChildWorkflowOptions options = invocation.getArgument(1); + // Verify task queue is NOT set (should be null to inherit from parent) + Assert.assertNull(options.getTaskQueue(), + "CommitStepWorkflow should not have explicit task queue set"); + Assert.assertEquals(options.getSearchAttributes(), searchAttributes); + return mockCommitWorkflow; + }); + + // Execute + CommitStepWorkflow result = workflow.createCommitStepWorkflow(searchAttributes); + + // Verify + Assert.assertNotNull(result); + workflowMockedStatic.verify(() -> Workflow.newChildWorkflowStub( + Mockito.eq(CommitStepWorkflow.class), + Mockito.any(ChildWorkflowOptions.class)), Mockito.times(1)); + } + + /** + * Tests that CommitStepWorkflow creation works when WorkerConfig is absent. + */ + @Test + public void testCreateCommitStepWorkflowWithoutWorkerConfig() { + // Setup + Map searchAttributes = new HashMap<>(); + + workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any())) + .thenReturn(java.util.Optional.empty()); + + CommitStepWorkflow mockCommitWorkflow = Mockito.mock(CommitStepWorkflow.class); + + workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub( + Mockito.eq(CommitStepWorkflow.class), + Mockito.any(ChildWorkflowOptions.class))) + .thenReturn(mockCommitWorkflow); + + // Execute + CommitStepWorkflow result = workflow.createCommitStepWorkflow(searchAttributes); + + // Verify + Assert.assertNotNull(result); + } + + /** + * Tests that NestingExecWorkflow is routed to execution queue when dynamic scaling is enabled. + */ + @Test + public void testCreateProcessingWorkflowWithDynamicScalingEnabled() { + // Setup + Map searchAttributes = new HashMap<>(); + WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class); + + Config mockConfig = ConfigFactory.parseString( + GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED + "=true\n" + + GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE + "=TestExecutionQueue" + ); + + workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any())) + .thenReturn(java.util.Optional.of(mockConfig)); + + NestingExecWorkflow mockNestingWorkflow = Mockito.mock(NestingExecWorkflow.class); + + // Capture the ChildWorkflowOptions passed to newChildWorkflowStub + workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub( + Mockito.eq(NestingExecWorkflow.class), + Mockito.any(ChildWorkflowOptions.class))) + .thenAnswer(invocation -> { + ChildWorkflowOptions options = invocation.getArgument(1); + // Verify task queue IS set to execution queue + Assert.assertEquals(options.getTaskQueue(), "TestExecutionQueue", + "NestingExecWorkflow should use execution queue when dynamic scaling is enabled"); + return mockNestingWorkflow; + }); + + // Execute + NestingExecWorkflow result = workflow.createProcessingWorkflow(mockSpec, searchAttributes); + + // Verify + Assert.assertNotNull(result); + workflowMockedStatic.verify(() -> Workflow.newChildWorkflowStub( + Mockito.eq(NestingExecWorkflow.class), + Mockito.any(ChildWorkflowOptions.class)), Mockito.times(1)); + } + + /** + * Tests that NestingExecWorkflow does NOT have task queue set when dynamic scaling is disabled. + */ + @Test + public void testCreateProcessingWorkflowWithDynamicScalingDisabled() { + // Setup + Map searchAttributes = new HashMap<>(); + WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class); + + Config mockConfig = ConfigFactory.parseString( + GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED + "=false" + ); + + workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any())) + .thenReturn(java.util.Optional.of(mockConfig)); + + NestingExecWorkflow mockNestingWorkflow = Mockito.mock(NestingExecWorkflow.class); + + workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub( + Mockito.eq(NestingExecWorkflow.class), + Mockito.any(ChildWorkflowOptions.class))) + .thenAnswer(invocation -> { + ChildWorkflowOptions options = invocation.getArgument(1); + // Verify task queue is NOT set (inherits from parent) + Assert.assertNull(options.getTaskQueue(), + "NestingExecWorkflow should not have explicit task queue when dynamic scaling is disabled"); + return mockNestingWorkflow; + }); + + // Execute + NestingExecWorkflow result = workflow.createProcessingWorkflow(mockSpec, searchAttributes); + + // Verify + Assert.assertNotNull(result); + } + + /** + * Tests that NestingExecWorkflow does NOT have task queue set when dynamic scaling config is absent. + */ + @Test + public void testCreateProcessingWorkflowWithoutDynamicScalingConfig() { + // Setup + Map searchAttributes = new HashMap<>(); + WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class); + + Config mockConfig = ConfigFactory.empty(); + + workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any())) + .thenReturn(java.util.Optional.of(mockConfig)); + + NestingExecWorkflow mockNestingWorkflow = Mockito.mock(NestingExecWorkflow.class); + + workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub( + Mockito.eq(NestingExecWorkflow.class), + Mockito.any(ChildWorkflowOptions.class))) + .thenAnswer(invocation -> { + ChildWorkflowOptions options = invocation.getArgument(1); + // Verify task queue is NOT set (inherits from parent) + Assert.assertNull(options.getTaskQueue(), + "NestingExecWorkflow should not have explicit task queue when dynamic scaling config is absent"); + return mockNestingWorkflow; + }); + + // Execute + workflow.createProcessingWorkflow(mockSpec, searchAttributes); + } +}