Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ FROM openjdk:17-slim
COPY target/controller-service-0.0.1-SNAPSHOT.jar controller.jar

ENV KAFKA_POWERPLANT_TOPIC=backend
ENV KAFKA_POWERPLANT_CONTROL_TOPIC=powerplant
ENV KAFKA_GROUP_ID=controller

EXPOSE 8080

ENTRYPOINT ["java", "-jar", "/controller.jar"]
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The application requires the following environment variables:
| KAFKA_BROKER | IP address of Kafka Broker |
| KAFKA_BROKER_PORT | Port number of Kafka Broker |
|KAFKA_GROUP_ID| Group id of controller service (default in Docker container: controller) |
|KAFKA_POWERPLANT_CONTROL_TOPIC| Topic for powerplant commands (default in Docker container: powerplant) |
|KAFKA_POWERPLANT_TOPIC| Topic of powerplant status updates (default in Docker container: backend) |

## Run using Docker
Expand All @@ -27,6 +28,7 @@ Step 2:
Execute the Docker container with required ENV variables:
```shell
docker run \
-p 8080:8080 \
-e KAFKA_BROKER=<broker_ip_address> \
-e KAFKA_BROKER_PORT=<broker_port> \
kwkmdo-controller
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
<artifactId>mockito-core</artifactId>
<version>5.12.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package kilowattcommando.controllerservice.kafka;

import dto.PowerplantCommand;
import dto.PowerplantOperation;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * Kafka-backed implementation of {@link PowerPlantCommandSender}.
 * Publishes {@link PowerplantOperation} messages to the configured control topic,
 * tagging each record with a "targetConsumerId" header so consumers can pick out
 * commands addressed to their power plant.
 */
@Service
public class CommandSender implements PowerPlantCommandSender {
    // Bug fix: logger was bound to the interface (PowerPlantCommandSender.class);
    // bind it to the concrete class so log output is attributed correctly.
    private static final Logger log = LoggerFactory.getLogger(CommandSender.class);

    /** Kafka topic for outgoing power plant commands (env: KAFKA_POWERPLANT_CONTROL_TOPIC). */
    @Value("${kafka.controlTopic}")
    private String powerPlantControlTopic;

    // Constructor injection instead of field @Autowired: the dependency is
    // explicit, final, and testable without a Spring context.
    private final KafkaTemplate<String, PowerplantOperation> kafkaTemplate;

    public CommandSender(KafkaTemplate<String, PowerplantOperation> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Wraps {@code command} in a {@link PowerplantOperation} and publishes it to
     * the control topic, addressed to one power plant via the record header.
     *
     * @param powerPlantId id of the target power plant; written into the
     *                     "targetConsumerId" record header
     * @param command      the command to publish
     */
    @Override
    public void sendCommand(String powerPlantId, PowerplantCommand command) {
        PowerplantOperation powerplantOperation = new PowerplantOperation(command);
        ProducerRecord<String, PowerplantOperation> commandRecord =
                new ProducerRecord<>(powerPlantControlTopic, powerplantOperation);
        // Bug fix: use an explicit charset — the no-arg getBytes() depends on the
        // platform default encoding, which varies between environments.
        commandRecord.headers().add("targetConsumerId", powerPlantId.getBytes(StandardCharsets.UTF_8));
        kafkaTemplate.send(commandRecord);
        log.info("Sent command {} to powerplant {}", command, powerPlantId);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kilowattcommando.controllerservice.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Map;

/**
 * Spring configuration for the Kafka producer side: exposes a producer factory
 * and a {@link KafkaTemplate} that serializes String keys and JSON values.
 */
@EnableKafka
@Configuration
public class KafkaProducerConfig {
    /** Kafka broker host (env: KAFKA_BROKER). */
    @Value("${kafka.serverAddress}")
    private String kafkaServerAddress;

    /** Kafka broker port (env: KAFKA_BROKER_PORT). */
    @Value("${kafka.serverPort}")
    private String kafkaServerPort;

    /**
     * Producer factory pointed at the configured broker, with String-key and
     * JSON-value serialization.
     */
    @Bean
    public <T> ProducerFactory<String, T> producerFactory() {
        // DefaultKafkaProducerFactory copies the config map, so an immutable map is fine.
        Map<String, Object> producerProps = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress + ":" + kafkaServerPort,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /** Template used by services to publish messages via {@link #producerFactory()}. */
    @Bean
    public <T> KafkaTemplate<String, T> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kilowattcommando.controllerservice.kafka;

import dto.PowerplantCommand;

/**
 * Abstraction for dispatching a command to a specific power plant.
 * Implemented by {@link CommandSender}, which publishes to Kafka.
 */
// Fix: removed @Service — the annotation is a no-op on interfaces (Spring only
// registers concrete classes as beans) and misleads readers about where the
// bean is declared.
public interface PowerPlantCommandSender {
    /**
     * Sends {@code command} to the power plant identified by {@code powerPlantId}.
     *
     * @param powerPlantId id of the target power plant
     * @param command      command to deliver
     */
    void sendCommand(String powerPlantId, PowerplantCommand command);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kilowattcommando.controllerservice.rest;

import dto.PowerplantCommand;

/**
 * Request body for the gate-closure endpoint: a textual closure level that
 * maps onto a {@link PowerplantCommand}.
 */
public record GateClosure(String closure) {
    /** Returns {@code true} when {@code closure} is one of the five supported levels. */
    public boolean validate() {
        return closure.matches("OPEN|CLOSED|HALF|QUARTER|THREE_QUARTERS");
    }

    /**
     * Maps the textual closure level to its {@link PowerplantCommand}, or
     * {@code null} when the value is not one of the supported levels.
     */
    public PowerplantCommand getEquivalentPowerplantCommand() {
        return switch (closure) {
            case "OPEN" -> PowerplantCommand.gateOpen;
            case "CLOSED" -> PowerplantCommand.gateClose;
            case "HALF" -> PowerplantCommand.gateHalf;
            case "QUARTER" -> PowerplantCommand.gateQuarter;
            case "THREE_QUARTERS" -> PowerplantCommand.gateThreeQuarters;
            default -> null;
        };
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package kilowattcommando.controllerservice.rest;

import dto.PowerplantCommand;
import kilowattcommando.controllerservice.kafka.PowerPlantCommandSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST API for controlling a single power plant. Every endpoint forwards the
 * request as a Kafka command via {@link PowerPlantCommandSender}.
 *
 * Fixes: handler methods were {@code private} — Spring MVC request-mapping
 * methods must not be private (rejected/warned by Spring, and they break under
 * class-based proxies); they are now {@code public}. The wildcard import was
 * also expanded per convention.
 */
@RestController
@RequestMapping("/powerplant/{id}")
public class PowerplantRestController {
    private static final Logger log = LoggerFactory.getLogger(PowerplantRestController.class);

    private final PowerPlantCommandSender powerPlantCommandSender;

    public PowerplantRestController(PowerPlantCommandSender powerPlantCommandSender) {
        this.powerPlantCommandSender = powerPlantCommandSender;
    }

    /** PUT /powerplant/{id}/start — sends a start command to the plant. */
    @PutMapping("/start")
    public ResponseEntity<String> startPowerplant(@PathVariable String id) {
        log.info("Request to start powerplant with id {}", id);
        powerPlantCommandSender.sendCommand(id, PowerplantCommand.start);
        return ResponseEntity.ok("Powerplant started");
    }

    /** PUT /powerplant/{id}/stop — sends a stop command to the plant. */
    @PutMapping("/stop")
    public ResponseEntity<String> stopPowerplant(@PathVariable String id) {
        log.info("Request to stop powerplant with id {}", id);
        powerPlantCommandSender.sendCommand(id, PowerplantCommand.stop);
        return ResponseEntity.ok("Powerplant stopped");
    }

    /**
     * PUT /powerplant/{id}/gateClosure — validates the requested closure level
     * and forwards the equivalent gate command; responds 400 on an invalid level.
     */
    @PutMapping("/gateClosure")
    public ResponseEntity<String> changeGateClosure(@PathVariable String id, @RequestBody GateClosure gateClosure) {
        log.info("Request to change gate closure to {} which is {} on powerplant with id {}",
                gateClosure.closure(), gateClosure.validate() ? "valid" : "not valid", id);
        if (gateClosure.validate()) {
            powerPlantCommandSender.sendCommand(id, gateClosure.getEquivalentPowerplantCommand());
            return ResponseEntity.ok("Changed gate closure");
        }
        return ResponseEntity.badRequest().body("Invalid gate closure");
    }
}
3 changes: 2 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ spring.application.name=controller-service
kafka.serverAddress=${KAFKA_BROKER}
kafka.serverPort=${KAFKA_BROKER_PORT}
kafka.groupId=${KAFKA_GROUP_ID}
kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC}
kafka.powerplant.topic=${KAFKA_POWERPLANT_TOPIC}
kafka.controlTopic=${KAFKA_POWERPLANT_CONTROL_TOPIC}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package kilowattcommando.controllerservice;

import dto.DTOJsonDeserializer;
import dto.PowerplantCommand;
import dto.PowerplantOperation;
import kilowattcommando.controllerservice.kafka.KafkaContainerExtension;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.MockMvc;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

/**
 * End-to-end test: drives the REST API with MockMvc and asserts that the
 * expected command records arrive on the "powerplant" Kafka topic
 * (broker supplied by {@link KafkaContainerExtension}).
 */
@SpringBootTest
@TestPropertySource(
        properties = {
                "spring.application.name=controller-service"
        })
@ExtendWith(KafkaContainerExtension.class)
@DirtiesContext
@AutoConfigureMockMvc
public class CommandIntegrationTest {

    @Autowired
    private MockMvc mockMvc;

    // Bug fix: was @Value("kafka.groupId") — without the ${...} placeholder the
    // field held the literal string "kafka.groupId" instead of the configured id.
    @Value("${kafka.groupId}")
    private String groupId;

    /** Builds a consumer for the test broker that deserializes PowerplantOperation JSON. */
    private KafkaConsumer<String, PowerplantOperation> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Read from the beginning so records produced before subscribe() are seen.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }

    @Test
    void startPowerPlant_1() throws Exception {
        mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk());
        try (KafkaConsumer<String, PowerplantOperation> consumer = createConsumer()) {
            consumer.subscribe(List.of("powerplant"));

            await()
                    .atMost(60, TimeUnit.SECONDS)
                    .untilAsserted(
                            () -> {
                                ConsumerRecords<String, PowerplantOperation> records = consumer.poll(Duration.ofSeconds(2));

                                assertEquals(1, records.count());
                                ConsumerRecord<String, PowerplantOperation> record = records.iterator().next();
                                assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value()));
                                assertEquals(PowerplantCommand.start, record.value().command);
                            }
                    );
        }
    }

    @Test
    void startPowerPlant_1_and_setGateToHalf() throws Exception {
        mockMvc.perform(put("/powerplant/1/start")).andExpect(status().isOk());
        mockMvc.perform(put("/powerplant/1/gateClosure").contentType(MediaType.APPLICATION_JSON).content("{\"closure\": \"HALF\"}")).andExpect(status().isOk());
        try (KafkaConsumer<String, PowerplantOperation> consumer = createConsumer()) {
            consumer.subscribe(List.of("powerplant"));

            await()
                    .atMost(60, TimeUnit.SECONDS)
                    .untilAsserted(
                            () -> {
                                // NOTE(review): assumes both records land in a single poll — usually
                                // true with a single-partition test topic, but worth confirming.
                                ConsumerRecords<String, PowerplantOperation> records = consumer.poll(Duration.ofSeconds(2));

                                assertEquals(2, records.count());

                                Iterator<ConsumerRecord<String, PowerplantOperation>> iterator = records.iterator();
                                ConsumerRecord<String, PowerplantOperation> record = iterator.next();
                                ConsumerRecord<String, PowerplantOperation> record2 = iterator.next();

                                assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value()));
                                assertEquals(PowerplantCommand.start, record.value().command);

                                assertEquals("1", new String(record2.headers().lastHeader("targetConsumerId").value()));
                                assertEquals(PowerplantCommand.gateHalf, record2.value().command);
                            }
                    );
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package kilowattcommando.controllerservice.kafka;

import dto.DTOJsonDeserializer;
import dto.PowerplantCommand;
import dto.PowerplantOperation;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Integration test for {@link PowerPlantCommandSender}: sends a command
 * directly through the bean and asserts the record appears on the
 * "powerplant" Kafka topic (broker supplied by {@link KafkaContainerExtension}).
 */
@SpringBootTest
@TestPropertySource(
        properties = {
                "spring.application.name=controller-service"
        })
@DirtiesContext
@ExtendWith(KafkaContainerExtension.class)
public class CommandSenderTest {

    // Bug fix: was @Value("kafka.groupId") — without the ${...} placeholder the
    // field held the literal string "kafka.groupId" instead of the configured id.
    @Value("${kafka.groupId}")
    private String groupId;

    @Autowired
    private PowerPlantCommandSender powerPlantCommandSender;

    /** Builds a consumer for the test broker that deserializes PowerplantOperation JSON. */
    private KafkaConsumer<String, PowerplantOperation> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTOJsonDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Read from the beginning so records produced before subscribe() are seen.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }

    @Test
    void startPowerPlant_1() throws Exception {
        powerPlantCommandSender.sendCommand("1", PowerplantCommand.start);
        try (KafkaConsumer<String, PowerplantOperation> consumer = createConsumer()) {
            consumer.subscribe(List.of("powerplant"));

            await()
                    .atMost(60, TimeUnit.SECONDS)
                    .untilAsserted(
                            () -> {
                                ConsumerRecords<String, PowerplantOperation> records = consumer.poll(Duration.ofSeconds(2));

                                assertEquals(1, records.count());
                                ConsumerRecord<String, PowerplantOperation> record = records.iterator().next();
                                assertEquals("1", new String(record.headers().lastHeader("targetConsumerId").value()));
                                assertEquals(PowerplantCommand.start, record.value().command);
                            }
                    );
        }
    }

}
Loading
Loading