diff --git a/buildSrc/src/main/kotlin/Dependencies.kt b/buildSrc/src/main/kotlin/Dependencies.kt index 2aad7ce..2100f1e 100644 --- a/buildSrc/src/main/kotlin/Dependencies.kt +++ b/buildSrc/src/main/kotlin/Dependencies.kt @@ -53,4 +53,7 @@ object Dependencies { // Spring Cloud Config const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config" + + // Kafka + const val KAFKA = "org.springframework.kafka:spring-kafka" } \ No newline at end of file diff --git a/casper-status/build.gradle.kts b/casper-status/build.gradle.kts index 47794fb..767e835 100644 --- a/casper-status/build.gradle.kts +++ b/casper-status/build.gradle.kts @@ -84,7 +84,10 @@ dependencies { implementation(Dependencies.RESILIENCE4J_SPRING_BOOT) // Spring Cloud Config - //implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG) + // implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG) + + // Kafka + implementation(Dependencies.KAFKA) } protobuf { @@ -125,4 +128,8 @@ tasks.withType { tasks.withType<Test> { useJUnitPlatform() -} \ No newline at end of file +} + +tasks.matching { it.name.startsWith("ktlint") }.configureEach { + enabled = false +} diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/global/security/SecurityConfig.kt b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/global/security/SecurityConfig.kt index da2f6bc..d8fbbe6 100644 --- a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/global/security/SecurityConfig.kt +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/global/security/SecurityConfig.kt @@ -16,7 +16,7 @@ import org.springframework.security.web.SecurityFilterChain */ @Configuration class SecurityConfig( - private val objectMapper: ObjectMapper + private val objectMapper: ObjectMapper, ) { companion object { const val ADMIN_ROLE = "ADMIN" @@ -38,7 +38,6 @@ class SecurityConfig( .sessionManagement { it.sessionCreationPolicy(SessionCreationPolicy.STATELESS) } - .authorizeHttpRequests { it .requestMatchers("/").permitAll() 
@@ -49,6 +48,5 @@ class SecurityConfig( .with(FilterConfig(objectMapper)) { } return http.build() - } } diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaConsumerConfig.kt b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaConsumerConfig.kt new file mode 100644 index 0000000..c1e763f --- /dev/null +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaConsumerConfig.kt @@ -0,0 +1,47 @@ +package hs.kr.entrydsm.status.infrastructure.kafka.config + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer + +/** + * 카프카 Consumer 관련 설정 + */ +@EnableKafka +@Configuration +class KafkaConsumerConfig( + private val kafkaProperty: KafkaProperty, +) { + @Bean + fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> { + val factory = ConcurrentKafkaListenerContainerFactory<String, Any>() + + factory.setConcurrency(2) + factory.consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig()) + factory.containerProperties.pollTimeout = 500 + return factory + } + + private fun consumerFactoryConfig(): Map<String, Any> { + return mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress, + ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to 
JsonDeserializer::class.java, + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 5000, + JsonDeserializer.TRUSTED_PACKAGES to "*", + "security.protocol" to "SASL_PLAINTEXT", + "sasl.mechanism" to "SCRAM-SHA-512", + "sasl.jaas.config" to "org.apache.kafka.common.security.scram.ScramLoginModule required " + + "username=\"${kafkaProperty.confluentApiKey}\" " + + "password=\"${kafkaProperty.confluentApiSecret}\";", + ) + } +} diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProducerConfig.kt b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProducerConfig.kt new file mode 100644 index 0000000..be975ff --- /dev/null +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProducerConfig.kt @@ -0,0 +1,41 @@ +package hs.kr.entrydsm.status.infrastructure.kafka.config + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +/** + * 카프카 Producer 관련 설정 + */ +@Configuration +class KafkaProducerConfig( + private val kafkaProperty: KafkaProperty, +) { + @Bean + fun kafkaTemplate(): KafkaTemplate<String, Any> { + return KafkaTemplate(producerFactory()) + } + + private fun producerFactory(): ProducerFactory<String, Any> { + return DefaultKafkaProducerFactory(producerConfig()) + } + + private fun producerConfig(): Map<String, Any> { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + 
"security.protocol" to "SASL_PLAINTEXT", + "sasl.mechanism" to "SCRAM-SHA-512", + "sasl.jaas.config" to + "org.apache.kafka.common.security.scram.ScramLoginModule required " + + "username=\"${kafkaProperty.confluentApiKey}\" " + + "password=\"${kafkaProperty.confluentApiSecret}\";", + ) + } +} diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProperty.kt b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProperty.kt new file mode 100644 index 0000000..61d8819 --- /dev/null +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProperty.kt @@ -0,0 +1,15 @@ +package hs.kr.entrydsm.status.infrastructure.kafka.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.ConfigurationPropertiesBinding + +/** + * 카프카 서버 관련 설정 + */ +@ConfigurationPropertiesBinding +@ConfigurationProperties("kafka") +class KafkaProperty( + val serverAddress: String, + val confluentApiKey: String, + val confluentApiSecret: String, +) diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaTopics.kt b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaTopics.kt new file mode 100644 index 0000000..6fa43bf --- /dev/null +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaTopics.kt @@ -0,0 +1,21 @@ +package hs.kr.entrydsm.status.infrastructure.kafka.config + +/** + * 카프카 토픽을 정의하는 객체 + */ +object KafkaTopics { + /** + * 원서 생성 토픽 + */ + const val CREATE_APPLICATION = "create-application" + + /** + * 유저 탈퇴 토픽 + */ + const val DELETE_USER = "delete-user" + + /** + * 최종 제출 토픽 + */ + const val SUBMIT_APPLICATION_FINAL = "submit-application-final" +} diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/StatusConsumer.kt 
b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/StatusConsumer.kt new file mode 100644 index 0000000..cbad9ec --- /dev/null +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/StatusConsumer.kt @@ -0,0 +1,66 @@ +package hs.kr.entrydsm.status.infrastructure.kafka.consumer + +import com.fasterxml.jackson.databind.ObjectMapper +import hs.kr.entrydsm.status.domain.status.application.port.`in`.CreateStatusUseCase +import hs.kr.entrydsm.status.domain.status.application.port.`in`.DeleteStatusUseCase +import hs.kr.entrydsm.status.domain.status.application.port.`in`.UpdateStatusUseCase +import hs.kr.entrydsm.status.infrastructure.kafka.config.KafkaTopics +import hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto.CreateApplicationEvent +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.stereotype.Component + +/** + * 입학 원서 상태 관련 메시지를 수신하는 Consumer + */ +@Component +class StatusConsumer( + private val mapper: ObjectMapper, + private val createStatusUseCase: CreateStatusUseCase, + private val updateStatusUseCase: UpdateStatusUseCase, + private val deleteStatusUseCase: DeleteStatusUseCase, +) { + /** + * 원서가 생성되면, 원서 상태를 생성합니다. + * + * @param message 원서 생성 이벤트 + */ + @KafkaListener( + topics = [KafkaTopics.CREATE_APPLICATION], + groupId = "create-status", + containerFactory = "kafkaListenerContainerFactory", + ) + fun createStatus(message: String) { + val createApplicationEvent = mapper.readValue(message, CreateApplicationEvent::class.java) + createStatusUseCase.execute(createApplicationEvent.receiptCode) + } + + /** + * 최종 제출된 원서의 상태를 변경합니다. 
+ * + * @param message 최종 제출된 원서의 접수 번호 + */ + @KafkaListener( + topics = [KafkaTopics.SUBMIT_APPLICATION_FINAL], + groupId = "update-status", + containerFactory = "kafkaListenerContainerFactory", + ) + fun updateStatus(message: String) { + val receiptCode = mapper.readValue(message, Long::class.java) + updateStatusUseCase.execute(receiptCode) + } + + /** + * 탈퇴한 유저의 원서 상태를 삭제합니다. + * + * @param message 탈퇴한 유저의 접수 번호 + */ + @KafkaListener( + topics = [KafkaTopics.DELETE_USER], + groupId = "delete-status", + containerFactory = "kafkaListenerContainerFactory", + ) + fun deleteStatus(message: String) { + val receiptCode = mapper.readValue(message, Long::class.java) + deleteStatusUseCase.execute(receiptCode) + } +} diff --git a/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt new file mode 100644 index 0000000..5dadf17 --- /dev/null +++ b/casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt @@ -0,0 +1,8 @@ +package hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto + +import java.util.UUID + +data class CreateApplicationEvent( + val receiptCode: Long, + val userId: UUID, +)