diff --git a/Dockerfile b/Dockerfile index 4c3032c..3f44983 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,9 @@ FROM openjdk:17-slim COPY target/controller-service-0.0.1-SNAPSHOT.jar controller.jar ENV KAFKA_POWERPLANT_TOPIC=backend +ENV KAFKA_POWERPLANT_CONTROL_TOPIC=powerplant ENV KAFKA_GROUP_ID=controller +EXPOSE 8080 + ENTRYPOINT ["java", "-jar", "/controller.jar"] \ No newline at end of file diff --git a/README.md b/README.md index 6130833..21d1b77 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ The application requires the following environment variables: | KAFKA_BROKER | IP address of Kafka Broker | | KAFKA_BROKER_PORT | Port number of Kafka Broker | |KAFKA_GROUP_ID| Group id of controller service (default in Docker container: controller) | +|KAFKA_POWERPLANT_CONTROL_TOPIC| Topic for powerplant commands (default in Docker container: powerplant | |KAFKA_POWERPLANT_TOPIC| Topic of powerplant status updates (default in Docker container: backend) | ## Run using Docker @@ -27,6 +28,7 @@ Step 2: Execute the Docker container with required ENV variables: ```shell docker run \ + -p 8080:8080\ -e KAFKA_BROKER= \ -e KAFKA_BROKER_PORT= \ kwkmdo-controller diff --git a/pom.xml b/pom.xml index 8996835..9648182 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,10 @@ mockito-core 5.12.0 + + org.springframework.boot + spring-boot-starter-web + diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java new file mode 100644 index 0000000..358ad63 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java @@ -0,0 +1,31 @@ +package kilowattcommando.controllerservice.kafka; + +import dto.PowerplantCommand; +import dto.PowerplantOperation; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class CommandSender implements PowerPlantCommandSender { + private static final Logger log = LoggerFactory.getLogger(PowerPlantCommandSender.class); + + @Value("${kafka.controlTopic}") + private String powerPlantControlTopic; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Override + public void sendCommand(String powerPlantId, PowerplantCommand command) { + PowerplantOperation powerplantOperation = new PowerplantOperation(command); + ProducerRecord commandRecord = new ProducerRecord<>(powerPlantControlTopic, powerplantOperation); + commandRecord.headers().add("targetConsumerId", powerPlantId.getBytes()); + kafkaTemplate.send(commandRecord); + log.info("Sent command {} to powerplant {}", command, powerPlantId); + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..71e43a4 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java @@ -0,0 +1,45 @@ +package kilowattcommando.controllerservice.kafka; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaProducerConfig { + @Value("${kafka.serverAddress}") + private String kafkaServerAddress; + + @Value("${kafka.serverPort}") + private String kafkaServerPort; + + @Bean + public ProducerFactory producerFactory() { + Map props = new HashMap<>(); + props.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + String.format("%s:%s", kafkaServerAddress, kafkaServerPort)); + props.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + props.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java new file mode 100644 index 0000000..8b301cd --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java @@ -0,0 +1,9 @@ +package kilowattcommando.controllerservice.kafka; + +import dto.PowerplantCommand; +import org.springframework.stereotype.Service; + +@Service +public interface PowerPlantCommandSender { + void sendCommand(String powerPlantId, PowerplantCommand command); +} diff --git a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java new file mode 100644 index 0000000..fd86245 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java @@ -0,0 +1,26 @@ +package kilowattcommando.controllerservice.rest; + +import dto.PowerplantCommand; + +public record GateClosure(String closure) { + public boolean validate() { + return closure.matches("OPEN|CLOSED|HALF|QUARTER|THREE_QUARTERS"); + } + + public PowerplantCommand getEquivalentPowerplantCommand() { + switch (closure) { + case "OPEN": + return PowerplantCommand.gateOpen; + case "CLOSED": + return PowerplantCommand.gateClose; + case "HALF": + return PowerplantCommand.gateHalf; + case "QUARTER": + return PowerplantCommand.gateQuarter; + case "THREE_QUARTERS": + return PowerplantCommand.gateThreeQuarters; + default: + return null; + } + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java new file mode 100644 index 0000000..8fd95c3 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java @@ -0,0 +1,46 @@ +package kilowattcommando.controllerservice.rest; + +import dto.PowerplantCommand; +import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/powerplant/{id}") +public class PowerplantRestController { + private static final Logger log = LoggerFactory.getLogger(PowerplantRestController.class); + + private final PowerPlantCommandSender powerPlantCommandSender; + + public PowerplantRestController(PowerPlantCommandSender powerPlantCommandSender) { + this.powerPlantCommandSender = powerPlantCommandSender; + } + + @PutMapping("/start") + private ResponseEntity startPowerplant(@PathVariable String id) { + log.info("Request to start powerplant with id {}", id); + powerPlantCommandSender.sendCommand(id, PowerplantCommand.start); + return ResponseEntity.ok("Powerplant started"); + } + + @PutMapping("/stop") + private ResponseEntity stopPowerplant(@PathVariable String id) { + log.info("Request to stop powerplant with id {}", id); + powerPlantCommandSender.sendCommand(id, PowerplantCommand.stop); + return ResponseEntity.ok("Powerplant stopped"); + } + + @PutMapping("/gateClosure") + private ResponseEntity changeGateClosure(@PathVariable String id, @RequestBody GateClosure gateClosure) { + log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id); + if(gateClosure.validate()) { + powerPlantCommandSender.sendCommand(id, gateClosure.getEquivalentPowerplantCommand()); + return ResponseEntity.ok("Changed gate closure"); + } else { + return ResponseEntity.badRequest().body("Invalid gate closure"); + } + + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 67b04e3..34f0be6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,4 +2,5 @@ spring.application.name=controller-service kafka.serverAddress=${KAFKA_BROKER} kafka.serverPort=${KAFKA_BROKER_PORT} kafka.groupId=${KAFKA_GROUP_ID} -kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC} \ No newline at end of file +kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC} +kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC} \ No newline at end of file diff --git a/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java b/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java new file mode 100644 index 0000000..fe58bed --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java @@ -0,0 +1,112 @@ +package kilowattcommando.controllerservice; + +import dto.DTOJsonDeserializer; +import dto.PowerplantCommand; +import dto.PowerplantOperation; +import kilowattcommando.controllerservice.kafka.KafkaContainerExtension; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.web.servlet.MockMvc; + +import java.time.Duration; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@ExtendWith(KafkaContainerExtension.class) +@DirtiesContext +@AutoConfigureMockMvc +public class CommandIntegrationTest { + + + @Autowired + private MockMvc mockMvc; + + @Value("kafka.groupId") + private String groupId; + + private KafkaConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers")); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + + @Test + void startPowerPlant_1() throws Exception { + mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + try(KafkaConsumer consumer = createConsumer()) { + consumer.subscribe(List.of("powerplant")); + + await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); + + assertEquals(1, records.count()); + ConsumerRecord record = records.iterator().next(); + assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.start, record.value().command); + } + ); + + } + } + + @Test + void startPowerPlant_1_and_setGateToHalf() throws Exception { + mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\": \"HALF\"}")).andExpect(status().isOk()); + try(KafkaConsumer consumer = createConsumer()) { + consumer.subscribe(List.of("powerplant")); + + await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); + + assertEquals(2, records.count()); + + Iterator> iterator = records.iterator(); + ConsumerRecord record = iterator.next(); + ConsumerRecord record2 = iterator.next(); + + assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.start, record.value().command); + + assertEquals("1", new String(record2.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.gateHalf, record2.value().command); + } + ); + + } + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java new file mode 100644 index 0000000..fae173c --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java @@ -0,0 +1,74 @@ +package kilowattcommando.controllerservice.kafka; + +import dto.DTOJsonDeserializer; +import dto.PowerplantCommand; +import dto.PowerplantOperation; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) +public class CommandSenderTest { + + @Value("kafka.groupId") + private String groupId; + + @Autowired + private PowerPlantCommandSender powerPlantCommandSender; + + private KafkaConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers")); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + + @Test + void startPowerPlant_1() throws Exception { + powerPlantCommandSender.sendCommand("1", PowerplantCommand.start); + try(KafkaConsumer consumer = createConsumer()) { + consumer.subscribe(List.of("powerplant")); + + await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); + + assertEquals(1, records.count()); + ConsumerRecord record = records.iterator().next(); + assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.start, record.value().command); + } + ); + + } + } + +} diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java b/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java new file mode 100644 index 0000000..8ab613f --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java @@ -0,0 +1,29 @@ +package kilowattcommando.controllerservice.kafka; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +public class KafkaContainerExtension implements BeforeAllCallback, AfterAllCallback { + protected static KafkaContainer KAFKA; + + @Override + public void beforeAll(ExtensionContext extensionContext) throws Exception { + KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")); + + KAFKA.start(); + + System.setProperty("kafka.serverAddress", KAFKA.getIpAddress()); + System.setProperty("kafka.serverPort", String.valueOf(KAFKA.getFirstMappedPort())); + System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers()); + System.setProperty("kafka.powerplant.topic", "backend"); + System.setProperty("kafka.controlTopic", "powerplant"); + } + + @Override + public void afterAll(ExtensionContext extensionContext) throws Exception { + // This is done by testcontainers + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index 805c3dd..aff20cb 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -1,45 +1,39 @@ package kilowattcommando.controllerservice.kafka; import dto.PowerplantStatus; -import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler; import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.internal.util.MockUtil; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.fail; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @SpringBootTest @@ -47,38 +41,40 @@ properties = { "spring.application.name=controller-service" }) -@Testcontainers +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) public class PowerPlantListenerTest { - @Container - static final KafkaContainer KAFKA = new KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka") - ); - @Autowired - private KafkaTemplate kafkaTemplate; - + private static final Logger log = LoggerFactory.getLogger(PowerPlantListenerTest.class); @Autowired private PowerPlantStatusHandler powerPlantStatusHandler; - @DynamicPropertySource - static void kafkaProperties(DynamicPropertyRegistry registry) { - registry.add("kafka.serverAddress", KAFKA::getIpAddress); - registry.add("kafka.serverPort", KAFKA::getFirstMappedPort); - registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + private KafkaProducer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers")); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + return new KafkaProducer<>(props); } @Test - void shouldHandlePowerplantStatusEvent() { + void shouldHandlePowerplantStatusEvent() throws InterruptedException { PowerplantStatus powerplantStatus = new PowerplantStatus(); powerplantStatus.name = "powerplant1"; - kafkaTemplate.send("powerplant", powerplantStatus); + KafkaProducer producer = createProducer(); + + + ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus); + + Thread.sleep(3000); // Wait for topic metadata to catch up + producer.send(record); await() .pollInterval(3, TimeUnit.SECONDS) - .atMost(10, TimeUnit.SECONDS) + .atMost(60, TimeUnit.SECONDS) .untilAsserted(() -> { - assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount()); + Assertions.assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount()); }); } @@ -87,27 +83,13 @@ void shouldHandlePowerplantStatusEvent() { * Configuration of the KafkaProducer for testing purposes */ @TestConfiguration - public static class KafkaProducerConfig { + public static class KafkaProducerTestConfig { @Value("${kafka.serverPort}") private String kafkaServerPort; @Value("${kafka.serverAddress}") private String kafkaServerAddress; - @Bean - public ProducerFactory producerFactory() { - Map configs = new HashMap<>(); - configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress + ":" + kafkaServerPort); - configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - return new DefaultKafkaProducerFactory<>(configs); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } - @Bean public PowerPlantStatusHandler powerPlantStatusHandler() { return new PowerPlantHandlerStub(); diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java new file mode 100644 index 0000000..3bd2f0d --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java @@ -0,0 +1,95 @@ +package kilowattcommando.controllerservice.rest; + +import dto.PowerplantCommand; +import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MockMvc; + +import static org.mockito.Mockito.*; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@WebMvcTest(PowerplantRestController.class) +public class PowerplantRestAPITest { + @Autowired + private MockMvc mockMvc; + + @Autowired + private PowerPlantCommandSender powerPlantCommandSender; + + @TestConfiguration + static class PowerplantRestTestConfiguration { + @Bean + public PowerPlantCommandSender powerPlantCommandSender() { + return mock(PowerPlantCommandSender.class); + } + } + + @BeforeEach + public void reset() { + Mockito.reset(powerPlantCommandSender); + } + + @Test + public void startPowerplantWithId_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.start); + } + + @Test + public void stopPowerplantWithId_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/stop")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.stop); + } + + @Test + public void closeGateOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateClose); + } + + @Test + public void openGateOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"OPEN\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateOpen); + } + + @Test + public void setGateToQuarterOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"QUARTER\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateQuarter); + } + + @Test + public void setGateToHalfOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"HALF\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateHalf); + } + + @Test + public void setGateToThreeQuartersOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THREE_QUARTERS\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateThreeQuarters); + } + + @Test + public void invalidGateClosureOnPlant_1_400() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400)); + + verify(powerPlantCommandSender, times(0)).sendCommand(anyString(), any()); + } +}