Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kilowattcommando.controllerservice.data;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.NamedQuery;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.sql.Timestamp;

@Entity
@Data
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
@NamedQuery(name="PowerPlantActivity.findMissingPowerPlants", query="select p from PowerPlantActivity p where p.lastActivity < TIMESTAMPADD(MINUTE, -2, CURRENT_TIMESTAMP)")
public class PowerPlantActivity {
@Id
private String name;
private Timestamp lastActivity;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package kilowattcommando.controllerservice.data;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.util.Streamable;
import org.springframework.stereotype.Repository;

@Repository
public interface PowerPlantActivityRepository extends CrudRepository<PowerPlantActivity, String> {
Streamable<PowerPlantActivity> findMissingPowerPlants();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kilowattcommando.controllerservice.handlers;

import dto.PowerplantStatus;
import org.springframework.stereotype.Service;

@Service
public interface PowerPlantActivityHandler {
void handle(PowerplantStatus status);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kilowattcommando.controllerservice.handlers;

import dto.PowerplantStatus;
import kilowattcommando.controllerservice.data.PowerPlantActivity;
import kilowattcommando.controllerservice.data.PowerPlantActivityRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.sql.Timestamp;
import java.time.Instant;

@Service
public class PowerPlantActivityHandlerImpl implements PowerPlantActivityHandler{
private final PowerPlantActivityRepository powerPlantActivityRepository;

private static final Logger log = LoggerFactory.getLogger(PowerPlantActivityHandlerImpl.class);

public PowerPlantActivityHandlerImpl(PowerPlantActivityRepository powerPlantActivityRepository) {
this.powerPlantActivityRepository = powerPlantActivityRepository;
}

@Override
public void handle(PowerplantStatus status) {
PowerPlantActivity powerPlantActivity = new PowerPlantActivity(status.name, Timestamp.from(Instant.now()));
log.info("Received PowerPlantActivity: {}", powerPlantActivity.getName());
powerPlantActivityRepository.save(powerPlantActivity);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kilowattcommando.controllerservice.kafka;

import dto.PowerplantStatus;
import kilowattcommando.controllerservice.handlers.PowerPlantActivityHandler;
import kilowattcommando.controllerservice.handlers.PowerPlantStatusHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -11,15 +12,18 @@
public class PowerPlantListener {

private final PowerPlantStatusHandler powerPlantStatusHandler;
private final PowerPlantActivityHandler powerPlantActivityHandler;

public PowerPlantListener(PowerPlantStatusHandler powerPlantStatusHandler) {
public PowerPlantListener(PowerPlantStatusHandler powerPlantStatusHandler, PowerPlantActivityHandler powerPlantActivityHandler) {
this.powerPlantStatusHandler = powerPlantStatusHandler;
this.powerPlantActivityHandler = powerPlantActivityHandler;
}

@KafkaListener(topics = "${kafka.powerplant.topic}", groupId = "${kafka.groupId}")
public void listen(ConsumerRecord<String, ?> message) {
if(message.value() instanceof PowerplantStatus powerplantStatus) {
powerPlantStatusHandler.handle(powerplantStatus);
powerPlantActivityHandler.handle(powerplantStatus);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kilowattcommando.controllerservice.monitor;

import kilowattcommando.controllerservice.data.PowerPlantActivity;
import kilowattcommando.controllerservice.data.PowerPlantActivityRepository;
import kilowattcommando.controllerservice.handlers.PowerPlantLoggingHandler;
import kilowattcommando.controllerservice.push.PushNotificationController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Streamable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
public class PowerPlantActivityMonitor {
private PowerPlantActivityRepository powerPlantActivityRepository;
private PushNotificationController pushNotificationController;

private static final Logger log = LoggerFactory.getLogger(PowerPlantActivityMonitor.class);

public PowerPlantActivityMonitor(PowerPlantActivityRepository powerPlantActivityRepository, PushNotificationController pushNotificationController) {
this.powerPlantActivityRepository = powerPlantActivityRepository;
this.pushNotificationController = pushNotificationController;
}

@Scheduled(fixedDelay = 120_000L)
public void checkForMissingPowerPlants() {
Streamable<PowerPlantActivity> missingPowerplants = this.powerPlantActivityRepository.findMissingPowerPlants();
missingPowerplants.forEach(powerPlantActivity -> {
log.error("Missing powerplant detected: {}", powerPlantActivity.getName());
pushNotificationController.sendMessage(String.format("Connection with powerplant %s is lost.", powerPlantActivity.getName()));
});
if(missingPowerplants.isEmpty()) {
log.info("No missing powerplant detected");
pushNotificationController.sendMessage("No missing powerplants");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kilowattcommando.controllerservice.monitor;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class SchedulingConfiguration {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kilowattcommando.controllerservice.push;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

@Controller
public class PushNotificationController {
private SimpMessagingTemplate messagingTemplate;

public PushNotificationController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}

public void sendMessage(final String message) {
this.messagingTemplate.convertAndSend("/powerplant", message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kilowattcommando.controllerservice.push;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/message-broker");
registry.addEndpoint("/message-broker").withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/powerplant");
registry.setApplicationDestinationPrefixes("/app");
}
}
7 changes: 6 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ kafka.serverAddress=${KAFKA_BROKER}
kafka.serverPort=${KAFKA_BROKER_PORT}
kafka.groupId=${KAFKA_GROUP_ID}
kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC}
kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC}
kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC}
spring.datasource.url=jdbc:h2:mem:powerplant_activity_db
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=controller
spring.datasource.password=${CONTROLLER_DB_PASSWORD}
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kilowattcommando.controllerservice.data;

import kilowattcommando.controllerservice.kafka.KafkaContainerExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.Iterator;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
@TestPropertySource(
properties = {
"spring.application.name=controller-service"
})
@DirtiesContext
@ExtendWith(KafkaContainerExtension.class)
public class PowerPlantActivityRepositoryTest {
@Autowired
private PowerPlantActivityRepository powerPlantActivityRepository;

@BeforeEach
void resetRepository() {
powerPlantActivityRepository.deleteAll();
}

@Test
void noMissingPowerPlants() {
PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.now()));

powerPlantActivityRepository.save(powerPlantActivity);

assertFalse(powerPlantActivityRepository.findMissingPowerPlants().iterator().hasNext());
}

@Test
void singleMissingPowerPlant() {
PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.ofEpochSecond(10000)));

powerPlantActivityRepository.save(powerPlantActivity);

assertTrue(powerPlantActivityRepository.findMissingPowerPlants().iterator().hasNext());
}

@Test
void multipleMissingPowerPlant() {
PowerPlantActivity powerPlantActivity = new PowerPlantActivity("test", Timestamp.from(Instant.ofEpochSecond(10000)));
PowerPlantActivity powerPlantActivity2 = new PowerPlantActivity("test2", Timestamp.from(Instant.ofEpochSecond(100000)));

powerPlantActivityRepository.save(powerPlantActivity);
powerPlantActivityRepository.save(powerPlantActivity2);

Stream<PowerPlantActivity> stream = powerPlantActivityRepository.findMissingPowerPlants().stream();

assertTrue(stream.allMatch(x -> x.equals(powerPlantActivity) || x.equals(powerPlantActivity2)));
}
}
Loading
Loading