From 012a40520c25d61e5fd3dcf449b7211bbd95883e Mon Sep 17 00:00:00 2001 From: Amisha Sinha Date: Thu, 6 Apr 2023 15:19:41 -0400 Subject: [PATCH 1/2] Poison pill mechanism implementation --- .../config/RabbitMQConfig.java | 3 ++ .../consumer/RabbitMQConsumer.java | 19 ++++++++++++ .../model/StopJobRequest.java | 14 +++++++++ .../service/JobControl.java | 29 +++++++++++++++++++ 4 files changed, 65 insertions(+) create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index be81ad00..4fa2419b 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -45,4 +45,7 @@ public Binding binding(DirectExchange exchange, Queue userQueue){ .to(exchange) .with(routingKey); } + public Queue stopJobQueue(){ + return new Queue(this.queueName,false, false, false); + } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index 735df11c..ddefbc5b 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; +import org.onedatashare.transferservice.odstransferservice.model.StopJobRequest; import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest; import org.onedatashare.transferservice.odstransferservice.model.optimizer.TransferApplicationParams; import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager; @@ -90,4 +91,22 @@ public void consumeDefaultMessage(final Message message) { logger.info("Did not apply transfer params due to parsing message failure"); } } + @RabbitListener(queues = "#{stopJobQueue}") + public void consumeStopJobRequest(final Message message){ + String jsonStr = new String(message.getBody()); + logger.info("Stop job request received: {}", jsonStr); + try{ + StopJobRequest stopJobRequest = objectMapper.readValue(jsonStr, StopJobRequest.class); + Long jobId = stopJobRequest.getJobId(); + String transferNodeName = stopJobRequest.getTransferNodeName(); + jc.jobStop(jobId, transferNodeName); + + } + catch (JsonProcessingException e){ + logger.error("Error parsing stop job request JSON: {}", e.getMessage()); + } + + + } + } \ No newline at end of file diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java new file mode 100644 index 00000000..42dc3edf --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/StopJobRequest.java @@ -0,0 +1,14 @@ +package org.onedatashare.transferservice.odstransferservice.model; + +public class StopJobRequest { + Long jobId; + String transferNodeName; + + public Long getJobId() { + return jobId; + } + + public String getTransferNodeName() { + return transferNodeName; + } +} diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java index 13d6092c..b58a6f06 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java @@ -34,13 +34,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.launch.JobExecutionNotRunningException; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; @@ -104,6 +109,11 @@ public class JobControl extends DefaultBatchConfigurer { @Autowired JobRepository roachRepository; + @Autowired + private JobExplorer jobExplorer; + + @Autowired + private JobOperator jobOperator; @Lazy @@ -257,4 +267,23 @@ private void setRetryPolicy() { SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retryAttempts, retryFor); this.retryTemplateForReaderAndWriter.setRetryPolicy(retryPolicy); } + public void jobStop(long jobId, String transferNodeName) { + try{ + for(JobExecution jobExecution: jobExplorer.getJobExecutions(jobExplorer.getJobInstance(jobId))){ + if (jobExecution.getExecutionContext().getString("routingKey").equals(transferNodeName)){ + jobOperator.stop(jobExecution.getId()); + logger.info("Job Stopped Successfully"); + + } + } + logger.info("No matching Job instances found"); + + } + catch (NoSuchJobExecutionException | JobExecutionNotRunningException e){ + logger.info("Error stopping job"); + + } + + + } } \ No newline at end of file From 0a6b4eb5b1e333ca846a45bbac953a4bfc3b1596 Mon Sep 17 00:00:00 2001 From: Amisha Sinha Date: Tue, 18 Apr 2023 11:39:41 -0400 Subject: [PATCH 2/2] Poison pill mechanism-implementing stopjob in already defined queue --- .../config/RabbitMQConfig.java | 3 --- .../consumer/RabbitMQConsumer.java | 26 +++++++------------ 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index 4fa2419b..be81ad00 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -45,7 +45,4 @@ public Binding binding(DirectExchange exchange, Queue userQueue){ .to(exchange) .with(routingKey); } - public Queue stopJobQueue(){ - return new Queue(this.queueName,false, false, false); - } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index ddefbc5b..6cfd96d2 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -84,29 +84,21 @@ public void consumeDefaultMessage(final Message message) { logger.debug("Failed to parse jsonStr:{} to TransferJobRequest.java", jsonStr); } try { + Object obj = objectMapper.readValue(jsonStr, Object.class); + if (obj instanceof StopJobRequest) { + StopJobRequest stopJobRequest = (StopJobRequest) obj; + Long jobId = stopJobRequest.getJobId(); + String transferNodeName = stopJobRequest.getTransferNodeName(); + jc.jobStop(jobId, transferNodeName); + } + else{ TransferApplicationParams params = objectMapper.readValue(jsonStr, TransferApplicationParams.class); logger.info("Parsed TransferApplicationParams:{}", params); this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism()); + } } catch (Exception e) { logger.info("Did not apply transfer params due to parsing message failure"); } } - @RabbitListener(queues = "#{stopJobQueue}") - public void consumeStopJobRequest(final Message message){ - String jsonStr = new String(message.getBody()); - logger.info("Stop job request received: {}", jsonStr); - try{ - StopJobRequest stopJobRequest = objectMapper.readValue(jsonStr, StopJobRequest.class); - Long jobId = stopJobRequest.getJobId(); - String transferNodeName = stopJobRequest.getTransferNodeName(); - jc.jobStop(jobId, transferNodeName); - - } - catch (JsonProcessingException e){ - logger.error("Error parsing stop job request JSON: {}", e.getMessage()); - } - - - } } \ No newline at end of file