diff --git a/buildSrc/src/main/kotlin/Dependencies.kt b/buildSrc/src/main/kotlin/Dependencies.kt index 93c250f..c3e4558 100644 --- a/buildSrc/src/main/kotlin/Dependencies.kt +++ b/buildSrc/src/main/kotlin/Dependencies.kt @@ -50,5 +50,7 @@ object Dependencies { const val SENTRY_SPRING_BOOT_STARTER = "io.sentry:sentry-spring-boot-starter-jakarta:${DependencyVersion.SENTRY}" // Spring Cloud Config - const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config" + const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter:2024.0.2" + + const val KAFKA = "org.springframework.kafka:spring-kafka" } \ No newline at end of file diff --git a/buildSrc/src/main/kotlin/Plugin.kt b/buildSrc/src/main/kotlin/Plugin.kt index eb910e9..ed4563e 100644 --- a/buildSrc/src/main/kotlin/Plugin.kt +++ b/buildSrc/src/main/kotlin/Plugin.kt @@ -1,6 +1,7 @@ object Plugin { const val KOTLIN_JVM = "org.jetbrains.kotlin.jvm" const val KOTLIN_SPRING = "org.jetbrains.kotlin.plugin.spring" + const val KOTLIN_JPA = "org.jetbrains.kotlin.plugin.jpa" const val KOTLIN_KAPT = "org.jetbrains.kotlin.kapt" const val SPRING_BOOT = "org.springframework.boot" const val SPRING_DEPENDENCY_MANAGEMENT = "io.spring.dependency-management" diff --git a/casper-user/build.gradle.kts b/casper-user/build.gradle.kts index 31bb7ed..cf8e149 100644 --- a/casper-user/build.gradle.kts +++ b/casper-user/build.gradle.kts @@ -4,6 +4,7 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { id(Plugin.KOTLIN_JVM) version PluginVersion.KOTLIN_VERSION id(Plugin.KOTLIN_SPRING) version PluginVersion.KOTLIN_VERSION + id(Plugin.KOTLIN_JPA) version PluginVersion.KOTLIN_VERSION id(Plugin.KOTLIN_KAPT) id(Plugin.SPRING_BOOT) version PluginVersion.SPRING_BOOT_VERSION id(Plugin.SPRING_DEPENDENCY_MANAGEMENT) version PluginVersion.SPRING_DEPENDENCY_MANAGEMENT_VERSION @@ -78,8 +79,11 @@ dependencies { // Sentry implementation(Dependencies.SENTRY_SPRING_BOOT_STARTER) + //kafka + implementation(Dependencies.KAFKA) + // Spring Cloud Config - implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG) + //implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG) } protobuf { diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/adapter/out/persistence/UserPersistenceAdapter.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/adapter/out/persistence/UserPersistenceAdapter.kt index c3f4f06..7fb4cf7 100644 --- a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/adapter/out/persistence/UserPersistenceAdapter.kt +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/adapter/out/persistence/UserPersistenceAdapter.kt @@ -81,6 +81,14 @@ class UserPersistenceAdapter( userRepository.deleteById(userId) } + /** + * 모든 사용자를 삭제합니다. + * 관리자의 전체 데이터 초기화 시에만 사용됩니다. + */ + override fun deleteAll() { + userRepository.deleteAll() + } + /** * 지정된 일수보다 오래된 탈퇴 사용자를 조회합니다. * diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/port/out/DeleteUserPort.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/port/out/DeleteUserPort.kt index 15cf42c..882fa19 100644 --- a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/port/out/DeleteUserPort.kt +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/port/out/DeleteUserPort.kt @@ -22,4 +22,10 @@ interface DeleteUserPort { * @return 삭제 대상 사용자 목록 */ fun findWithdrawnUsersOlderThan(days: Long): List + + /** + * 모든 사용자를 삭제합니다. + * 관리자의 전체 데이터 초기화 시에만 사용됩니다. + */ + fun deleteAll() } diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/service/ChangeReceiptCodeService.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/service/ChangeReceiptCodeService.kt index 26e17b2..0f102a5 100644 --- a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/service/ChangeReceiptCodeService.kt +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/service/ChangeReceiptCodeService.kt @@ -4,8 +4,12 @@ import hs.kr.entrydsm.user.domain.user.application.port.`in`.ChangeReceiptCodeUs import hs.kr.entrydsm.user.domain.user.application.port.out.QueryUserPort import hs.kr.entrydsm.user.domain.user.application.port.out.SaveUserPort import hs.kr.entrydsm.user.domain.user.exception.UserNotFoundException +import hs.kr.entrydsm.user.infrastructure.kafka.producer.UserEventProducer +import jakarta.transaction.Synchronization import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional +import org.springframework.transaction.support.TransactionSynchronization +import org.springframework.transaction.support.TransactionSynchronizationManager import java.util.UUID /** @@ -14,12 +18,13 @@ import java.util.UUID * * @property queryUserPort 사용자 조회 포트 * @property saveUserPort 사용자 저장 포트 + * @property userEventProducer 사용자 이벤트 발행기 */ -@Transactional @Service class ChangeReceiptCodeService( private val queryUserPort: QueryUserPort, private val saveUserPort: SaveUserPort, + private val userEventProducer: UserEventProducer ) : ChangeReceiptCodeUseCase { /** * 사용자의 접수코드를 변경합니다. @@ -28,12 +33,43 @@ class ChangeReceiptCodeService( * @param receiptCode 새로운 접수코드 * @throws UserNotFoundException 사용자가 존재하지 않는 경우 */ + @Transactional override fun changeReceiptCode( userId: UUID, receiptCode: Long, ) { - val user = queryUserPort.findById(userId) ?: throw UserNotFoundException - val updateUser = user.changeReceiptCode(receiptCode) - saveUserPort.save(updateUser) + try { + val user = queryUserPort.findById(userId) + + if (user == null) { + userEventProducer.sendReceiptCodeUpdateFailed( + receiptCode = receiptCode, + userId = userId, + reason = "User not found" + ) + throw UserNotFoundException + } + + val updatedUser = user.copy(receiptCode = receiptCode) + saveUserPort.save(updatedUser) + + TransactionSynchronizationManager.registerSynchronization(object : TransactionSynchronization { + override fun afterCommit() { + userEventProducer.sendReceiptCodeUpdateCompleted(receiptCode, userId) + } + }) + + + } catch (e: Exception) { + + if (e !is UserNotFoundException) { + userEventProducer.sendReceiptCodeUpdateFailed( + receiptCode = receiptCode, + userId = userId, + reason = e.message ?: "Unknown error" + ) + } + throw e // 예외 다시 던져서 롤백 발생 + } } } diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt new file mode 100644 index 0000000..db0dc8f --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt @@ -0,0 +1,69 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.configuration + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer + +/** + * Kafka Consumer 설정을 담당하는 Configuration 클래스입니다. + * + * 원서 생성 이벤트 수신을 위한 Consumer 설정을 구성하며, + * Confluent Cloud 연결을 위한 보안 설정을 포함합니다. + * + * @property kafkaProperty Kafka 연결 정보를 담은 프로퍼티 + */ +@EnableKafka +@Configuration +class KafkaConsumerConfig( + private val kafkaProperty: KafkaProperty +) { + + /** + * Kafka 리스너 컨테이너 팩토리를 생성합니다. + * + * 동시성 레벨을 2로 설정하여 병렬 메시지 처리를 지원하며, + * 폴링 타임아웃을 500ms로 설정하여 적절한 응답성을 보장합니다. + * + * @return 설정된 ConcurrentKafkaListenerContainerFactory 인스턴스 + */ + @Bean + fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + return ConcurrentKafkaListenerContainerFactory().apply { + setConcurrency(2) + consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig()) + containerProperties.pollTimeout = 500 + } + } + + /** + * Kafka Consumer의 기본 설정을 구성합니다. + * + * Confluent Cloud 연결을 위한 SASL 보안 설정과 역직렬화 설정을 포함하며, + * read_committed 격리 레벨로 트랜잭션 안정성을 보장합니다. + * + * @return Consumer 설정 맵 + */ + private fun consumerFactoryConfig(): Map { + return mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress, + ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 5000, + JsonDeserializer.TRUSTED_PACKAGES to "*", + "security.protocol" to "SASL_PLAINTEXT", + "sasl.mechanism" to "SCRAM-SHA-512", + "sasl.jaas.config" to + "org.apache.kafka.common.security.scram.ScramLoginModule required " + + "username=\"${kafkaProperty.confluentApiKey}\" " + + "password=\"${kafkaProperty.confluentApiSecret}\";" + ) + } +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt new file mode 100644 index 0000000..1759cb3 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt @@ -0,0 +1,105 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.configuration + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer + +/** + * Kafka Producer 설정을 담당하는 Configuration 클래스입니다. + * + * 사용자 삭제, 전체 테이블 삭제, 사용자 이벤트 발행을 위한 + * 각각의 KafkaTemplate과 ProducerFactory를 구성하며, + * Confluent Cloud 연결을 위한 보안 설정을 포함합니다. + * + * @property kafkaProperty Kafka 연결 정보를 담은 프로퍼티 + */ +@Configuration +class KafkaProducerConfig( + private val kafkaProperty: KafkaProperty +) { + + /** + * 전체 테이블 삭제 이벤트용 Producer Factory를 생성합니다. + * + * @return Unit 타입 메시지용 DefaultKafkaProducerFactory + */ + @Bean + fun deleteAllTableProducerFactory(): DefaultKafkaProducerFactory { + return DefaultKafkaProducerFactory(producerConfig()) + } + + /** + * 전체 테이블 삭제 이벤트 발행을 위한 KafkaTemplate을 생성합니다. + * + * @return Unit 타입 메시지용 KafkaTemplate + */ + @Bean + fun deleteAllTableKafkaTemplate(): KafkaTemplate { + return KafkaTemplate(deleteAllTableProducerFactory()) + } + + /** + * 사용자 삭제 이벤트용 Producer Factory를 생성합니다. + * + * @return Long 타입 메시지용 DefaultKafkaProducerFactory + */ + @Bean + fun deleteUserProducerFactory(): DefaultKafkaProducerFactory { + return DefaultKafkaProducerFactory(producerConfig()) + } + + /** + * 사용자 삭제 이벤트 발행을 위한 KafkaTemplate을 생성합니다. + * + * @return Long 타입 메시지용 KafkaTemplate + */ + @Bean + fun deleteUserKafkaTemplate(): KafkaTemplate { + return KafkaTemplate(deleteUserProducerFactory()) + } + + /** + * 사용자 이벤트용 Producer Factory를 생성합니다. + * + * @return Any 타입 메시지용 DefaultKafkaProducerFactory + */ + @Bean + fun userEventProducerFactory(): DefaultKafkaProducerFactory { + return DefaultKafkaProducerFactory(producerConfig()) + } + + /** + * 사용자 이벤트 발행을 위한 KafkaTemplate을 생성합니다. + * + * @return Any 타입 메시지용 KafkaTemplate + */ + @Bean + fun userEventKafkaTemplate(): KafkaTemplate { + return KafkaTemplate(userEventProducerFactory()) + } + + /** + * Kafka Producer의 기본 설정을 구성합니다. + * + * Confluent Cloud 연결을 위한 SASL 보안 설정과 직렬화 설정을 포함합니다. + * + * @return Producer 설정 맵 + */ + private fun producerConfig(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + "security.protocol" to "SASL_PLAINTEXT", + "sasl.mechanism" to "SCRAM-SHA-512", + "sasl.jaas.config" to + "org.apache.kafka.common.security.scram.ScramLoginModule required " + + "username=\"${kafkaProperty.confluentApiKey}\" " + + "password=\"${kafkaProperty.confluentApiSecret}\";" + ) + } +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProperty.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProperty.kt new file mode 100644 index 0000000..4f33502 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProperty.kt @@ -0,0 +1,22 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.configuration + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.ConfigurationPropertiesBinding + +/** + * Kafka 연결을 위한 설정 프로퍼티 클래스입니다. + * + * application.yml의 kafka 섹션에서 설정값을 바인딩하여 + * Confluent Cloud Kafka 클러스터 연결에 필요한 정보를 관리합니다. + * + * @property serverAddress Kafka 브로커 서버 주소 + * @property confluentApiKey Confluent Cloud 접근을 위한 API 키 + * @property confluentApiSecret Confluent Cloud 접근을 위한 API 시크릿 + */ +@ConfigurationPropertiesBinding +@ConfigurationProperties("kafka") +class KafkaProperty( + val serverAddress: String, + val confluentApiKey: String, + val confluentApiSecret: String +) diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaTopics.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaTopics.kt new file mode 100644 index 0000000..1e138a7 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaTopics.kt @@ -0,0 +1,42 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.configuration + +/** + * Kafka 토픽명을 관리하는 상수 객체입니다. + * + * 마이크로서비스 간 이벤트 통신에 사용되는 토픽명들을 중앙에서 관리하여 + * 토픽명 변경 시 일관성을 보장합니다. + */ +object KafkaTopics { + /** + * 사용자 삭제 이벤트 토픽 + * 사용자가 탈퇴했을 때 다른 서비스에 알리기 위해 사용 + */ + const val DELETE_USER = "delete-user" + + /** + * 전체 테이블 삭제 이벤트 토픽 + * 관리자가 전체 데이터를 초기화할 때 다른 서비스에 알리기 위해 사용 + */ + const val DELETE_ALL_TABLE = "delete-all-table" + + /** + * 원서 생성 이벤트 토픽 + * 원서 서비스에서 원서가 생성되었을 때 사용자 접수번호 업데이트를 위해 사용 + */ + const val CREATE_APPLICATION = "create-application" + + // Choreography 이벤트들 + + /** + * 사용자 접수번호 업데이트 완료 이벤트 토픽 + * 사용자 서비스에서 접수번호 업데이트가 성공적으로 완료되었음을 알리는 이벤트 + */ + const val USER_RECEIPT_CODE_UPDATE_COMPLETED = "user-receipt-code-update-completed" + + /** + * 사용자 접수번호 업데이트 실패 이벤트 토픽 + * 사용자 서비스에서 접수번호 업데이트가 실패했음을 알리는 이벤트 + * 원서 서비스에서 이 이벤트를 수신하면 보상 트랜잭션을 수행함 + */ + const val USER_RECEIPT_CODE_UPDATE_FAILED = "user-receipt-code-update-failed" +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/CreateApplicationConsumer.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/CreateApplicationConsumer.kt new file mode 100644 index 0000000..f88d0d3 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/CreateApplicationConsumer.kt @@ -0,0 +1,25 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.consumer + +import com.fasterxml.jackson.databind.ObjectMapper +import hs.kr.entrydsm.user.domain.user.application.port.`in`.ChangeReceiptCodeUseCase +import hs.kr.entrydsm.user.infrastructure.kafka.configuration.KafkaTopics +import hs.kr.entrydsm.user.infrastructure.kafka.consumer.dto.CreateApplicationEvent +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.stereotype.Service + +@Service +class CreateApplicationConsumer( + private val changeReceiptCodeUseCase: ChangeReceiptCodeUseCase, + private val mapper: ObjectMapper +) { + @KafkaListener( + topics = [KafkaTopics.CREATE_APPLICATION], + groupId = "change-user-receipt-code-consumer", + containerFactory = "kafkaListenerContainerFactory" + ) + fun execute(message: String) { + val createApplicationEvent = mapper.readValue(message, CreateApplicationEvent::class.java) + changeReceiptCodeUseCase.changeReceiptCode(createApplicationEvent.userId, createApplicationEvent.receiptCode) + } + +} \ No newline at end of file diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/DeleteUserTableConsumer.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/DeleteUserTableConsumer.kt new file mode 100644 index 0000000..a0ca83b --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/DeleteUserTableConsumer.kt @@ -0,0 +1,34 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.consumer + +import hs.kr.entrydsm.user.domain.user.application.port.out.DeleteUserPort +import hs.kr.entrydsm.user.infrastructure.kafka.configuration.KafkaTopics +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.transaction.annotation.Transactional + +/** + * 전체 테이블 삭제 이벤트를 수신하여 사용자 테이블을 삭제하는 Consumer 클래스입니다. + * + * 관리자가 전체 데이터 초기화를 요청했을 때 사용자 테이블의 모든 데이터를 + * 삭제하는 역할을 담당합니다. + * + * @property deleteUserPort 사용자 삭제 작업을 수행하는 포트 + */ +open class DeleteUserTableConsumer( + private val deleteUserPort: DeleteUserPort +) { + + /** + * 전체 테이블 삭제 이벤트를 수신하여 모든 사용자 데이터를 삭제합니다. + * + * DELETE_ALL_TABLE 토픽에서 이벤트를 수신하고, + * 트랜잭션 내에서 모든 사용자 데이터를 삭제합니다. + */ + @KafkaListener( + topics = [KafkaTopics.DELETE_ALL_TABLE], + groupId = "delete-all-table-user", + containerFactory = "kafkaListenerContainerFactory" + ) + @Transactional + open fun execute() = deleteUserPort.deleteAll() + +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt new file mode 100644 index 0000000..2e756f5 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt @@ -0,0 +1,17 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.consumer.dto + +import java.util.UUID + +/** + * 원서 생성 이벤트 데이터를 담는 DTO 클래스입니다. + * + * 원서 서비스에서 원서가 생성되었을 때 사용자 서비스에 전달되는 + * 이벤트 데이터를 정의합니다. 사용자의 접수번호 업데이트를 위해 사용됩니다. + * + * @property receiptCode 생성된 원서의 접수번호 + * @property userId 원서를 생성한 사용자의 고유 식별자 + */ +data class CreateApplicationEvent( + val receiptCode: Long, + val userId: UUID +) diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/UserReceiptCodeUpdateCompletedEvent.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/UserReceiptCodeUpdateCompletedEvent.kt new file mode 100644 index 0000000..f4d37e0 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/UserReceiptCodeUpdateCompletedEvent.kt @@ -0,0 +1,17 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.consumer.dto + +import java.util.UUID + +/** + * 사용자 접수번호 업데이트 완료 이벤트 데이터를 담는 DTO 클래스입니다. + * + * 사용자 서비스에서 접수번호 업데이트가 성공적으로 완료되었을 때 + * 원서 서비스에 전달하는 이벤트 데이터를 정의합니다. + * + * @property receiptCode 업데이트된 접수번호 + * @property userId 접수번호가 업데이트된 사용자의 고유 식별자 + */ +data class UserReceiptCodeUpdateCompletedEvent( + val receiptCode: Long, + val userId: UUID +) diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/UserReceiptCodeUpdateFailedEvent.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/UserReceiptCodeUpdateFailedEvent.kt new file mode 100644 index 0000000..052a84f --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/consumer/dto/UserReceiptCodeUpdateFailedEvent.kt @@ -0,0 +1,20 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.consumer.dto + +import java.util.UUID + +/** + * 사용자 접수번호 업데이트 실패 이벤트 데이터를 담는 DTO 클래스입니다. + * + * 사용자 서비스에서 접수번호 업데이트가 실패했을 때 원서 서비스에 전달하는 + * 이벤트 데이터를 정의합니다. 원서 서비스에서는 이 이벤트를 수신하여 + * 보상 트랜잭션으로 해당 원서를 삭제합니다. + * + * @property receiptCode 업데이트 실패한 접수번호 + * @property userId 접수번호 업데이트가 실패한 사용자의 고유 식별자 + * @property reason 업데이트 실패 사유 + */ +data class UserReceiptCodeUpdateFailedEvent( + val receiptCode: Long, + val userId: UUID, + val reason: String +) diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/DeleteAllTableProducerImpl.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/DeleteAllTableProducerImpl.kt new file mode 100644 index 0000000..1c9f002 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/DeleteAllTableProducerImpl.kt @@ -0,0 +1,30 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.producer + +import hs.kr.entrydsm.user.infrastructure.kafka.configuration.KafkaTopics +import org.springframework.context.annotation.Profile +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component + +/** + * 전체 테이블 삭제 이벤트를 발행하는 Producer 구현체입니다. + * + * 관리자가 전체 데이터를 초기화할 때 다른 마이크로서비스에 알려 + * 모든 관련 데이터를 함께 삭제할 수 있도록 합니다. + * + * @property kafkaTemplate 전체 테이블 삭제 이벤트 발행용 KafkaTemplate + */ +@Component +class DeleteAllTableProducerImpl( + private val kafkaTemplate: KafkaTemplate, +) : DeleteAllTableProducer { + + /** + * 전체 테이블 삭제 이벤트를 Kafka로 발행합니다. + * + * DELETE_ALL_TABLE 토픽에 Unit 값을 전송하여 다른 서비스에서 + * 전체 데이터 삭제를 수행하도록 합니다. + */ + override fun send() { + kafkaTemplate.send(KafkaTopics.DELETE_ALL_TABLE, Unit) + } +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/DeleteUserProducerImpl.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/DeleteUserProducerImpl.kt new file mode 100644 index 0000000..19a8686 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/DeleteUserProducerImpl.kt @@ -0,0 +1,31 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.producer + +import hs.kr.entrydsm.user.infrastructure.kafka.configuration.KafkaTopics +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component + +/** + * 사용자 삭제 이벤트를 발행하는 Producer 구현체입니다. + * + * 사용자 탈퇴 시 해당 사용자의 접수번호를 다른 마이크로서비스에 알려 + * 연관된 데이터를 함께 삭제할 수 있도록 합니다. + * + * @property kafkaTemplate 사용자 삭제 이벤트 발행용 KafkaTemplate + */ +@Component +class DeleteUserProducerImpl( + private val kafkaTemplate: KafkaTemplate +) : DeleteUserProducer { + + /** + * 사용자 삭제 이벤트를 Kafka로 발행합니다. + * + * DELETE_USER 토픽에 접수번호를 전송하여 다른 서비스에서 + * 해당 접수번호와 연관된 데이터를 삭제하도록 합니다. + * + * @param receiptCode 삭제된 사용자의 접수번호 + */ + override fun send(receiptCode: Long) { + kafkaTemplate.send(KafkaTopics.DELETE_USER, receiptCode) + } +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/MockDeleteAllTableProducer.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/MockDeleteAllTableProducer.kt deleted file mode 100644 index 0ebc1b9..0000000 --- a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/MockDeleteAllTableProducer.kt +++ /dev/null @@ -1,16 +0,0 @@ -package hs.kr.entrydsm.user.infrastructure.kafka.producer - -import org.springframework.context.annotation.Profile -import org.springframework.stereotype.Component - -/** - * Kafka가 도입되기 전까지 사용할 임시 Mock Producer입니다. - */ -@Component -@Profile("!kafka") -class MockDeleteAllTableProducer : DeleteAllTableProducer { - override fun send() { - // TODO: Kafka 도입 후 실제 구현체로 교체 - println("Mock: DeleteAllTable event sent (Kafka not available)") - } -} \ No newline at end of file diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/MockDeleteUserProducer.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/MockDeleteUserProducer.kt deleted file mode 100644 index 23072b5..0000000 --- a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/MockDeleteUserProducer.kt +++ /dev/null @@ -1,16 +0,0 @@ -package hs.kr.entrydsm.user.infrastructure.kafka.producer - -import org.springframework.context.annotation.Profile -import org.springframework.stereotype.Component - -/** - * Kafka가 도입되기 전까지 사용할 임시 Mock Producer입니다. - */ -@Component -@Profile("!kafka") -class MockDeleteUserProducer : DeleteUserProducer { - override fun send(receiptCode: Long) { - // TODO: Kafka 도입 후 실제 구현체로 교체 - println("Mock: DeleteUser event sent for receiptCode: $receiptCode (Kafka not available)") - } -} \ No newline at end of file diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/UserEventProducer.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/UserEventProducer.kt new file mode 100644 index 0000000..f736b03 --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/UserEventProducer.kt @@ -0,0 +1,29 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.producer + +import java.util.UUID + +/** + * 사용자 이벤트를 발행하는 Producer 인터페이스입니다. + * + * 접수번호 업데이트 성공/실패 이벤트를 발행하여 + * Choreography 패턴 기반의 분산 트랜잭션을 지원합니다. + */ +interface UserEventProducer { + + /** + * 접수번호 업데이트 완료 이벤트를 발행합니다. + * + * @param receiptCode 업데이트된 접수번호 + * @param userId 사용자 ID + */ + fun sendReceiptCodeUpdateCompleted(receiptCode: Long, userId: UUID) + + /** + * 접수번호 업데이트 실패 이벤트를 발행합니다. + * + * @param receiptCode 업데이트 실패한 접수번호 + * @param userId 사용자 ID + * @param reason 실패 사유 + */ + fun sendReceiptCodeUpdateFailed(receiptCode: Long, userId: UUID, reason: String) +} diff --git a/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/UserEventProducerImpl.kt b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/UserEventProducerImpl.kt new file mode 100644 index 0000000..f62f1da --- /dev/null +++ b/casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/producer/UserEventProducerImpl.kt @@ -0,0 +1,52 @@ +package hs.kr.entrydsm.user.infrastructure.kafka.producer + +import hs.kr.entrydsm.user.infrastructure.kafka.configuration.KafkaTopics +import hs.kr.entrydsm.user.infrastructure.kafka.consumer.dto.UserReceiptCodeUpdateCompletedEvent +import hs.kr.entrydsm.user.infrastructure.kafka.consumer.dto.UserReceiptCodeUpdateFailedEvent +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Service +import java.util.UUID + +/** + * 사용자 이벤트를 발행하는 Producer 구현체입니다. + * + * 접수번호 업데이트 성공/실패 결과를 다른 마이크로서비스에 알려 + * Choreography 패턴 기반의 분산 트랜잭션을 처리할 수 있도록 합니다. + * + * @property userEventKafkaTemplate 사용자 이벤트 발행용 KafkaTemplate + */ +@Service +class UserEventProducerImpl( + private val userEventKafkaTemplate: KafkaTemplate, +): UserEventProducer { + + /** + * 접수번호 업데이트 완료 이벤트를 발행합니다. + * + * 원서 서비스에서 요청한 접수번호 업데이트가 성공적으로 완료되었음을 + * 알리는 이벤트를 발행합니다. + * + * @param receiptCode 업데이트된 접수번호 + * @param userId 사용자 ID + */ + override fun sendReceiptCodeUpdateCompleted(receiptCode: Long, userId: UUID) { + val event = UserReceiptCodeUpdateCompletedEvent(receiptCode, userId) + userEventKafkaTemplate.send(KafkaTopics.USER_RECEIPT_CODE_UPDATE_COMPLETED, event) + } + + /** + * 접수번호 업데이트 실패 이벤트를 발행합니다. + * + * 원서 서비스에서 요청한 접수번호 업데이트가 실패했음을 알려 + * 원서 서비스에서 보상 트랜잭션을 수행하도록 합니다. + * + * @param receiptCode 업데이트 실패한 접수번호 + * @param userId 사용자 ID + * @param reason 실패 사유 + */ + override fun sendReceiptCodeUpdateFailed(receiptCode: Long, userId: UUID, reason: String) { + val event = UserReceiptCodeUpdateFailedEvent(receiptCode, userId, reason) + userEventKafkaTemplate.send(KafkaTopics.USER_RECEIPT_CODE_UPDATE_FAILED, event) + } + +}