diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index 735df11c..6cfd96d2 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; +import org.onedatashare.transferservice.odstransferservice.model.StopJobRequest; import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest; import org.onedatashare.transferservice.odstransferservice.model.optimizer.TransferApplicationParams; import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager; @@ -83,11 +84,21 @@ public void consumeDefaultMessage(final Message message) { logger.debug("Failed to parse jsonStr:{} to TransferJobRequest.java", jsonStr); } try { + Object obj = objectMapper.readValue(jsonStr, Object.class); + if (obj instanceof StopJobRequest) { + StopJobRequest stopJobRequest = (StopJobRequest) obj; + Long jobId = stopJobRequest.getJobId(); + String transferNodeName = stopJobRequest.getTransferNodeName(); + jc.jobStop(jobId, transferNodeName); + } + else{ TransferApplicationParams params = objectMapper.readValue(jsonStr, TransferApplicationParams.class); logger.info("Parsed TransferApplicationParams:{}", params); this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism()); + } } catch (Exception e) { logger.info("Did not apply transfer params due to parsing message failure"); } } + } \ No newline at end of file diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java new file mode 100644 index 00000000..42dc3edf --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java @@ -0,0 +1,14 @@ +package org.onedatashare.transferservice.odstransferservice.model; + +public class StopJobRequest { + Long jobId; + String transferNodeName; + + public Long getJobId() { + return jobId; + } + + public String getTransferNodeName() { + return transferNodeName; + } +} diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java index 13d6092c..b58a6f06 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java @@ -34,13 +34,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.launch.JobExecutionNotRunningException; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; @@ -104,6 +109,11 @@ public class JobControl extends DefaultBatchConfigurer { @Autowired JobRepository roachRepository; + @Autowired + private JobExplorer jobExplorer; + + @Autowired + private JobOperator jobOperator; @Lazy @@ -257,4 +267,23 @@ private void setRetryPolicy() { SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retryAttempts, retryFor); this.retryTemplateForReaderAndWriter.setRetryPolicy(retryPolicy); } + public void jobStop(long jobId, String transferNodeName) { + try{ + for(JobExecution jobExecution: jobExplorer.getJobExecutions(jobExplorer.getJobInstance(jobId))){ + if (jobExecution.getExecutionContext().getString("routingKey").equals(transferNodeName)){ + jobOperator.stop(jobExecution.getId()); + logger.info("Job Stopped Successfully"); + + } + } + logger.info("No matching Job instances found"); + + } + catch (NoSuchJobExecutionException | JobExecutionNotRunningException e){ + logger.info("Error stopping job"); + + } + + + } } \ No newline at end of file