diff --git a/pom.xml b/pom.xml index 8eff247..09fa0f6 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,19 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-data-jpa + + + com.h2database + h2 + runtime + + + org.springframework.boot + spring-boot-starter-websocket + diff --git a/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivity.java b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivity.java new file mode 100644 index 0000000..03ba0ec --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivity.java @@ -0,0 +1,23 @@ +package kilowattcommando.controllerservice.data; + +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.NamedQuery; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.sql.Timestamp; + +@Entity +@Data +@EqualsAndHashCode +@NoArgsConstructor +@AllArgsConstructor +@NamedQuery(name="PowerPlantActivity.findMissingPowerPlants", query="select p from PowerPlantActivity p where p.lastActivity < TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP)") +public class PowerPlantActivity { + @Id + private String name; + private Timestamp lastActivity; +} diff --git a/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepository.java b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepository.java new file mode 100644 index 0000000..9fc06e2 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepository.java @@ -0,0 +1,11 @@ +package kilowattcommando.controllerservice.data; + +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; +import org.springframework.data.util.Streamable; +import org.springframework.stereotype.Repository; + +@Repository +public interface PowerPlantActivityRepository extends CrudRepository { + Streamable findMissingPowerPlants(); +} diff --git a/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandler.java b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandler.java new file mode 100644 index 0000000..9eee212 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandler.java @@ -0,0 +1,9 @@ +package kilowattcommando.controllerservice.handlers; + +import dto.PowerplantStatus; +import org.springframework.stereotype.Service; + +@Service +public interface PowerPlantActivityHandler { + void handle(PowerplantStatus status); +} diff --git a/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandlerImpl.java b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandlerImpl.java new file mode 100644 index 0000000..30ed89d --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/handlers/PowerPlantActivityHandlerImpl.java @@ -0,0 +1,29 @@ +package kilowattcommando.controllerservice.handlers; + +import dto.PowerplantStatus; +import kilowattcommando.controllerservice.data.PowerPlantActivity; +import kilowattcommando.controllerservice.data.PowerPlantActivityRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.sql.Timestamp; +import java.time.Instant; + +@Service +public class PowerPlantActivityHandlerImpl implements PowerPlantActivityHandler{ + private final PowerPlantActivityRepository powerPlantActivityRepository; + + private static final Logger log = LoggerFactory.getLogger(PowerPlantActivityHandlerImpl.class); + + public PowerPlantActivityHandlerImpl(PowerPlantActivityRepository powerPlantActivityRepository) { + this.powerPlantActivityRepository = powerPlantActivityRepository; + } + + @Override + public void handle(PowerplantStatus status) { + PowerPlantActivity powerPlantActivity = new PowerPlantActivity(status.name, Timestamp.from(Instant.now())); + log.info("Received PowerPlantActivity: {}", powerPlantActivity.getName()); + powerPlantActivityRepository.save(powerPlantActivity); + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java index 2acacf2..26ad615 100644 --- a/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java +++ b/src/main/java/kilowattcommando/controllerservice/kafka/PowerPlantListener.java @@ -1,6 +1,7 @@ package kilowattcommando.controllerservice.kafka; import dto.PowerplantStatus; +import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler; import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; @@ -11,15 +12,18 @@ public class PowerPlantListener { private final PowerPlantStatusHandler powerPlantStatusHandler; + private final PowerPlantActivityHandler powerPlantActivityHandler; - public PowerPlantListener(PowerPlantStatusHandler powerPlantStatusHandler) { + public PowerPlantListener(PowerPlantStatusHandler powerPlantStatusHandler, PowerPlantActivityHandler powerPlantActivityHandler) { this.powerPlantStatusHandler = powerPlantStatusHandler; + this.powerPlantActivityHandler = powerPlantActivityHandler; } @KafkaListener(topics = "${kafka.powerplant.topic}", groupId = "${kafka.groupId}") public void listen(ConsumerRecord message) { if(message.value() instanceof PowerplantStatus powerplantStatus) { powerPlantStatusHandler.handle(powerplantStatus); + powerPlantActivityHandler.handle(powerplantStatus); } } } diff --git a/src/main/java/kilowattcommando/controllerservice/monitor/PowerPlantActivityMonitor.java b/src/main/java/kilowattcommando/controllerservice/monitor/PowerPlantActivityMonitor.java new file mode 100644 index 0000000..819d670 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/monitor/PowerPlantActivityMonitor.java @@ -0,0 +1,41 @@ +package kilowattcommando.controllerservice.monitor; + +import kilowattcommando.controllerservice.data.PowerPlantActivity; +import kilowattcommando.controllerservice.data.PowerPlantActivityRepository; +import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler; +import kilowattcommando.controllerservice.push.PushNotificationController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Streamable; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +@Component +public class PowerPlantActivityMonitor { + private PowerPlantActivityRepository powerPlantActivityRepository; + private PushNotificationController pushNotificationController; + + private static final Logger log = LoggerFactory.getLogger(PowerPlantActivityMonitor.class); + + public PowerPlantActivityMonitor(PowerPlantActivityRepository powerPlantActivityRepository, PushNotificationController pushNotificationController) { + this.powerPlantActivityRepository = powerPlantActivityRepository; + this.pushNotificationController = pushNotificationController; + } + + @Scheduled(fixedDelay = 120_000L) + public void checkForMissingPowerPlants() { + Streamable missingPowerplants = this.powerPlantActivityRepository.findMissingPowerPlants(); + missingPowerplants.forEach(powerPlantActivity -> { + log.error("Missing powerplant detected: {}", powerPlantActivity.getName()); + pushNotificationController.sendMessage(String.format("Connection with powerplant %s is lost.", powerPlantActivity.getName())); + }); + if(missingPowerplants.isEmpty()) { + log.info("No missing powerplant detected"); + pushNotificationController.sendMessage("No missing powerplants"); + } + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/monitor/SchedulingConfiguration.java b/src/main/java/kilowattcommando/controllerservice/monitor/SchedulingConfiguration.java new file mode 100644 index 0000000..fccd1dd --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/monitor/SchedulingConfiguration.java @@ -0,0 +1,9 @@ +package kilowattcommando.controllerservice.monitor; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; + +@Configuration +@EnableScheduling +public class SchedulingConfiguration { +} diff --git a/src/main/java/kilowattcommando/controllerservice/push/PushNotificationController.java b/src/main/java/kilowattcommando/controllerservice/push/PushNotificationController.java new file mode 100644 index 0000000..1cc12b7 --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/push/PushNotificationController.java @@ -0,0 +1,21 @@ +package kilowattcommando.controllerservice.push; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Controller; + +@Controller +public class PushNotificationController { + private SimpMessagingTemplate messagingTemplate; + + public PushNotificationController(SimpMessagingTemplate messagingTemplate) { + this.messagingTemplate = messagingTemplate; + } + + public void sendMessage(final String message) { + this.messagingTemplate.convertAndSend("/powerplant", message); + } +} diff --git a/src/main/java/kilowattcommando/controllerservice/push/WebSocketConfig.java b/src/main/java/kilowattcommando/controllerservice/push/WebSocketConfig.java new file mode 100644 index 0000000..d4a67aa --- /dev/null +++ b/src/main/java/kilowattcommando/controllerservice/push/WebSocketConfig.java @@ -0,0 +1,24 @@ +package kilowattcommando.controllerservice.push; + +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; +import org.springframework.web.socket.config.annotation.StompEndpointRegistry; +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; + +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/message-broker"); + registry.addEndpoint("/message-broker").withSockJS(); + } + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.enableSimpleBroker("/powerplant"); + registry.setApplicationDestinationPrefixes("/app"); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 34f0be6..a06742c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -3,4 +3,9 @@ kafka.serverAddress=${KAFKA_BROKER} kafka.serverPort=${KAFKA_BROKER_PORT} kafka.groupId=${KAFKA_GROUP_ID} kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC} -kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC} \ No newline at end of file +kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC} +spring.datasource.url=jdbc:h2:mem:powerplant_activity_db +spring.datasource.driverClassName=org.h2.Driver +spring.datasource.username=controller +spring.datasource.password=${CONTROLLER_DB_PASSWORD} +spring.jpa.database-platform=org.hibernate.dialect.H2Dialect \ No newline at end of file diff --git a/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepositoryTest.java b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepositoryTest.java new file mode 100644 index 0000000..b6a5c4c --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityRepositoryTest.java @@ -0,0 +1,65 @@ +package kilowattcommando.controllerservice.data; + +import kilowattcommando.controllerservice.kafka.KafkaContainerExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Iterator; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) +public class PowerPlantActivityRepositoryTest { + @Autowired + private PowerPlantActivityRepository powerPlantActivityRepository; + + @BeforeEach + void resetRepository() { + powerPlantActivityRepository.deleteAll(); + } + + @Test + void noMissingPowerPlants() { + PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.now())); + + powerPlantActivityRepository.save(powerPlantActivity); + + assertFalse(powerPlantActivityRepository.findMissingPowerPlants().iterator().hasNext()); + } + + @Test + void singleMissingPowerPlant() { + PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.ofEpochSecond(10000))); + + powerPlantActivityRepository.save(powerPlantActivity); + + assertTrue(powerPlantActivityRepository.findMissingPowerPlants().iterator().hasNext()); + } + + @Test + void multipleMissingPowerPlant() { + PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.ofEpochSecond(10000))); + PowerPlantActivity powerPlantActivity2 = new PowerPlantActivity("test2", Timestamp.from(Instant.ofEpochSecond(100000))); + + powerPlantActivityRepository.save(powerPlantActivity); + powerPlantActivityRepository.save(powerPlantActivity2); + + Stream stream = powerPlantActivityRepository.findMissingPowerPlants().stream(); + + assertTrue(stream.allMatch(x -> x.equals(powerPlantActivity) || x.equals(powerPlantActivity2))); + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityTest.java b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityTest.java new file mode 100644 index 0000000..241257f --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/data/PowerPlantActivityTest.java @@ -0,0 +1,100 @@ +package kilowattcommando.controllerservice.data; + +import dto.PowerplantOperation; +import dto.PowerplantStatus; +import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler; +import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandlerImpl; +import kilowattcommando.controllerservice.kafka.CommandSender; +import kilowattcommando.controllerservice.kafka.KafkaConsumerConfig; +import kilowattcommando.controllerservice.kafka.KafkaContainerExtension; +import kilowattcommando.controllerservice.kafka.KafkaProducerConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; + +import java.sql.Timestamp; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) +public class PowerPlantActivityTest { + @Autowired + private PowerPlantActivityHandler powerPlantActivityHandlerImpl; + + @Autowired + private PowerPlantActivityRepository powerPlantActivityRepository; + + @BeforeEach + void resetRepository() { + powerPlantActivityRepository.deleteAll(); + } + + @Test + void handlePowerPlantActivity() { + PowerplantStatus powerplantStatus = new PowerplantStatus(); + powerplantStatus.name = "test"; + + powerPlantActivityHandlerImpl.handle(powerplantStatus); + + assertEquals(1, powerPlantActivityRepository.count()); + } + + @Test + void handlePowerPlantActivityUpdateToNewTimestamp() { + PowerplantStatus powerplantStatus = new PowerplantStatus(); + powerplantStatus.name = "test"; + + powerPlantActivityHandlerImpl.handle(powerplantStatus); + + Timestamp firstActivity = powerPlantActivityRepository.findById("test").get().getLastActivity(); + + powerPlantActivityHandlerImpl.handle(powerplantStatus); + + Timestamp lastActivity = powerPlantActivityRepository.findById("test").get().getLastActivity(); + + assertTrue(lastActivity.after(firstActivity)); + } + + @Test + void handlePowerPlantActivityMultiplePlants() { + PowerplantStatus powerplantStatus = new PowerplantStatus(); + powerplantStatus.name = "test1"; + + powerPlantActivityHandlerImpl.handle(powerplantStatus); + Timestamp firstActivity = powerPlantActivityRepository.findById("test1").get().getLastActivity(); + + powerplantStatus.name = "test2"; + + powerPlantActivityHandlerImpl.handle(powerplantStatus); + Timestamp secondActivity = powerPlantActivityRepository.findById("test2").get().getLastActivity(); + + assertEquals(2, powerPlantActivityRepository.count()); + assertTrue(secondActivity.after(firstActivity)); + } + + @Test + void handlePowerPlantActivityUpdateCountUnchanged() { + PowerplantStatus powerplantStatus = new PowerplantStatus(); + powerplantStatus.name = "test"; + + powerPlantActivityHandlerImpl.handle(powerplantStatus); + powerPlantActivityHandlerImpl.handle(powerplantStatus); + + assertEquals(1, powerPlantActivityRepository.count()); + } +} diff --git a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java index aff20cb..52ede38 100644 --- a/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java +++ b/src/test/java/kilowattcommando/controllerservice/kafka/PowerPlantListenerTest.java @@ -1,6 +1,7 @@ package kilowattcommando.controllerservice.kafka; import dto.PowerplantStatus; +import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler; import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.producer.KafkaProducer; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @SpringBootTest @@ -94,5 +96,8 @@ public static class KafkaProducerTestConfig { public PowerPlantStatusHandler powerPlantStatusHandler() { return new PowerPlantHandlerStub(); } + + @Bean + public PowerPlantActivityHandler powerPlantActivityHandler() {return mock(PowerPlantActivityHandler.class);} } } diff --git a/src/test/java/kilowattcommando/controllerservice/push/PushNotificationControllerTest.java b/src/test/java/kilowattcommando/controllerservice/push/PushNotificationControllerTest.java new file mode 100644 index 0000000..13efc2d --- /dev/null +++ b/src/test/java/kilowattcommando/controllerservice/push/PushNotificationControllerTest.java @@ -0,0 +1,43 @@ +package kilowattcommando.controllerservice.push; + +import kilowattcommando.controllerservice.kafka.KafkaContainerExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@SpringBootTest +@TestPropertySource( + properties = { + "spring.application.name=controller-service" + }) +@DirtiesContext +@ExtendWith(KafkaContainerExtension.class) +public class PushNotificationControllerTest { + private static PushNotificationController pushNotificationController; + + private static SimpMessagingTemplate simpMessagingTemplate; + + @BeforeAll + static void createPushNotificationController() { + simpMessagingTemplate = mock(SimpMessagingTemplate.class); + pushNotificationController = new PushNotificationController(simpMessagingTemplate); + } + + @Test + void testPushNotification() { + pushNotificationController.sendMessage("No missing powerplants"); + + verify(simpMessagingTemplate).convertAndSend("/powerplant", "No missing powerplants"); + } +}