Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ object Dependencies {

// Spring Cloud Config
const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config"

// Kafka
const val KAFKA = "org.springframework.kafka:spring-kafka"
}
11 changes: 9 additions & 2 deletions casper-status/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ dependencies {
implementation(Dependencies.RESILIENCE4J_SPRING_BOOT)

// Spring Cloud Config
//implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)
// implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)

// Kafka
implementation(Dependencies.KAFKA)
}

protobuf {
Expand Down Expand Up @@ -125,4 +128,8 @@ tasks.withType<KotlinCompile> {

tasks.withType<Test> {
useJUnitPlatform()
}
}

tasks.matching { it.name.startsWith("ktlint") }.configureEach {
enabled = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.springframework.security.web.SecurityFilterChain
*/
@Configuration
class SecurityConfig(
private val objectMapper: ObjectMapper
private val objectMapper: ObjectMapper,
) {
companion object {
const val ADMIN_ROLE = "ADMIN"
Expand All @@ -38,7 +38,6 @@ class SecurityConfig(
.sessionManagement {
it.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
}

.authorizeHttpRequests {
it
.requestMatchers("/").permitAll()
Expand All @@ -49,6 +48,5 @@ class SecurityConfig(
.with(FilterConfig(objectMapper)) { }

return http.build()

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package hs.kr.entrydsm.status.infrastructure.kafka.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

/**
 * Kafka consumer configuration.
 *
 * Builds a `ConcurrentKafkaListenerContainerFactory<String, String>` backed by a
 * SASL/SCRAM-authenticated consumer. Listeners in this service receive the raw
 * JSON payload as a [String] and deserialize it themselves (see `StatusConsumer`),
 * so the value deserializer must be [StringDeserializer].
 */
@EnableKafka
@Configuration
class KafkaConsumerConfig(
    private val kafkaProperty: KafkaProperty,
) {
    /**
     * Listener container factory used by all `@KafkaListener` methods
     * (referenced by name as "kafkaListenerContainerFactory").
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

        factory.setConcurrency(2)
        factory.consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig())
        factory.containerProperties.pollTimeout = 500
        return factory
    }

    /** Raw consumer settings, including SASL/SCRAM-SHA-512 authentication. */
    private fun consumerFactoryConfig(): Map<String, Any> {
        return mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
            // Only read records from committed transactions.
            ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            // Offsets are committed by the container (auto-commit off).
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            // FIX: was JsonDeserializer, but the listener container is typed
            // <String, String> and every listener parses the JSON itself with
            // ObjectMapper — JsonDeserializer would hand the container a mapped
            // object (or fail on missing type headers), not the raw String.
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            // NOTE(review): 5000 ms is far below the broker default (300000 ms);
            // any record that takes >5 s to process will trigger a rebalance.
            // Confirm this aggressive value is intentional.
            ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 5000,
            "security.protocol" to "SASL_PLAINTEXT",
            "sasl.mechanism" to "SCRAM-SHA-512",
            "sasl.jaas.config" to "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"${kafkaProperty.confluentApiKey}\" " +
                "password=\"${kafkaProperty.confluentApiSecret}\";",
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package hs.kr.entrydsm.status.infrastructure.kafka.config

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

/**
 * Kafka producer configuration.
 *
 * Exposes a [KafkaTemplate] that serializes String keys and JSON values,
 * authenticating against the cluster via SASL/SCRAM-SHA-512.
 */
@Configuration
class KafkaProducerConfig(
    private val kafkaProperty: KafkaProperty,
) {
    /** Template used by producers in this service to publish events. */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> = KafkaTemplate(buildProducerFactory())

    // Wraps the raw settings map in a Spring producer factory.
    private fun buildProducerFactory(): ProducerFactory<String, Any> =
        DefaultKafkaProducerFactory(producerSettings())

    // Producer settings: bootstrap servers plus SASL/SCRAM credentials
    // sourced from KafkaProperty.
    private fun producerSettings(): Map<String, Any> {
        val saslJaasConfig =
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"${kafkaProperty.confluentApiKey}\" " +
                "password=\"${kafkaProperty.confluentApiSecret}\";"
        return mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
            "security.protocol" to "SASL_PLAINTEXT",
            "sasl.mechanism" to "SCRAM-SHA-512",
            "sasl.jaas.config" to saslJaasConfig,
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package hs.kr.entrydsm.status.infrastructure.kafka.config

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConfigurationPropertiesBinding

/**
 * Kafka connection settings bound from the `kafka.*` configuration namespace.
 *
 * FIX: removed `@ConfigurationPropertiesBinding` — that annotation is a
 * qualifier for custom `Converter`/`GenericConverter` beans used during
 * property binding, not for the properties class itself; it had no effect here.
 *
 * NOTE(review): ensure this class is registered for binding (e.g. via
 * `@ConfigurationPropertiesScan` or `@EnableConfigurationProperties`) —
 * the registration site is not visible in this file.
 *
 * @property serverAddress bootstrap server address passed to producers/consumers
 * @property confluentApiKey SASL username for SCRAM authentication
 * @property confluentApiSecret SASL password for SCRAM authentication
 */
@ConfigurationProperties("kafka")
class KafkaProperty(
    val serverAddress: String,
    val confluentApiKey: String,
    val confluentApiSecret: String,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package hs.kr.entrydsm.status.infrastructure.kafka.config

/** Kafka topic names consumed and produced by the status service. */
object KafkaTopics {
    /** Published when a new application is created. */
    const val CREATE_APPLICATION = "create-application"

    /** Published when a user account is deleted. */
    const val DELETE_USER = "delete-user"

    /** Published when an application is finally submitted. */
    const val SUBMIT_APPLICATION_FINAL = "submit-application-final"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package hs.kr.entrydsm.status.infrastructure.kafka.consumer

import com.fasterxml.jackson.databind.ObjectMapper
import hs.kr.entrydsm.status.domain.status.application.port.`in`.CreateStatusUseCase
import hs.kr.entrydsm.status.domain.status.application.port.`in`.DeleteStatusUseCase
import hs.kr.entrydsm.status.domain.status.application.port.`in`.UpdateStatusUseCase
import hs.kr.entrydsm.status.infrastructure.kafka.config.KafkaTopics
import hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto.CreateApplicationEvent
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

/**
 * Consumes application-status events and dispatches them to the
 * corresponding use cases. Each listener receives the raw JSON payload
 * as a String and deserializes it with [ObjectMapper].
 */
@Component
class StatusConsumer(
    private val mapper: ObjectMapper,
    private val createStatusUseCase: CreateStatusUseCase,
    private val updateStatusUseCase: UpdateStatusUseCase,
    private val deleteStatusUseCase: DeleteStatusUseCase,
) {
    /**
     * Creates a status record when an application is created.
     *
     * @param message JSON-encoded [CreateApplicationEvent]
     */
    @KafkaListener(
        topics = [KafkaTopics.CREATE_APPLICATION],
        groupId = "create-status",
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun createStatus(message: String) {
        val event = mapper.readValue(message, CreateApplicationEvent::class.java)
        createStatusUseCase.execute(event.receiptCode)
    }

    /**
     * Updates the status of an application that was finally submitted.
     *
     * @param message JSON-encoded receipt code of the submitted application
     */
    @KafkaListener(
        topics = [KafkaTopics.SUBMIT_APPLICATION_FINAL],
        groupId = "update-status",
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun updateStatus(message: String) {
        updateStatusUseCase.execute(mapper.readValue(message, Long::class.java))
    }

    /**
     * Deletes the status record belonging to a withdrawn user.
     *
     * @param message JSON-encoded receipt code of the withdrawn user's application
     */
    @KafkaListener(
        topics = [KafkaTopics.DELETE_USER],
        groupId = "delete-status",
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun deleteStatus(message: String) {
        deleteStatusUseCase.execute(mapper.readValue(message, Long::class.java))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto

import java.util.UUID

/**
 * Event payload received on the application-creation topic.
 *
 * @property receiptCode application receipt number; passed to the
 *   status-creation use case by the consumer
 * @property userId presumably the ID of the applying user — not read by the
 *   consumer in this file; confirm semantics against the producing service
 */
data class CreateApplicationEvent(
    val receiptCode: Long,
    val userId: UUID,
)
Loading