From c22c7fdb3ca3d56243bef09b38deec7617958771 Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Sun, 5 Jan 2025 13:49:46 +0100 Subject: [PATCH 01/11] feat(rest-api): Add REST endpoints for Powerplant commands Created REST Controller with mapping /powerplant/{id} and the following endpoints: + /start PUT starts the powerplant with id id + /stop PUT stops the powerplant with id id + /gateClosure PUT sets the gate's closure of the powerplant with id id to either OPEN, CLOSED, HALF, QUARTER, THREE_QUARTERS --- pom.xml | 4 ++ .../controllerservice/rest/GateClosure.java | 7 ++++ .../rest/PowerplantRestController.java | 37 ++++++++++++++++ .../rest/PowerplantRestAPITest.java | 42 +++++++++++++++++++ 4 files changed, 90 insertions(+) create mode 100644 src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java create mode 100644 src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java create mode 100644 src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java diff --git a/pom.xml b/pom.xml index 8996835..9648182 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,10 @@ mockito-core 5.12.0 + + org.springframework.boot + spring-boot-starter-web + diff --git a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java new file mode 100644 index 0000000..156f737 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java @@ -0,0 +1,7 @@ +package kilowattcommando.controllerservice.rest; + +public record GateClosure(String closure) { + public boolean validate() { + return closure.matches("OPEN|CLOSED|HALF|QUARTER|THREE_QUARTERS"); + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java new file mode 100644 index 0000000..35e765a --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java @@ -0,0 +1,37 @@ +package kilowattcommando.controllerservice.rest; + +import jakarta.websocket.server.PathParam; +import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/powerplant/{id}") +public class PowerplantRestController { + private static final Logger log = LoggerFactory.getLogger(PowerPlantLoggingHandler.class); + + @PutMapping("/start") + private ResponseEntity startPowerplant(@PathVariable Long id) { + log.info("Request to start powerplant with id {}", id); + return ResponseEntity.ok("Powerplant started"); + } + + @PutMapping("/stop") + private ResponseEntity stopPowerplant(@PathVariable Long id) { + log.info("Request to stop powerplant with id {}", id); + return ResponseEntity.ok("Powerplant stopped"); + } + + @PutMapping("/gateClosure") + private ResponseEntity changeGateClosure(@PathVariable Long id, @RequestBody GateClosure gateClosure) { + log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id); + if(gateClosure.validate()) { + return ResponseEntity.ok("Changed gate closure"); + } else { + return ResponseEntity.badRequest().body("Invalid gate closure"); + } + + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java new file mode 100644 index 0000000..184a457 --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java @@ -0,0 +1,42 @@ +package kilowattcommando.controllerservice.rest; + +import kilowattcommando.controllerservice.ControllerServiceApplication; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@WebMvcTest(PowerplantRestController.class) +public class PowerplantRestAPITest { + @Autowired + private MockMvc mockMvc; + + @Test + public void startPowerplantWithId_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + } + + @Test + public void stopPowerplantWithId_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/stop")).andExpect(status().isOk()); + } + + @Test + public void closeGateOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk()); + } + + @Test + public void invalidGateClosureOnPlant_1_400() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400)); + } +} From 6459da23a39d51adbc630961a2389f78ae5c1c9f Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Sun, 5 Jan 2025 14:04:06 +0100 Subject: [PATCH 02/11] test(rest-api): Add tests for all closure values --- .../rest/PowerplantRestAPITest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java index 184a457..9dee8bf 100644 --- a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java +++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java @@ -35,6 +35,26 @@ public void closeGateOnPlant_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk()); } + @Test + public void openGateOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"OPEN\"}")).andExpect(status().isOk()); + } + + @Test + public void setGateToQuarterOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"QUARTER\"}")).andExpect(status().isOk()); + } + + @Test + public void setGateToHalfOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"HALF\"}")).andExpect(status().isOk()); + } + + @Test + public void setGateToThreeQuartersOnPlant_1_OK() throws Exception { + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THREE_QUARTERS\"}")).andExpect(status().isOk()); + } + @Test public void invalidGateClosureOnPlant_1_400() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400)); From 736e4aa1f7d5081507d80c29fa008d980fc63523 Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Sun, 5 Jan 2025 17:13:15 +0100 Subject: [PATCH 03/11] feat(kafka): Create producer beans --- .../kafka/CommandSender.java | 16 +++++++ .../kafka/KafkaProducerConfig.java | 46 +++++++++++++++++++ .../kafka/PowerPlantCommandSender.java | 5 ++ 3 files changed, 67 insertions(+) create mode 100644 src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java create mode 100644 src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java create mode 100644 src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java new file mode 100644 index 0000000..3d4fe0c --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java @@ -0,0 +1,16 @@ +package kilowattcommando.controllerservice.kafka; + +import dto.PowerplantCommand; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class CommandSender implements PowerPlantCommandSender { + @Autowired + private KafkaTemplate kafkaTemplate; + + @Override + public void sendCommand(Long powerPlantId, String command) { + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..ccae5d7 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java @@ -0,0 +1,46 @@ +package kilowattcommando.controllerservice.kafka; + +import com.fasterxml.jackson.databind.JsonSerializer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaProducerConfig { + @Value("${kafka.serverAddress}") + private String kafkaServerAddress; + + @Value("${kafka.serverPort}") + private String kafkaServerPort; + + @Bean + public ProducerFactory producerFactory() { + Map props = new HashMap<>(); + props.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + String.format("%s:%s", kafkaServerAddress, kafkaServerPort)); + props.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java new file mode 100644 index 0000000..1e9c14f --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java @@ -0,0 +1,5 @@ +package kilowattcommando.controllerservice.kafka; + +public interface PowerPlantCommandSender { + void sendCommand(Long powerPlantId, String command); +} From 1fcb910a57636f6cbfd647c373c537ff89f38e7e Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Sun, 5 Jan 2025 20:25:02 +0100 Subject: [PATCH 04/11] fix(test): Change test configuration for Kafka producer --- .../kafka/CommandSender.java | 2 +- .../kafka/PowerPlantCommandSender.java | 2 +- .../rest/PowerplantRestController.java | 7 +++-- .../kafka/PowerPlantListenerTest.java | 27 +++++-------------- 4 files changed, 12 insertions(+), 26 deletions(-) diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java index 3d4fe0c..c149b74 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java @@ -11,6 +11,6 @@ public class CommandSender implements PowerPlantCommandSender { private KafkaTemplate kafkaTemplate; @Override - public void sendCommand(Long powerPlantId, String command) { + public void sendCommand(String powerPlantId, String command) { } } diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java index 1e9c14f..49f9193 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java @@ -1,5 +1,5 @@ package kilowattcommando.controllerservice.kafka; public interface PowerPlantCommandSender { - void sendCommand(Long powerPlantId, String command); + void sendCommand(String powerPlantId, String command); } diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java index 35e765a..6fb41b4 100644 --- a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java +++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java @@ -1,6 +1,5 @@ package kilowattcommando.controllerservice.rest; -import jakarta.websocket.server.PathParam; import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,19 +12,19 @@ public class PowerplantRestController { private static final Logger log = LoggerFactory.getLogger(PowerPlantLoggingHandler.class); @PutMapping("/start") - private ResponseEntity startPowerplant(@PathVariable Long id) { + private ResponseEntity startPowerplant(@PathVariable String id) { log.info("Request to start powerplant with id {}", id); return ResponseEntity.ok("Powerplant started"); } @PutMapping("/stop") - private ResponseEntity stopPowerplant(@PathVariable Long id) { + private ResponseEntity stopPowerplant(@PathVariable String id) { log.info("Request to stop powerplant with id {}", id); return ResponseEntity.ok("Powerplant stopped"); } @PutMapping("/gateClosure") - private ResponseEntity changeGateClosure(@PathVariable Long id, @RequestBody GateClosure gateClosure) { + private ResponseEntity changeGateClosure(@PathVariable String id, @RequestBody GateClosure gateClosure) { log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id); if(gateClosure.validate()) { return ResponseEntity.ok("Changed gate closure"); diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index 805c3dd..cd337a8 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -1,24 +1,15 @@ package kilowattcommando.controllerservice.kafka; import dto.PowerplantStatus; -import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler; import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.internal.util.MockUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @@ -33,13 +24,9 @@ import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @SpringBootTest @@ -55,7 +42,7 @@ public class PowerPlantListenerTest { DockerImageName.parse("confluentinc/cp-kafka") ); @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaTemplate kafkaTestTemplate; @Autowired private PowerPlantStatusHandler powerPlantStatusHandler; @@ -72,11 +59,11 @@ void shouldHandlePowerplantStatusEvent() { PowerplantStatus powerplantStatus = new PowerplantStatus(); powerplantStatus.name = "powerplant1"; - kafkaTemplate.send("powerplant", powerplantStatus); + kafkaTestTemplate.send("powerplant", powerplantStatus); await() .pollInterval(3, TimeUnit.SECONDS) - .atMost(10, TimeUnit.SECONDS) + .atMost(60, TimeUnit.SECONDS) .untilAsserted(() -> { assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount()); }); @@ -87,7 +74,7 @@ void shouldHandlePowerplantStatusEvent() { * Configuration of the KafkaProducer for testing purposes */ @TestConfiguration - public static class KafkaProducerConfig { + public static class KafkaProducerTestConfig { @Value("${kafka.serverPort}") private String kafkaServerPort; @@ -95,7 +82,7 @@ public static class KafkaProducerConfig { private String kafkaServerAddress; @Bean - public ProducerFactory producerFactory() { + public ProducerFactory producerTestFactory() { Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress + ":" + kafkaServerPort); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -104,8 +91,8 @@ public ProducerFactory producerFactory() { } @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); + public KafkaTemplate kafkaTestTemplate() { + return new KafkaTemplate<>(producerTestFactory()); } @Bean From d77f50a7b8430b98b79e6deb4cf394f0b58c966c Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Sun, 5 Jan 2025 23:02:52 +0100 Subject: [PATCH 05/11] fix(test): Fixed tests of Kafka components --- .../controllerservice/kafka/CommandSenderTest.java | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java new file mode 100644 index 0000000..fb01127 --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java @@ -0,0 +1,2 @@ +package kilowattcommando.controllerservice.kafka;public class CommandSenderTest { +} From 8c0c2d3148ec32947ef3a3030f59f094374ae46c Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Sun, 5 Jan 2025 23:04:01 +0100 Subject: [PATCH 06/11] fix(kafka): Fixed Kafka components --- .../kafka/CommandSender.java | 14 ++- .../kafka/KafkaProducerConfig.java | 5 +- .../kafka/PowerPlantCommandSender.java | 6 +- src/main/resources/application.properties | 3 +- .../kafka/CommandSenderTest.java | 92 ++++++++++++++++++- .../kafka/PowerPlantListenerTest.java | 7 +- 6 files changed, 116 insertions(+), 11 deletions(-) diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java index c149b74..c1ebb7b 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java @@ -1,16 +1,26 @@ package kilowattcommando.controllerservice.kafka; import dto.PowerplantCommand; +import dto.PowerplantOperation; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class CommandSender implements PowerPlantCommandSender { + @Value("${kafka.controlTopic}") + private String powerPlantControlTopic; + @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaTemplate kafkaTemplate; @Override - public void sendCommand(String powerPlantId, String command) { + public void sendCommand(String powerPlantId, PowerplantCommand command) { + PowerplantOperation powerplantOperation = new PowerplantOperation(command); + ProducerRecord commandRecord = new ProducerRecord<>(powerPlantControlTopic, powerplantOperation); + commandRecord.headers().add("targetConsumerId", powerPlantId.getBytes()); + kafkaTemplate.send(commandRecord); } } diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java index ccae5d7..71e43a4 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java @@ -1,7 +1,5 @@ package kilowattcommando.controllerservice.kafka; -import com.fasterxml.jackson.databind.JsonSerializer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; @@ -11,6 +9,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; @@ -34,7 +33,7 @@ public ProducerFactory producerFactory() { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(props); } diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java index 49f9193..8b301cd 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java @@ -1,5 +1,9 @@ package kilowattcommando.controllerservice.kafka; +import dto.PowerplantCommand; +import org.springframework.stereotype.Service; + +@Service public interface PowerPlantCommandSender { - void sendCommand(String powerPlantId, String command); + void sendCommand(String powerPlantId, PowerplantCommand command); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 67b04e3..34f0be6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,4 +2,5 @@ spring.application.name=controller-service kafka.serverAddress=${KAFKA_BROKER} kafka.serverPort=${KAFKA_BROKER_PORT} kafka.groupId=${KAFKA_GROUP_ID} -kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC} \ No newline at end of file +kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC} +kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC} \ No newline at end of file diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java index fb01127..4ec050e 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java @@ -1,2 +1,92 @@ -package kilowattcommando.controllerservice.kafka;public class CommandSenderTest { +package kilowattcommando.controllerservice.kafka; + +import dto.DTOJsonDeserializer; +import dto.PowerplantCommand; +import dto.PowerplantOperation; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.TestPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@Testcontainers +public class CommandSenderTest { + @Container + static final KafkaContainer KAFKA = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka") + ); + + private static AtomicInteger messageCount = new AtomicInteger(0); + + @Value("kafka.groupId") + private String groupId; + + @Autowired + private PowerPlantCommandSender powerPlantCommandSender; + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("kafka.serverAddress", KAFKA::getIpAddress); + registry.add("kafka.serverPort", KAFKA::getFirstMappedPort); + registry.add("kafka.controlTopic", () -> "powerplant"); + registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + } + + private KafkaConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + + @Test + void startPowerPlant_1() throws Exception { + powerPlantCommandSender.sendCommand("1", PowerplantCommand.start); + try(KafkaConsumer consumer = createConsumer()) { + consumer.subscribe(List.of("powerplant")); + + await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); + + assertEquals(1, records.count()); + ConsumerRecord record = records.iterator().next(); + assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.start, record.value().command); + } + ); + + } + } + } diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index cd337a8..e891836 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -4,6 +4,7 @@ import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -26,7 +27,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @SpringBootTest @@ -52,6 +52,7 @@ static void kafkaProperties(DynamicPropertyRegistry registry) { registry.add("kafka.serverAddress", KAFKA::getIpAddress); registry.add("kafka.serverPort", KAFKA::getFirstMappedPort); registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("kafka.controlTopic", () -> "powerplant"); } @Test @@ -59,13 +60,13 @@ void shouldHandlePowerplantStatusEvent() { PowerplantStatus powerplantStatus = new PowerplantStatus(); powerplantStatus.name = "powerplant1"; - kafkaTestTemplate.send("powerplant", powerplantStatus); + kafkaTestTemplate.send("backend", powerplantStatus); await() .pollInterval(3, TimeUnit.SECONDS) .atMost(60, TimeUnit.SECONDS) .untilAsserted(() -> { - assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount()); + Assertions.assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount()); }); } From f8c10772fc09d89bd7cfe422602bd5562078529b Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Mon, 6 Jan 2025 00:01:10 +0100 Subject: [PATCH 07/11] fix(test): Added mocking to PowerplantRestAPITest --- .../kafka/CommandSender.java | 5 +++ .../controllerservice/rest/GateClosure.java | 19 ++++++++ .../rest/PowerplantRestController.java | 14 +++++- .../rest/PowerplantRestAPITest.java | 45 ++++++++++++++++--- 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java index c1ebb7b..358ad63 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java @@ -3,6 +3,8 @@ import dto.PowerplantCommand; import dto.PowerplantOperation; import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; @@ -10,6 +12,8 @@ @Service public class CommandSender implements PowerPlantCommandSender { + private static final Logger log = LoggerFactory.getLogger(PowerPlantCommandSender.class); + @Value("${kafka.controlTopic}") private String powerPlantControlTopic; @@ -22,5 +26,6 @@ public void sendCommand(String powerPlantId, PowerplantCommand command) { ProducerRecord commandRecord = new ProducerRecord<>(powerPlantControlTopic, powerplantOperation); commandRecord.headers().add("targetConsumerId", powerPlantId.getBytes()); kafkaTemplate.send(commandRecord); + log.info("Sent command {} to powerplant {}", command, powerPlantId); } } diff --git a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java index 156f737..fd86245 100644 --- a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java +++ b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java @@ -1,7 +1,26 @@ package kilowattcommando.controllerservice.rest; +import dto.PowerplantCommand; + public record GateClosure(String closure) { public boolean validate() { return closure.matches("OPEN|CLOSED|HALF|QUARTER|THREE_QUARTERS"); } + + public PowerplantCommand getEquivalentPowerplantCommand() { + switch (closure) { + case "OPEN": + return PowerplantCommand.gateOpen; + case "CLOSED": + return PowerplantCommand.gateClose; + case "HALF": + return PowerplantCommand.gateHalf; + case "QUARTER": + return PowerplantCommand.gateQuarter; + case "THREE_QUARTERS": + return PowerplantCommand.gateThreeQuarters; + default: + return null; + } + } } diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java index 6fb41b4..8fd95c3 100644 --- a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java +++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java @@ -1,6 +1,7 @@ package kilowattcommando.controllerservice.rest; -import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler; +import dto.PowerplantCommand; +import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; @@ -9,17 +10,25 @@ @RestController @RequestMapping("/powerplant/{id}") public class PowerplantRestController { - private static final Logger log = LoggerFactory.getLogger(PowerPlantLoggingHandler.class); + private static final Logger log = LoggerFactory.getLogger(PowerplantRestController.class); + + private final PowerPlantCommandSender powerPlantCommandSender; + + public PowerplantRestController(PowerPlantCommandSender powerPlantCommandSender) { + this.powerPlantCommandSender = powerPlantCommandSender; + } @PutMapping("/start") private ResponseEntity startPowerplant(@PathVariable String id) { log.info("Request to start powerplant with id {}", id); + powerPlantCommandSender.sendCommand(id, PowerplantCommand.start); return ResponseEntity.ok("Powerplant started"); } @PutMapping("/stop") private ResponseEntity stopPowerplant(@PathVariable String id) { log.info("Request to stop powerplant with id {}", id); + powerPlantCommandSender.sendCommand(id, PowerplantCommand.stop); return ResponseEntity.ok("Powerplant stopped"); } @@ -27,6 +36,7 @@ private ResponseEntity stopPowerplant(@PathVariable String id) { private ResponseEntity changeGateClosure(@PathVariable String id, @RequestBody GateClosure gateClosure) { log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id); if(gateClosure.validate()) { + powerPlantCommandSender.sendCommand(id, gateClosure.getEquivalentPowerplantCommand()); return ResponseEntity.ok("Changed gate closure"); } else { return ResponseEntity.badRequest().body("Invalid gate closure"); diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java index 9dee8bf..3bd2f0d 100644 --- a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java +++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java @@ -1,17 +1,18 @@ package kilowattcommando.controllerservice.rest; -import kilowattcommando.controllerservice.ControllerServiceApplication; -import org.junit.jupiter.api.BeforeAll; +import dto.PowerplantCommand; +import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; -import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; -import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.setup.MockMvcBuilders; +import static org.mockito.Mockito.*; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -20,43 +21,75 @@ public class PowerplantRestAPITest { @Autowired private MockMvc mockMvc; + @Autowired + private PowerPlantCommandSender powerPlantCommandSender; + + @TestConfiguration + static class PowerplantRestTestConfiguration { + @Bean + public PowerPlantCommandSender powerPlantCommandSender() { + return mock(PowerPlantCommandSender.class); + } + } + + @BeforeEach + public void reset() { + Mockito.reset(powerPlantCommandSender); + } + @Test public void startPowerplantWithId_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.start); } @Test public void stopPowerplantWithId_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/stop")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.stop); } @Test public void closeGateOnPlant_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateClose); } @Test public void openGateOnPlant_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"OPEN\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateOpen); } @Test public void setGateToQuarterOnPlant_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"QUARTER\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateQuarter); } @Test public void setGateToHalfOnPlant_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"HALF\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateHalf); } @Test public void setGateToThreeQuartersOnPlant_1_OK() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THREE_QUARTERS\"}")).andExpect(status().isOk()); + + verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateThreeQuarters); } @Test public void invalidGateClosureOnPlant_1_400() throws Exception { mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400)); + + verify(powerPlantCommandSender, times(0)).sendCommand(anyString(), any()); } } From 9648f15668b89a019bad491d09d39504ad87879b Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:19:35 +0100 Subject: [PATCH 08/11] test: Add integration tests --- .../CommandIntegrationTest.java | 112 ++++++++++++++++++ .../kafka/CommandSenderTest.java | 28 +---- .../kafka/KafkaContainerExtension.java | 29 +++++ .../kafka/PowerPlantListenerTest.java | 47 ++++---- 4 files changed, 172 insertions(+), 44 deletions(-) create mode 100644 src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java create mode 100644 src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java diff --git a/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java b/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java new file mode 100644 index 0000000..fe58bed --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java @@ -0,0 +1,112 @@ +package kilowattcommando.controllerservice; + +import dto.DTOJsonDeserializer; +import dto.PowerplantCommand; +import dto.PowerplantOperation; +import kilowattcommando.controllerservice.kafka.KafkaContainerExtension; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.web.servlet.MockMvc; + +import java.time.Duration; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@ExtendWith(KafkaContainerExtension.class) +@DirtiesContext +@AutoConfigureMockMvc +public class CommandIntegrationTest { + + + @Autowired + private MockMvc mockMvc; + + @Value("kafka.groupId") + private String groupId; + + private KafkaConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers")); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); + } + + @Test + void startPowerPlant_1() throws Exception { + mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + try(KafkaConsumer consumer = createConsumer()) { + consumer.subscribe(List.of("powerplant")); + + await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); + + assertEquals(1, records.count()); + ConsumerRecord record = records.iterator().next(); + assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.start, record.value().command); + } + ); + + } + } + + @Test + void startPowerPlant_1_and_setGateToHalf() throws Exception { + mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk()); + mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\": \"HALF\"}")).andExpect(status().isOk()); + try(KafkaConsumer consumer = createConsumer()) { + consumer.subscribe(List.of("powerplant")); + + await() + .atMost(60, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(2)); + + assertEquals(2, records.count()); + + Iterator> iterator = records.iterator(); + ConsumerRecord record = iterator.next(); + ConsumerRecord record2 = iterator.next(); + + assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.start, record.value().command); + + assertEquals("1", new String(record2.headers().lastHeader("targetConsumerId").value())); + assertEquals(PowerplantCommand.gateHalf, record2.value().command); + } + ); + + } + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java index 4ec050e..fae173c 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java @@ -9,22 +9,17 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -34,14 +29,9 @@ properties = { "spring.application.name=controller-service" }) -@Testcontainers +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) public class CommandSenderTest { - @Container - static final KafkaContainer KAFKA = new KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka") - ); - - private static AtomicInteger messageCount = new AtomicInteger(0); @Value("kafka.groupId") private String groupId; @@ -49,17 +39,9 @@ public class CommandSenderTest { @Autowired private PowerPlantCommandSender powerPlantCommandSender; - @DynamicPropertySource - static void kafkaProperties(DynamicPropertyRegistry registry) { - registry.add("kafka.serverAddress", KAFKA::getIpAddress); - registry.add("kafka.serverPort", KAFKA::getFirstMappedPort); - registry.add("kafka.controlTopic", () -> "powerplant"); - registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); - } - private KafkaConsumer createConsumer() { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers")); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java b/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java new file mode 100644 index 0000000..8ab613f --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java @@ -0,0 +1,29 @@ +package kilowattcommando.controllerservice.kafka; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +public class KafkaContainerExtension implements BeforeAllCallback, AfterAllCallback { + protected static KafkaContainer KAFKA; + + @Override + public void beforeAll(ExtensionContext extensionContext) throws Exception { + KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")); + + KAFKA.start(); + + System.setProperty("kafka.serverAddress", KAFKA.getIpAddress()); + System.setProperty("kafka.serverPort", String.valueOf(KAFKA.getFirstMappedPort())); + System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers()); + System.setProperty("kafka.powerplant.topic", "backend"); + System.setProperty("kafka.controlTopic", "powerplant"); + } + + @Override + public void afterAll(ExtensionContext extensionContext) throws Exception { + // This is done by testcontainers + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index e891836..058130b 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -2,10 +2,16 @@ import dto.PowerplantStatus; import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; @@ -15,16 +21,13 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.util.HashMap; import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @@ -34,25 +37,20 @@ properties = { "spring.application.name=controller-service" }) -@Testcontainers +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) public class PowerPlantListenerTest { - @Container - static final KafkaContainer KAFKA = new KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka") - ); - @Autowired - private KafkaTemplate kafkaTestTemplate; - + private static final Logger log = LoggerFactory.getLogger(PowerPlantListenerTest.class); @Autowired private PowerPlantStatusHandler powerPlantStatusHandler; - @DynamicPropertySource - static void kafkaProperties(DynamicPropertyRegistry registry) { - registry.add("kafka.serverAddress", KAFKA::getIpAddress); - registry.add("kafka.serverPort", KAFKA::getFirstMappedPort); - registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); - registry.add("kafka.controlTopic", () -> "powerplant"); + private KafkaProducer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers")); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + return new KafkaProducer<>(props); } @Test @@ -60,7 +58,14 @@ void shouldHandlePowerplantStatusEvent() { PowerplantStatus powerplantStatus = new PowerplantStatus(); powerplantStatus.name = "powerplant1"; - kafkaTestTemplate.send("backend", powerplantStatus); + KafkaProducer producer = createProducer(); + + ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus); + + log.info("Sent record: {}", record); + producer.send(record); + + log.info("Sent record: {}", record); await() .pollInterval(3, TimeUnit.SECONDS) From e9feb946755a1f7735183a9c26d1ca95ae4472ed Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:48:51 +0100 Subject: [PATCH 09/11] fix(test): Fixed erratic test --- .../kafka/PowerPlantListenerTest.java | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index 058130b..c3bf779 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -2,6 +2,7 @@ import dto.PowerplantStatus; import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; +import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -25,11 +26,14 @@ import org.springframework.test.context.TestPropertySource; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.fail; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @SpringBootTest @@ -54,18 +58,18 @@ private KafkaProducer createProducer() { } @Test - void shouldHandlePowerplantStatusEvent() { + void shouldHandlePowerplantStatusEvent() throws InterruptedException { PowerplantStatus powerplantStatus = new PowerplantStatus(); powerplantStatus.name = "powerplant1"; KafkaProducer producer = createProducer(); + ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus); - log.info("Sent record: {}", record); + Thread.sleep(500); producer.send(record); - - log.info("Sent record: {}", record); + Thread.sleep(500); await() .pollInterval(3, TimeUnit.SECONDS) @@ -87,20 +91,6 @@ public static class KafkaProducerTestConfig { @Value("${kafka.serverAddress}") private String kafkaServerAddress; - @Bean - public ProducerFactory producerTestFactory() { - Map configs = new HashMap<>(); - configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress + ":" + kafkaServerPort); - configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - return new DefaultKafkaProducerFactory<>(configs); - } - - @Bean - public KafkaTemplate kafkaTestTemplate() { - return new KafkaTemplate<>(producerTestFactory()); - } - @Bean public PowerPlantStatusHandler powerPlantStatusHandler() { return new PowerPlantHandlerStub(); From cf861e76313462c08a519ae9fb6e4e71c5aaf1fb Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Mon, 6 Jan 2025 19:32:07 +0100 Subject: [PATCH 10/11] ci(docker): Update Dockerfile --- Dockerfile | 3 +++ README.md | 2 ++ 2 files changed, 5 insertions(+) diff --git a/Dockerfile b/Dockerfile index 4c3032c..3f44983 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,9 @@ FROM openjdk:17-slim COPY target/controller-service-0.0.1-SNAPSHOT.jar controller.jar ENV KAFKA_POWERPLANT_TOPIC=backend +ENV KAFKA_POWERPLANT_CONTROL_TOPIC=powerplant ENV KAFKA_GROUP_ID=controller +EXPOSE 8080 + ENTRYPOINT ["java", "-jar", "/controller.jar"] \ No newline at end of file diff --git a/README.md b/README.md index 6130833..21d1b77 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ The application requires the following environment variables: | KAFKA_BROKER | IP address of Kafka Broker | | KAFKA_BROKER_PORT | Port number of Kafka Broker | |KAFKA_GROUP_ID| Group id of controller service (default in Docker container: controller) | +|KAFKA_POWERPLANT_CONTROL_TOPIC| Topic for powerplant commands (default in Docker container: powerplant | |KAFKA_POWERPLANT_TOPIC| Topic of powerplant status updates (default in Docker container: backend) | ## Run using Docker @@ -27,6 +28,7 @@ Step 2: Execute the Docker container with required ENV variables: ```shell docker run \ + -p 8080:8080\ -e KAFKA_BROKER= \ -e KAFKA_BROKER_PORT= \ kwkmdo-controller From a1ee80471ae29971b119cd63130171f22099ac54 Mon Sep 17 00:00:00 2001 From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com> Date: Mon, 6 Jan 2025 19:41:29 +0100 Subject: [PATCH 11/11] fix(test): Increase delay for PowerPlantListenerTest --- .../controllerservice/kafka/PowerPlantListenerTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index c3bf779..aff20cb 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -67,9 +67,8 @@ void shouldHandlePowerplantStatusEvent() throws InterruptedException { ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus); - Thread.sleep(500); + Thread.sleep(3000); // Wait for topic metadata to catch up producer.send(record); - Thread.sleep(500); await() .pollInterval(3, TimeUnit.SECONDS)