From c22c7fdb3ca3d56243bef09b38deec7617958771 Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Sun, 5 Jan 2025 13:49:46 +0100
Subject: [PATCH 01/11] feat(rest-api): Add REST endpoints for Powerplant
commands
Created REST Controller with mapping /powerplant/{id} and the following endpoints:
+ /start PUT starts the powerplant with id id
+ /stop PUT stops the powerplant with id id
+ /gateClosure PUT sets the gate's closure of the powerplant with id id to either OPEN, CLOSED, HALF, QUARTER, THREE_QUARTERS
---
pom.xml | 4 ++
.../controllerservice/rest/GateClosure.java | 7 ++++
.../rest/PowerplantRestController.java | 37 ++++++++++++++++
.../rest/PowerplantRestAPITest.java | 42 +++++++++++++++++++
4 files changed, 90 insertions(+)
create mode 100644 src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java
create mode 100644 src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
create mode 100644 src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
diff --git a/pom.xml b/pom.xml
index 8996835..9648182 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,10 @@
mockito-core
5.12.0
+
+ org.springframework.boot
+ spring-boot-starter-web
+
diff --git a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java
new file mode 100644
index 0000000..156f737
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java
@@ -0,0 +1,7 @@
+package kilowattcommando.controllerservice.rest;
+
+public record GateClosure(String closure) {
+ public boolean validate() {
+ return closure.matches("OPEN|CLOSED|HALF|QUARTER|THREE_QUARTERS");
+ }
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
new file mode 100644
index 0000000..35e765a
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
@@ -0,0 +1,37 @@
+package kilowattcommando.controllerservice.rest;
+
+import jakarta.websocket.server.PathParam;
+import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+@RestController
+@RequestMapping("/powerplant/{id}")
+public class PowerplantRestController {
+ private static final Logger log = LoggerFactory.getLogger(PowerPlantLoggingHandler.class);
+
+ @PutMapping("/start")
+ private ResponseEntity startPowerplant(@PathVariable Long id) {
+ log.info("Request to start powerplant with id {}", id);
+ return ResponseEntity.ok("Powerplant started");
+ }
+
+ @PutMapping("/stop")
+ private ResponseEntity stopPowerplant(@PathVariable Long id) {
+ log.info("Request to stop powerplant with id {}", id);
+ return ResponseEntity.ok("Powerplant stopped");
+ }
+
+ @PutMapping("/gateClosure")
+ private ResponseEntity changeGateClosure(@PathVariable Long id, @RequestBody GateClosure gateClosure) {
+ log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id);
+ if(gateClosure.validate()) {
+ return ResponseEntity.ok("Changed gate closure");
+ } else {
+ return ResponseEntity.badRequest().body("Invalid gate closure");
+ }
+
+ }
+}
diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
new file mode 100644
index 0000000..184a457
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
@@ -0,0 +1,42 @@
+package kilowattcommando.controllerservice.rest;
+
+import kilowattcommando.controllerservice.ControllerServiceApplication;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@WebMvcTest(PowerplantRestController.class)
+public class PowerplantRestAPITest {
+ @Autowired
+ private MockMvc mockMvc;
+
+ @Test
+ public void startPowerplantWithId_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk());
+ }
+
+ @Test
+ public void stopPowerplantWithId_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/stop")).andExpect(status().isOk());
+ }
+
+ @Test
+ public void closeGateOnPlant_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk());
+ }
+
+ @Test
+ public void invalidGateClosureOnPlant_1_400() throws Exception {
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400));
+ }
+}
From 6459da23a39d51adbc630961a2389f78ae5c1c9f Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Sun, 5 Jan 2025 14:04:06 +0100
Subject: [PATCH 02/11] test(rest-api): Add tests for all closure values
---
.../rest/PowerplantRestAPITest.java | 20 +++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
index 184a457..9dee8bf 100644
--- a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
+++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
@@ -35,6 +35,26 @@ public void closeGateOnPlant_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk());
}
+ @Test
+ public void openGateOnPlant_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"OPEN\"}")).andExpect(status().isOk());
+ }
+
+ @Test
+ public void setGateToQuarterOnPlant_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"QUARTER\"}")).andExpect(status().isOk());
+ }
+
+ @Test
+ public void setGateToHalfOnPlant_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"HALF\"}")).andExpect(status().isOk());
+ }
+
+ @Test
+ public void setGateToThreeQuartersOnPlant_1_OK() throws Exception {
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THREE_QUARTERS\"}")).andExpect(status().isOk());
+ }
+
@Test
public void invalidGateClosureOnPlant_1_400() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400));
From 736e4aa1f7d5081507d80c29fa008d980fc63523 Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Sun, 5 Jan 2025 17:13:15 +0100
Subject: [PATCH 03/11] feat(kafka): Create producer beans
---
.../kafka/CommandSender.java | 16 +++++++
.../kafka/KafkaProducerConfig.java | 46 +++++++++++++++++++
.../kafka/PowerPlantCommandSender.java | 5 ++
3 files changed, 67 insertions(+)
create mode 100644 src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
create mode 100644 src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java
create mode 100644 src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
new file mode 100644
index 0000000..3d4fe0c
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
@@ -0,0 +1,16 @@
+package kilowattcommando.controllerservice.kafka;
+
+import dto.PowerplantCommand;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+public class CommandSender implements PowerPlantCommandSender {
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
+ @Override
+ public void sendCommand(Long powerPlantId, String command) {
+ }
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java
new file mode 100644
index 0000000..ccae5d7
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java
@@ -0,0 +1,46 @@
+package kilowattcommando.controllerservice.kafka;
+
+import com.fasterxml.jackson.databind.JsonSerializer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@EnableKafka
+@Configuration
+public class KafkaProducerConfig {
+ @Value("${kafka.serverAddress}")
+ private String kafkaServerAddress;
+
+ @Value("${kafka.serverPort}")
+ private String kafkaServerPort;
+
+ @Bean
+ public ProducerFactory producerFactory() {
+ Map props = new HashMap<>();
+ props.put(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ String.format("%s:%s", kafkaServerAddress, kafkaServerPort));
+ props.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ StringSerializer.class);
+ props.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ JsonSerializer.class);
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
new file mode 100644
index 0000000..1e9c14f
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
@@ -0,0 +1,5 @@
+package kilowattcommando.controllerservice.kafka;
+
+public interface PowerPlantCommandSender {
+ void sendCommand(Long powerPlantId, String command);
+}
From 1fcb910a57636f6cbfd647c373c537ff89f38e7e Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Sun, 5 Jan 2025 20:25:02 +0100
Subject: [PATCH 04/11] fix(test): Change test configuration for Kafka producer
---
.../kafka/CommandSender.java | 2 +-
.../kafka/PowerPlantCommandSender.java | 2 +-
.../rest/PowerplantRestController.java | 7 +++--
.../kafka/PowerPlantListenerTest.java | 27 +++++--------------
4 files changed, 12 insertions(+), 26 deletions(-)
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
index 3d4fe0c..c149b74 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
@@ -11,6 +11,6 @@ public class CommandSender implements PowerPlantCommandSender {
private KafkaTemplate kafkaTemplate;
@Override
- public void sendCommand(Long powerPlantId, String command) {
+ public void sendCommand(String powerPlantId, String command) {
}
}
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
index 1e9c14f..49f9193 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
@@ -1,5 +1,5 @@
package kilowattcommando.controllerservice.kafka;
public interface PowerPlantCommandSender {
- void sendCommand(Long powerPlantId, String command);
+ void sendCommand(String powerPlantId, String command);
}
diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
index 35e765a..6fb41b4 100644
--- a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
+++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
@@ -1,6 +1,5 @@
package kilowattcommando.controllerservice.rest;
-import jakarta.websocket.server.PathParam;
import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,19 +12,19 @@ public class PowerplantRestController {
private static final Logger log = LoggerFactory.getLogger(PowerPlantLoggingHandler.class);
@PutMapping("/start")
- private ResponseEntity startPowerplant(@PathVariable Long id) {
+ private ResponseEntity startPowerplant(@PathVariable String id) {
log.info("Request to start powerplant with id {}", id);
return ResponseEntity.ok("Powerplant started");
}
@PutMapping("/stop")
- private ResponseEntity stopPowerplant(@PathVariable Long id) {
+ private ResponseEntity stopPowerplant(@PathVariable String id) {
log.info("Request to stop powerplant with id {}", id);
return ResponseEntity.ok("Powerplant stopped");
}
@PutMapping("/gateClosure")
- private ResponseEntity changeGateClosure(@PathVariable Long id, @RequestBody GateClosure gateClosure) {
+ private ResponseEntity changeGateClosure(@PathVariable String id, @RequestBody GateClosure gateClosure) {
log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id);
if(gateClosure.validate()) {
return ResponseEntity.ok("Changed gate closure");
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
index 805c3dd..cd337a8 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
@@ -1,24 +1,15 @@
package kilowattcommando.controllerservice.kafka;
import dto.PowerplantStatus;
-import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler;
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.internal.util.MockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@@ -33,13 +24,9 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.*;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@SpringBootTest
@@ -55,7 +42,7 @@ public class PowerPlantListenerTest {
DockerImageName.parse("confluentinc/cp-kafka")
);
@Autowired
- private KafkaTemplate kafkaTemplate;
+ private KafkaTemplate kafkaTestTemplate;
@Autowired
private PowerPlantStatusHandler powerPlantStatusHandler;
@@ -72,11 +59,11 @@ void shouldHandlePowerplantStatusEvent() {
PowerplantStatus powerplantStatus = new PowerplantStatus();
powerplantStatus.name = "powerplant1";
- kafkaTemplate.send("powerplant", powerplantStatus);
+ kafkaTestTemplate.send("powerplant", powerplantStatus);
await()
.pollInterval(3, TimeUnit.SECONDS)
- .atMost(10, TimeUnit.SECONDS)
+ .atMost(60, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount());
});
@@ -87,7 +74,7 @@ void shouldHandlePowerplantStatusEvent() {
* Configuration of the KafkaProducer for testing purposes
*/
@TestConfiguration
- public static class KafkaProducerConfig {
+ public static class KafkaProducerTestConfig {
@Value("${kafka.serverPort}")
private String kafkaServerPort;
@@ -95,7 +82,7 @@ public static class KafkaProducerConfig {
private String kafkaServerAddress;
@Bean
- public ProducerFactory producerFactory() {
+ public ProducerFactory producerTestFactory() {
Map configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress + ":" + kafkaServerPort);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -104,8 +91,8 @@ public ProducerFactory producerFactory() {
}
@Bean
- public KafkaTemplate kafkaTemplate() {
- return new KafkaTemplate<>(producerFactory());
+ public KafkaTemplate kafkaTestTemplate() {
+ return new KafkaTemplate<>(producerTestFactory());
}
@Bean
From d77f50a7b8430b98b79e6deb4cf394f0b58c966c Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Sun, 5 Jan 2025 23:02:52 +0100
Subject: [PATCH 05/11] fix(test): Fixed tests of Kafka components
---
.../controllerservice/kafka/CommandSenderTest.java | 2 ++
1 file changed, 2 insertions(+)
create mode 100644 src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
new file mode 100644
index 0000000..fb01127
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
@@ -0,0 +1,2 @@
+package kilowattcommando.controllerservice.kafka;public class CommandSenderTest {
+}
From 8c0c2d3148ec32947ef3a3030f59f094374ae46c Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Sun, 5 Jan 2025 23:04:01 +0100
Subject: [PATCH 06/11] fix(kafka): Fixed Kafka components
---
.../kafka/CommandSender.java | 14 ++-
.../kafka/KafkaProducerConfig.java | 5 +-
.../kafka/PowerPlantCommandSender.java | 6 +-
src/main/resources/application.properties | 3 +-
.../kafka/CommandSenderTest.java | 92 ++++++++++++++++++-
.../kafka/PowerPlantListenerTest.java | 7 +-
6 files changed, 116 insertions(+), 11 deletions(-)
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
index c149b74..c1ebb7b 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
@@ -1,16 +1,26 @@
package kilowattcommando.controllerservice.kafka;
import dto.PowerplantCommand;
+import dto.PowerplantOperation;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class CommandSender implements PowerPlantCommandSender {
+ @Value("${kafka.controlTopic}")
+ private String powerPlantControlTopic;
+
@Autowired
- private KafkaTemplate kafkaTemplate;
+ private KafkaTemplate kafkaTemplate;
@Override
- public void sendCommand(String powerPlantId, String command) {
+ public void sendCommand(String powerPlantId, PowerplantCommand command) {
+ PowerplantOperation powerplantOperation = new PowerplantOperation(command);
+ ProducerRecord commandRecord = new ProducerRecord<>(powerPlantControlTopic, powerplantOperation);
+ commandRecord.headers().add("targetConsumerId", powerPlantId.getBytes());
+ kafkaTemplate.send(commandRecord);
}
}
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java
index ccae5d7..71e43a4 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/KafkaProducerConfig.java
@@ -1,7 +1,5 @@
package kilowattcommando.controllerservice.kafka;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
@@ -11,6 +9,7 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@@ -34,7 +33,7 @@ public ProducerFactory producerFactory() {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
index 49f9193..8b301cd 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantCommandSender.java
@@ -1,5 +1,9 @@
package kilowattcommando.controllerservice.kafka;
+import dto.PowerplantCommand;
+import org.springframework.stereotype.Service;
+
+@Service
public interface PowerPlantCommandSender {
- void sendCommand(String powerPlantId, String command);
+ void sendCommand(String powerPlantId, PowerplantCommand command);
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 67b04e3..34f0be6 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -2,4 +2,5 @@ spring.application.name=controller-service
kafka.serverAddress=${KAFKA_BROKER}
kafka.serverPort=${KAFKA_BROKER_PORT}
kafka.groupId=${KAFKA_GROUP_ID}
-kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC}
\ No newline at end of file
+kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC}
+kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC}
\ No newline at end of file
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
index fb01127..4ec050e 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
@@ -1,2 +1,92 @@
-package kilowattcommando.controllerservice.kafka;public class CommandSenderTest {
+package kilowattcommando.controllerservice.kafka;
+
+import dto.DTOJsonDeserializer;
+import dto.PowerplantCommand;
+import dto.PowerplantOperation;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.context.TestPropertySource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@SpringBootTest
+@TestPropertySource(
+ properties = {
+ "spring.application.name=controller-service"
+ })
+@Testcontainers
+public class CommandSenderTest {
+ @Container
+ static final KafkaContainer KAFKA = new KafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka")
+ );
+
+ private static AtomicInteger messageCount = new AtomicInteger(0);
+
+ @Value("kafka.groupId")
+ private String groupId;
+
+ @Autowired
+ private PowerPlantCommandSender powerPlantCommandSender;
+
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("kafka.serverAddress", KAFKA::getIpAddress);
+ registry.add("kafka.serverPort", KAFKA::getFirstMappedPort);
+ registry.add("kafka.controlTopic", () -> "powerplant");
+ registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+ }
+
+ private KafkaConsumer createConsumer() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new KafkaConsumer<>(props);
+ }
+
+ @Test
+ void startPowerPlant_1() throws Exception {
+ powerPlantCommandSender.sendCommand("1", PowerplantCommand.start);
+ try(KafkaConsumer consumer = createConsumer()) {
+ consumer.subscribe(List.of("powerplant"));
+
+ await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));
+
+ assertEquals(1, records.count());
+ ConsumerRecord record = records.iterator().next();
+ assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value()));
+ assertEquals(PowerplantCommand.start, record.value().command);
+ }
+ );
+
+ }
+ }
+
}
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
index cd337a8..e891836 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
@@ -4,6 +4,7 @@
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -26,7 +27,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@SpringBootTest
@@ -52,6 +52,7 @@ static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("kafka.serverAddress", KAFKA::getIpAddress);
registry.add("kafka.serverPort", KAFKA::getFirstMappedPort);
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+ registry.add("kafka.controlTopic", () -> "powerplant");
}
@Test
@@ -59,13 +60,13 @@ void shouldHandlePowerplantStatusEvent() {
PowerplantStatus powerplantStatus = new PowerplantStatus();
powerplantStatus.name = "powerplant1";
- kafkaTestTemplate.send("powerplant", powerplantStatus);
+ kafkaTestTemplate.send("backend", powerplantStatus);
await()
.pollInterval(3, TimeUnit.SECONDS)
.atMost(60, TimeUnit.SECONDS)
.untilAsserted(() -> {
- assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount());
+ Assertions.assertEquals(1, ((PowerPlantHandlerStub) powerPlantStatusHandler).getMessageCount());
});
}
From f8c10772fc09d89bd7cfe422602bd5562078529b Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Mon, 6 Jan 2025 00:01:10 +0100
Subject: [PATCH 07/11] fix(test): Added mocking to PowerplantRestAPITest
---
.../kafka/CommandSender.java | 5 +++
.../controllerservice/rest/GateClosure.java | 19 ++++++++
.../rest/PowerplantRestController.java | 14 +++++-
.../rest/PowerplantRestAPITest.java | 45 ++++++++++++++++---
4 files changed, 75 insertions(+), 8 deletions(-)
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
index c1ebb7b..358ad63 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/CommandSender.java
@@ -3,6 +3,8 @@
import dto.PowerplantCommand;
import dto.PowerplantOperation;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
@@ -10,6 +12,8 @@
@Service
public class CommandSender implements PowerPlantCommandSender {
+ private static final Logger log = LoggerFactory.getLogger(PowerPlantCommandSender.class);
+
@Value("${kafka.controlTopic}")
private String powerPlantControlTopic;
@@ -22,5 +26,6 @@ public void sendCommand(String powerPlantId, PowerplantCommand command) {
ProducerRecord commandRecord = new ProducerRecord<>(powerPlantControlTopic, powerplantOperation);
commandRecord.headers().add("targetConsumerId", powerPlantId.getBytes());
kafkaTemplate.send(commandRecord);
+ log.info("Sent command {} to powerplant {}", command, powerPlantId);
}
}
diff --git a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java
index 156f737..fd86245 100644
--- a/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java
+++ b/src/main/java/kilowattcommando/controllerservice/rest/GateClosure.java
@@ -1,7 +1,26 @@
package kilowattcommando.controllerservice.rest;
+import dto.PowerplantCommand;
+
public record GateClosure(String closure) {
public boolean validate() {
return closure.matches("OPEN|CLOSED|HALF|QUARTER|THREE_QUARTERS");
}
+
+ public PowerplantCommand getEquivalentPowerplantCommand() {
+ switch (closure) {
+ case "OPEN":
+ return PowerplantCommand.gateOpen;
+ case "CLOSED":
+ return PowerplantCommand.gateClose;
+ case "HALF":
+ return PowerplantCommand.gateHalf;
+ case "QUARTER":
+ return PowerplantCommand.gateQuarter;
+ case "THREE_QUARTERS":
+ return PowerplantCommand.gateThreeQuarters;
+ default:
+ return null;
+ }
+ }
}
diff --git a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
index 6fb41b4..8fd95c3 100644
--- a/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
+++ b/src/main/java/kilowattcommando/controllerservice/rest/PowerplantRestController.java
@@ -1,6 +1,7 @@
package kilowattcommando.controllerservice.rest;
-import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler;
+import dto.PowerplantCommand;
+import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
@@ -9,17 +10,25 @@
@RestController
@RequestMapping("/powerplant/{id}")
public class PowerplantRestController {
- private static final Logger log = LoggerFactory.getLogger(PowerPlantLoggingHandler.class);
+ private static final Logger log = LoggerFactory.getLogger(PowerplantRestController.class);
+
+ private final PowerPlantCommandSender powerPlantCommandSender;
+
+ public PowerplantRestController(PowerPlantCommandSender powerPlantCommandSender) {
+ this.powerPlantCommandSender = powerPlantCommandSender;
+ }
@PutMapping("/start")
private ResponseEntity startPowerplant(@PathVariable String id) {
log.info("Request to start powerplant with id {}", id);
+ powerPlantCommandSender.sendCommand(id, PowerplantCommand.start);
return ResponseEntity.ok("Powerplant started");
}
@PutMapping("/stop")
private ResponseEntity stopPowerplant(@PathVariable String id) {
log.info("Request to stop powerplant with id {}", id);
+ powerPlantCommandSender.sendCommand(id, PowerplantCommand.stop);
return ResponseEntity.ok("Powerplant stopped");
}
@@ -27,6 +36,7 @@ private ResponseEntity stopPowerplant(@PathVariable String id) {
private ResponseEntity changeGateClosure(@PathVariable String id, @RequestBody GateClosure gateClosure) {
log.info("Request to change gate closure to {} which is {} on powerplant with id {}", gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid" , id);
if(gateClosure.validate()) {
+ powerPlantCommandSender.sendCommand(id, gateClosure.getEquivalentPowerplantCommand());
return ResponseEntity.ok("Changed gate closure");
} else {
return ResponseEntity.badRequest().body("Invalid gate closure");
diff --git a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
index 9dee8bf..3bd2f0d 100644
--- a/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
+++ b/src/test/java/kilowattcommando/controllerservice/rest/PowerplantRestAPITest.java
@@ -1,17 +1,18 @@
package kilowattcommando.controllerservice.rest;
-import kilowattcommando.controllerservice.ControllerServiceApplication;
-import org.junit.jupiter.api.BeforeAll;
+import dto.PowerplantCommand;
+import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
-import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
-import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import static org.mockito.Mockito.*;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -20,43 +21,75 @@ public class PowerplantRestAPITest {
@Autowired
private MockMvc mockMvc;
+ @Autowired
+ private PowerPlantCommandSender powerPlantCommandSender;
+
+ @TestConfiguration
+ static class PowerplantRestTestConfiguration {
+ @Bean
+ public PowerPlantCommandSender powerPlantCommandSender() {
+ return mock(PowerPlantCommandSender.class);
+ }
+ }
+
+ @BeforeEach
+ public void reset() {
+ Mockito.reset(powerPlantCommandSender);
+ }
+
@Test
public void startPowerplantWithId_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.start);
}
@Test
public void stopPowerplantWithId_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/stop")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.stop);
}
@Test
public void closeGateOnPlant_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"CLOSED\"}")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateClose);
}
@Test
public void openGateOnPlant_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"OPEN\"}")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateOpen);
}
@Test
public void setGateToQuarterOnPlant_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"QUARTER\"}")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateQuarter);
}
@Test
public void setGateToHalfOnPlant_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"HALF\"}")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateHalf);
}
@Test
public void setGateToThreeQuartersOnPlant_1_OK() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THREE_QUARTERS\"}")).andExpect(status().isOk());
+
+ verify(powerPlantCommandSender, times(1)).sendCommand("1", PowerplantCommand.gateThreeQuarters);
}
@Test
public void invalidGateClosureOnPlant_1_400() throws Exception {
mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\":\"THIRD\"}")).andExpect(status().is(400));
+
+ verify(powerPlantCommandSender, times(0)).sendCommand(anyString(), any());
}
}
From 9648f15668b89a019bad491d09d39504ad87879b Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Mon, 6 Jan 2025 16:19:35 +0100
Subject: [PATCH 08/11] test: Add integration tests
---
.../CommandIntegrationTest.java | 112 ++++++++++++++++++
.../kafka/CommandSenderTest.java | 28 +----
.../kafka/KafkaContainerExtension.java | 29 +++++
.../kafka/PowerPlantListenerTest.java | 47 ++++----
4 files changed, 172 insertions(+), 44 deletions(-)
create mode 100644 src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java
create mode 100644 src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java
diff --git a/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java b/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java
new file mode 100644
index 0000000..fe58bed
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/CommandIntegrationTest.java
@@ -0,0 +1,112 @@
+package kilowattcommando.controllerservice;
+
+import dto.DTOJsonDeserializer;
+import dto.PowerplantCommand;
+import dto.PowerplantOperation;
+import kilowattcommando.controllerservice.kafka.KafkaContainerExtension;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.web.servlet.MockMvc;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@SpringBootTest
+@TestPropertySource(
+ properties = {
+ "spring.application.name=controller-service"
+ })
+@ExtendWith(KafkaContainerExtension.class)
+@DirtiesContext
+@AutoConfigureMockMvc
+public class CommandIntegrationTest {
+
+
+ @Autowired
+ private MockMvc mockMvc;
+
+ @Value("kafka.groupId")
+ private String groupId;
+
+ private KafkaConsumer createConsumer() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers"));
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new KafkaConsumer<>(props);
+ }
+
+ @Test
+ void startPowerPlant_1() throws Exception {
+ mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk());
+ try(KafkaConsumer consumer = createConsumer()) {
+ consumer.subscribe(List.of("powerplant"));
+
+ await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));
+
+ assertEquals(1, records.count());
+ ConsumerRecord record = records.iterator().next();
+ assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value()));
+ assertEquals(PowerplantCommand.start, record.value().command);
+ }
+ );
+
+ }
+ }
+
+ @Test
+ void startPowerPlant_1_and_setGateToHalf() throws Exception {
+ mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk());
+ mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\": \"HALF\"}")).andExpect(status().isOk());
+ try(KafkaConsumer consumer = createConsumer()) {
+ consumer.subscribe(List.of("powerplant"));
+
+ await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));
+
+ assertEquals(2, records.count());
+
+ Iterator> iterator = records.iterator();
+ ConsumerRecord record = iterator.next();
+ ConsumerRecord record2 = iterator.next();
+
+ assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value()));
+ assertEquals(PowerplantCommand.start, record.value().command);
+
+ assertEquals("1", new String(record2.headers().lastHeader("targetConsumerId").value()));
+ assertEquals(PowerplantCommand.gateHalf, record2.value().command);
+ }
+ );
+
+ }
+ }
+}
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
index 4ec050e..fae173c 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/CommandSenderTest.java
@@ -9,22 +9,17 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.DynamicPropertyRegistry;
-import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,14 +29,9 @@
properties = {
"spring.application.name=controller-service"
})
-@Testcontainers
+@DirtiesContext
+@ExtendWith(KafkaContainerExtension.class)
public class CommandSenderTest {
- @Container
- static final KafkaContainer KAFKA = new KafkaContainer(
- DockerImageName.parse("confluentinc/cp-kafka")
- );
-
- private static AtomicInteger messageCount = new AtomicInteger(0);
@Value("kafka.groupId")
private String groupId;
@@ -49,17 +39,9 @@ public class CommandSenderTest {
@Autowired
private PowerPlantCommandSender powerPlantCommandSender;
- @DynamicPropertySource
- static void kafkaProperties(DynamicPropertyRegistry registry) {
- registry.add("kafka.serverAddress", KAFKA::getIpAddress);
- registry.add("kafka.serverPort", KAFKA::getFirstMappedPort);
- registry.add("kafka.controlTopic", () -> "powerplant");
- registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
- }
-
private KafkaConsumer createConsumer() {
Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java b/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java
new file mode 100644
index 0000000..8ab613f
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/KafkaContainerExtension.java
@@ -0,0 +1,29 @@
+package kilowattcommando.controllerservice.kafka;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class KafkaContainerExtension implements BeforeAllCallback, AfterAllCallback {
+ protected static KafkaContainer KAFKA;
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) throws Exception {
+ KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"));
+
+ KAFKA.start();
+
+ System.setProperty("kafka.serverAddress", KAFKA.getIpAddress());
+ System.setProperty("kafka.serverPort", String.valueOf(KAFKA.getFirstMappedPort()));
+ System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
+ System.setProperty("kafka.powerplant.topic", "backend");
+ System.setProperty("kafka.controlTopic", "powerplant");
+ }
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) throws Exception {
+ // This is done by testcontainers
+ }
+}
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
index e891836..058130b 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
@@ -2,10 +2,16 @@
import dto.PowerplantStatus;
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
@@ -15,16 +21,13 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.test.context.DynamicPropertyRegistry;
-import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@@ -34,25 +37,20 @@
properties = {
"spring.application.name=controller-service"
})
-@Testcontainers
+@DirtiesContext
+@ExtendWith(KafkaContainerExtension.class)
public class PowerPlantListenerTest {
- @Container
- static final KafkaContainer KAFKA = new KafkaContainer(
- DockerImageName.parse("confluentinc/cp-kafka")
- );
- @Autowired
- private KafkaTemplate kafkaTestTemplate;
-
+ private static final Logger log = LoggerFactory.getLogger(PowerPlantListenerTest.class);
@Autowired
private PowerPlantStatusHandler powerPlantStatusHandler;
- @DynamicPropertySource
- static void kafkaProperties(DynamicPropertyRegistry registry) {
- registry.add("kafka.serverAddress", KAFKA::getIpAddress);
- registry.add("kafka.serverPort", KAFKA::getFirstMappedPort);
- registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
- registry.add("kafka.controlTopic", () -> "powerplant");
+ private KafkaProducer createProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers"));
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
+ return new KafkaProducer<>(props);
}
@Test
@@ -60,7 +58,14 @@ void shouldHandlePowerplantStatusEvent() {
PowerplantStatus powerplantStatus = new PowerplantStatus();
powerplantStatus.name = "powerplant1";
- kafkaTestTemplate.send("backend", powerplantStatus);
+ KafkaProducer producer = createProducer();
+
+ ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus);
+
+ log.info("Sent record: {}", record);
+ producer.send(record);
+
+ log.info("Sent record: {}", record);
await()
.pollInterval(3, TimeUnit.SECONDS)
From e9feb946755a1f7735183a9c26d1ca95ae4472ed Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Mon, 6 Jan 2025 16:48:51 +0100
Subject: [PATCH 09/11] fix(test): Fixed erratic test
---
.../kafka/PowerPlantListenerTest.java | 26 ++++++-------------
1 file changed, 8 insertions(+), 18 deletions(-)
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
index 058130b..c3bf779 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
@@ -2,6 +2,7 @@
import dto.PowerplantStatus;
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
+import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -25,11 +26,14 @@
import org.springframework.test.context.TestPropertySource;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@SpringBootTest
@@ -54,18 +58,18 @@ private KafkaProducer createProducer() {
}
@Test
- void shouldHandlePowerplantStatusEvent() {
+ void shouldHandlePowerplantStatusEvent() throws InterruptedException {
PowerplantStatus powerplantStatus = new PowerplantStatus();
powerplantStatus.name = "powerplant1";
KafkaProducer producer = createProducer();
+
ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus);
- log.info("Sent record: {}", record);
+ Thread.sleep(500);
producer.send(record);
-
- log.info("Sent record: {}", record);
+ Thread.sleep(500);
await()
.pollInterval(3, TimeUnit.SECONDS)
@@ -87,20 +91,6 @@ public static class KafkaProducerTestConfig {
@Value("${kafka.serverAddress}")
private String kafkaServerAddress;
- @Bean
- public ProducerFactory producerTestFactory() {
- Map configs = new HashMap<>();
- configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress + ":" + kafkaServerPort);
- configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
- return new DefaultKafkaProducerFactory<>(configs);
- }
-
- @Bean
- public KafkaTemplate kafkaTestTemplate() {
- return new KafkaTemplate<>(producerTestFactory());
- }
-
@Bean
public PowerPlantStatusHandler powerPlantStatusHandler() {
return new PowerPlantHandlerStub();
From cf861e76313462c08a519ae9fb6e4e71c5aaf1fb Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Mon, 6 Jan 2025 19:32:07 +0100
Subject: [PATCH 10/11] ci(docker): Update Dockerfile
---
Dockerfile | 3 +++
README.md | 2 ++
2 files changed, 5 insertions(+)
diff --git a/Dockerfile b/Dockerfile
index 4c3032c..3f44983 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,6 +3,9 @@ FROM openjdk:17-slim
COPY target/controller-service-0.0.1-SNAPSHOT.jar controller.jar
ENV KAFKA_POWERPLANT_TOPIC=backend
+ENV KAFKA_POWERPLANT_CONTROL_TOPIC=powerplant
ENV KAFKA_GROUP_ID=controller
+EXPOSE 8080
+
ENTRYPOINT ["java", "-jar", "/controller.jar"]
\ No newline at end of file
diff --git a/README.md b/README.md
index 6130833..21d1b77 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ The application requires the following environment variables:
| KAFKA_BROKER | IP address of Kafka Broker |
| KAFKA_BROKER_PORT | Port number of Kafka Broker |
|KAFKA_GROUP_ID| Group id of controller service (default in Docker container: controller) |
+|KAFKA_POWERPLANT_CONTROL_TOPIC| Topic for powerplant commands (default in Docker container: powerplant |
|KAFKA_POWERPLANT_TOPIC| Topic of powerplant status updates (default in Docker container: backend) |
## Run using Docker
@@ -27,6 +28,7 @@ Step 2:
Execute the Docker container with required ENV variables:
```shell
docker run \
+ -p 8080:8080\
-e KAFKA_BROKER= \
-e KAFKA_BROKER_PORT= \
kwkmdo-controller
From a1ee80471ae29971b119cd63130171f22099ac54 Mon Sep 17 00:00:00 2001
From: Clemens Bauer <38780028+Clemi2806@users.noreply.github.com>
Date: Mon, 6 Jan 2025 19:41:29 +0100
Subject: [PATCH 11/11] fix(test): Increase delay for PowerPlantListenerTest
---
.../controllerservice/kafka/PowerPlantListenerTest.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
index c3bf779..aff20cb 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
@@ -67,9 +67,8 @@ void shouldHandlePowerplantStatusEvent() throws InterruptedException {
ProducerRecord record = new ProducerRecord<>("backend", powerplantStatus);
- Thread.sleep(500);
+ Thread.sleep(3000); // Wait for topic metadata to catch up
producer.send(record);
- Thread.sleep(500);
await()
.pollInterval(3, TimeUnit.SECONDS)