diff --git a/pom.xml b/pom.xml
index 8eff247..09fa0f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,19 @@
org.springframework.boot
spring-boot-starter-web
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+
+
+ com.h2database
+ h2
+ runtime
+
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+
diff --git a/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivity.java b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivity.java
new file mode 100644
index 0000000..03ba0ec
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivity.java
@@ -0,0 +1,23 @@
+package kilowattcommando.controllerservice.data;
+
+import jakarta.persistence.Entity;
+import jakarta.persistence.Id;
+import jakarta.persistence.NamedQuery;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+import java.sql.Timestamp;
+
+@Entity
+@Data
+@EqualsAndHashCode
+@NoArgsConstructor
+@AllArgsConstructor
+@NamedQuery(name="PowerPlantActivity.findMissingPowerPlants", query="select p from PowerPlantActivity p where p.lastActivity < TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP)")
+public class PowerPlantActivity {
+ @Id
+ private String name;
+ private Timestamp lastActivity;
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepository.java b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepository.java
new file mode 100644
index 0000000..9fc06e2
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepository.java
@@ -0,0 +1,11 @@
+package kilowattcommando.controllerservice.data;
+
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.util.Streamable;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface PowerPlantActivityRepository extends CrudRepository {
+ Streamable findMissingPowerPlants();
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandler.java b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandler.java
new file mode 100644
index 0000000..9eee212
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandler.java
@@ -0,0 +1,9 @@
+package kilowattcommando.controllerservice.handlers;
+
+import dto.PowerplantStatus;
+import org.springframework.stereotype.Service;
+
+@Service
+public interface PowerPlantActivityHandler {
+ void handle(PowerplantStatus status);
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandlerImpl.java b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandlerImpl.java
new file mode 100644
index 0000000..30ed89d
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandlerImpl.java
@@ -0,0 +1,29 @@
+package kilowattcommando.controllerservice.handlers;
+
+import dto.PowerplantStatus;
+import kilowattcommando.controllerservice.data.PowerPlantActivity;
+import kilowattcommando.controllerservice.data.PowerPlantActivityRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+
+@Service
+public class PowerPlantActivityHandlerImpl implements PowerPlantActivityHandler{
+ private final PowerPlantActivityRepository powerPlantActivityRepository;
+
+ private static final Logger log = LoggerFactory.getLogger(PowerPlantActivityHandlerImpl.class);
+
+ public PowerPlantActivityHandlerImpl(PowerPlantActivityRepository powerPlantActivityRepository) {
+ this.powerPlantActivityRepository = powerPlantActivityRepository;
+ }
+
+ @Override
+ public void handle(PowerplantStatus status) {
+ PowerPlantActivity powerPlantActivity = new PowerPlantActivity(status.name, Timestamp.from(Instant.now()));
+ log.info("Received PowerPlantActivity: {}", powerPlantActivity.getName());
+ powerPlantActivityRepository.save(powerPlantActivity);
+ }
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java
index 2acacf2..26ad615 100644
--- a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java
+++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java
@@ -1,6 +1,7 @@
package kilowattcommando.controllerservice.kafka;
import dto.PowerplantStatus;
+import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler;
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
@@ -11,15 +12,18 @@
public class PowerPlantListener {
private final PowerPlantStatusHandler powerPlantStatusHandler;
+ private final PowerPlantActivityHandler powerPlantActivityHandler;
- public PowerPlantListener(PowerPlantStatusHandler powerPlantStatusHandler) {
+ public PowerPlantListener(PowerPlantStatusHandler powerPlantStatusHandler, PowerPlantActivityHandler powerPlantActivityHandler) {
this.powerPlantStatusHandler = powerPlantStatusHandler;
+ this.powerPlantActivityHandler = powerPlantActivityHandler;
}
@KafkaListener(topics = "${kafka.powerplant.topic}", groupId = "${kafka.groupId}")
public void listen(ConsumerRecord message) {
if(message.value() instanceof PowerplantStatus powerplantStatus) {
powerPlantStatusHandler.handle(powerplantStatus);
+ powerPlantActivityHandler.handle(powerplantStatus);
}
}
}
diff --git a/src/main/java/kilowattcommando/controllerservice/monitor/PowerPlantActivityMonitor.java b/src/main/java/kilowattcommando/controllerservice/monitor/PowerPlantActivityMonitor.java
new file mode 100644
index 0000000..819d670
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/monitor/PowerPlantActivityMonitor.java
@@ -0,0 +1,41 @@
+package kilowattcommando.controllerservice.monitor;
+
+import kilowattcommando.controllerservice.data.PowerPlantActivity;
+import kilowattcommando.controllerservice.data.PowerPlantActivityRepository;
+import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler;
+import kilowattcommando.controllerservice.push.PushNotificationController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.util.Streamable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+@Component
+public class PowerPlantActivityMonitor {
+ private PowerPlantActivityRepository powerPlantActivityRepository;
+ private PushNotificationController pushNotificationController;
+
+ private static final Logger log = LoggerFactory.getLogger(PowerPlantActivityMonitor.class);
+
+ public PowerPlantActivityMonitor(PowerPlantActivityRepository powerPlantActivityRepository, PushNotificationController pushNotificationController) {
+ this.powerPlantActivityRepository = powerPlantActivityRepository;
+ this.pushNotificationController = pushNotificationController;
+ }
+
+ @Scheduled(fixedDelay = 120_000L)
+ public void checkForMissingPowerPlants() {
+ Streamable missingPowerplants = this.powerPlantActivityRepository.findMissingPowerPlants();
+ missingPowerplants.forEach(powerPlantActivity -> {
+ log.error("Missing powerplant detected: {}", powerPlantActivity.getName());
+ pushNotificationController.sendMessage(String.format("Connection with powerplant %s is lost.", powerPlantActivity.getName()));
+ });
+ if(missingPowerplants.isEmpty()) {
+ log.info("No missing powerplant detected");
+ pushNotificationController.sendMessage("No missing powerplants");
+ }
+ }
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/monitor/SchedulingConfiguration.java b/src/main/java/kilowattcommando/controllerservice/monitor/SchedulingConfiguration.java
new file mode 100644
index 0000000..fccd1dd
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/monitor/SchedulingConfiguration.java
@@ -0,0 +1,9 @@
+package kilowattcommando.controllerservice.monitor;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@Configuration
+@EnableScheduling
+public class SchedulingConfiguration {
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/push/PushNotificationController.java b/src/main/java/kilowattcommando/controllerservice/push/PushNotificationController.java
new file mode 100644
index 0000000..1cc12b7
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/push/PushNotificationController.java
@@ -0,0 +1,21 @@
+package kilowattcommando.controllerservice.push;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.messaging.handler.annotation.SendTo;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+import org.springframework.stereotype.Controller;
+
+@Controller
+public class PushNotificationController {
+ private SimpMessagingTemplate messagingTemplate;
+
+ public PushNotificationController(SimpMessagingTemplate messagingTemplate) {
+ this.messagingTemplate = messagingTemplate;
+ }
+
+ public void sendMessage(final String message) {
+ this.messagingTemplate.convertAndSend("/powerplant", message);
+ }
+}
diff --git a/src/main/java/kilowattcommando/controllerservice/push/WebSocketConfig.java b/src/main/java/kilowattcommando/controllerservice/push/WebSocketConfig.java
new file mode 100644
index 0000000..d4a67aa
--- /dev/null
+++ b/src/main/java/kilowattcommando/controllerservice/push/WebSocketConfig.java
@@ -0,0 +1,24 @@
+package kilowattcommando.controllerservice.push;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.simp.config.MessageBrokerRegistry;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+
+@Configuration
+@EnableWebSocketMessageBroker
+public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
+ @Override
+ public void registerStompEndpoints(StompEndpointRegistry registry) {
+ registry.addEndpoint("/message-broker");
+ registry.addEndpoint("/message-broker").withSockJS();
+ }
+
+ @Override
+ public void configureMessageBroker(MessageBrokerRegistry registry) {
+ registry.enableSimpleBroker("/powerplant");
+ registry.setApplicationDestinationPrefixes("/app");
+ }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 34f0be6..a06742c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -3,4 +3,9 @@ kafka.serverAddress=${KAFKA_BROKER}
kafka.serverPort=${KAFKA_BROKER_PORT}
kafka.groupId=${KAFKA_GROUP_ID}
kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC}
-kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC}
\ No newline at end of file
+kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC}
+spring.datasource.url=jdbc:h2:mem:powerplant_activity_db
+spring.datasource.driverClassName=org.h2.Driver
+spring.datasource.username=controller
+spring.datasource.password=${CONTROLLER_DB_PASSWORD}
+spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
\ No newline at end of file
diff --git a/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepositoryTest.java b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepositoryTest.java
new file mode 100644
index 0000000..b6a5c4c
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepositoryTest.java
@@ -0,0 +1,65 @@
+package kilowattcommando.controllerservice.data;
+
+import kilowattcommando.controllerservice.kafka.KafkaContainerExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.TestPropertySource;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@SpringBootTest
+@TestPropertySource(
+ properties = {
+ "spring.application.name=controller-service"
+ })
+@DirtiesContext
+@ExtendWith(KafkaContainerExtension.class)
+public class PowerPlantActivityRepositoryTest {
+ @Autowired
+ private PowerPlantActivityRepository powerPlantActivityRepository;
+
+ @BeforeEach
+ void resetRepository() {
+ powerPlantActivityRepository.deleteAll();
+ }
+
+ @Test
+ void noMissingPowerPlants() {
+ PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.now()));
+
+ powerPlantActivityRepository.save(powerPlantActivity);
+
+ assertFalse(powerPlantActivityRepository.findMissingPowerPlants().iterator().hasNext());
+ }
+
+ @Test
+ void singleMissingPowerPlant() {
+ PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.ofEpochSecond(10000)));
+
+ powerPlantActivityRepository.save(powerPlantActivity);
+
+ assertTrue(powerPlantActivityRepository.findMissingPowerPlants().iterator().hasNext());
+ }
+
+ @Test
+ void multipleMissingPowerPlant() {
+ PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.ofEpochSecond(10000)));
+ PowerPlantActivity powerPlantActivity2 = new PowerPlantActivity("test2", Timestamp.from(Instant.ofEpochSecond(100000)));
+
+ powerPlantActivityRepository.save(powerPlantActivity);
+ powerPlantActivityRepository.save(powerPlantActivity2);
+
+ Stream stream = powerPlantActivityRepository.findMissingPowerPlants().stream();
+
+ assertTrue(stream.allMatch(x -> x.equals(powerPlantActivity) || x.equals(powerPlantActivity2)));
+ }
+}
diff --git a/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityTest.java b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityTest.java
new file mode 100644
index 0000000..241257f
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityTest.java
@@ -0,0 +1,100 @@
+package kilowattcommando.controllerservice.data;
+
+import dto.PowerplantOperation;
+import dto.PowerplantStatus;
+import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler;
+import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandlerImpl;
+import kilowattcommando.controllerservice.kafka.CommandSender;
+import kilowattcommando.controllerservice.kafka.KafkaConsumerConfig;
+import kilowattcommando.controllerservice.kafka.KafkaContainerExtension;
+import kilowattcommando.controllerservice.kafka.KafkaProducerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.TestPropertySource;
+
+import java.sql.Timestamp;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@SpringBootTest
+@TestPropertySource(
+ properties = {
+ "spring.application.name=controller-service"
+ })
+@DirtiesContext
+@ExtendWith(KafkaContainerExtension.class)
+public class PowerPlantActivityTest {
+ @Autowired
+ private PowerPlantActivityHandler powerPlantActivityHandlerImpl;
+
+ @Autowired
+ private PowerPlantActivityRepository powerPlantActivityRepository;
+
+ @BeforeEach
+ void resetRepository() {
+ powerPlantActivityRepository.deleteAll();
+ }
+
+ @Test
+ void handlePowerPlantActivity() {
+ PowerplantStatus powerplantStatus = new PowerplantStatus();
+ powerplantStatus.name = "test";
+
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+
+ assertEquals(1, powerPlantActivityRepository.count());
+ }
+
+ @Test
+ void handlePowerPlantActivityUpdateToNewTimestamp() {
+ PowerplantStatus powerplantStatus = new PowerplantStatus();
+ powerplantStatus.name = "test";
+
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+
+ Timestamp firstActivity = powerPlantActivityRepository.findById("test").get().getLastActivity();
+
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+
+ Timestamp lastActivity = powerPlantActivityRepository.findById("test").get().getLastActivity();
+
+ assertTrue(lastActivity.after(firstActivity));
+ }
+
+ @Test
+ void handlePowerPlantActivityMultiplePlants() {
+ PowerplantStatus powerplantStatus = new PowerplantStatus();
+ powerplantStatus.name = "test1";
+
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+ Timestamp firstActivity = powerPlantActivityRepository.findById("test1").get().getLastActivity();
+
+ powerplantStatus.name = "test2";
+
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+ Timestamp secondActivity = powerPlantActivityRepository.findById("test2").get().getLastActivity();
+
+ assertEquals(2, powerPlantActivityRepository.count());
+ assertTrue(secondActivity.after(firstActivity));
+ }
+
+ @Test
+ void handlePowerPlantActivityUpdateCountUnchanged() {
+ PowerplantStatus powerplantStatus = new PowerplantStatus();
+ powerplantStatus.name = "test";
+
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+ powerPlantActivityHandlerImpl.handle(powerplantStatus);
+
+ assertEquals(1, powerPlantActivityRepository.count());
+ }
+}
diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
index aff20cb..52ede38 100644
--- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
+++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java
@@ -1,6 +1,7 @@
package kilowattcommando.controllerservice.kafka;
import dto.PowerplantStatus;
+import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler;
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@SpringBootTest
@@ -94,5 +96,8 @@ public static class KafkaProducerTestConfig {
public PowerPlantStatusHandler powerPlantStatusHandler() {
return new PowerPlantHandlerStub();
}
+
+ @Bean
+ public PowerPlantActivityHandler powerPlantActivityHandler() {return mock(PowerPlantActivityHandler.class);}
}
}
diff --git a/src/test/java/kilowattcommando/controllerservice/push/PushNotificationControllerTest.java b/src/test/java/kilowattcommando/controllerservice/push/PushNotificationControllerTest.java
new file mode 100644
index 0000000..13efc2d
--- /dev/null
+++ b/src/test/java/kilowattcommando/controllerservice/push/PushNotificationControllerTest.java
@@ -0,0 +1,43 @@
+package kilowattcommando.controllerservice.push;
+
+import kilowattcommando.controllerservice.kafka.KafkaContainerExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.TestPropertySource;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@SpringBootTest
+@TestPropertySource(
+ properties = {
+ "spring.application.name=controller-service"
+ })
+@DirtiesContext
+@ExtendWith(KafkaContainerExtension.class)
+public class PushNotificationControllerTest {
+ private static PushNotificationController pushNotificationController;
+
+ private static SimpMessagingTemplate simpMessagingTemplate;
+
+ @BeforeAll
+ static void createPushNotificationController() {
+ simpMessagingTemplate = mock(SimpMessagingTemplate.class);
+ pushNotificationController = new PushNotificationController(simpMessagingTemplate);
+ }
+
+ @Test
+ void testPushNotification() {
+ pushNotificationController.sendMessage("No missing powerplants");
+
+ verify(simpMessagingTemplate).convertAndSend("/powerplant", "No missing powerplants");
+ }
+}