Merged
Changes from all commits (20 commits)
640fd30
feat (#33): add kafkaProducerConfig
qkrwndnjs1075 Sep 3, 2025
34297ff
feat (#33): add Kafka dependency
qkrwndnjs1075 Sep 3, 2025
bc61ac3
chore (#33): remove the existing mock-injected Producer class
qkrwndnjs1075 Sep 3, 2025
b0e4cab
feat (#33): add deleteAll
qkrwndnjs1075 Sep 3, 2025
b2a419c
feat (#33): add consumer config
qkrwndnjs1075 Sep 3, 2025
3aca7b3
feat (#33): add kafkaProperty
qkrwndnjs1075 Sep 3, 2025
1dcada7
feat (#33): add kafkaTopic
qkrwndnjs1075 Sep 3, 2025
0cd4320
feat (#33): add consumer
qkrwndnjs1075 Sep 3, 2025
996e4b6
refactor (#33): replace the mock-injected Producer
qkrwndnjs1075 Sep 3, 2025
f146848
feat (#33): add DeleteAllTableProducer implementation
qkrwndnjs1075 Sep 3, 2025
b2797de
feat (#33): add CreateApplicationEvent
qkrwndnjs1075 Sep 3, 2025
264a048
feat (#33): add saga pattern events
qkrwndnjs1075 Sep 3, 2025
a89992b
refactor (#33): align with the saga pattern
qkrwndnjs1075 Sep 3, 2025
9ab1874
feat (#33): add config
qkrwndnjs1075 Sep 3, 2025
d43a292
feat (#33): add topics
qkrwndnjs1075 Sep 3, 2025
999fcad
feat (#33): add userEventProducer
qkrwndnjs1075 Sep 3, 2025
21c5394
feat (#33): add producerImpl
qkrwndnjs1075 Sep 3, 2025
3e7c3e3
chore (#33): rename class
qkrwndnjs1075 Sep 3, 2025
ca54f72
feat (#33): send success event only after transaction commit
qkrwndnjs1075 Sep 3, 2025
1b14e14
feat (#33): write KDoc
qkrwndnjs1075 Sep 3, 2025
4 changes: 3 additions & 1 deletion buildSrc/src/main/kotlin/Dependencies.kt
@@ -50,5 +50,7 @@ object Dependencies {
const val SENTRY_SPRING_BOOT_STARTER = "io.sentry:sentry-spring-boot-starter-jakarta:${DependencyVersion.SENTRY}"

// Spring Cloud Config
const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config"
const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter:2024.0.2"

🛠️ Refactor suggestion

The spring-cloud-starter-config → spring-cloud-starter change is most likely a typo

To use the Config client, spring-cloud-starter-config is the correct artifact. spring-cloud-starter is not the starter that provides this functionality, and the version is safer left to the spring-cloud-dependencies BOM. Reverting is recommended.

-    const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter:2024.0.2"
+    // Managed via spring-cloud-dependencies BOM
+    const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config"
📝 Committable suggestion

Suggested change
const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter:2024.0.2"
// Managed via spring-cloud-dependencies BOM
const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config"
🤖 Prompt for AI Agents
In buildSrc/src/main/kotlin/Dependencies.kt around line 53, the constant is
incorrectly set to "spring-cloud-starter" with an explicit version; change it
back to "org.springframework.cloud:spring-cloud-starter-config" and remove the
hard-coded version string so the artifact/version is managed by the
spring-cloud-dependencies BOM instead (or if BOM not yet imported, import the
BOM and rely on it for the version); update the constant value accordingly and
ensure the build uses the BOM for Spring Cloud versions.


const val KAFKA = "org.springframework.kafka:spring-kafka"
}
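
A minimal sketch of the BOM-managed approach in casper-user/build.gradle.kts, assuming the io.spring.dependency-management plugin already applied in this module; the release-train version 2024.0.2 is taken from the diff above and is illustrative only:

dependencyManagement {
    imports {
        // the Spring Cloud release-train BOM pins the version of spring-cloud-starter-config
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:2024.0.2")
    }
}

dependencies {
    // no explicit version needed once the BOM is imported
    implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)
}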
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Plugin.kt
@@ -1,6 +1,7 @@
object Plugin {
const val KOTLIN_JVM = "org.jetbrains.kotlin.jvm"
const val KOTLIN_SPRING = "org.jetbrains.kotlin.plugin.spring"
const val KOTLIN_JPA = "org.jetbrains.kotlin.plugin.jpa"
const val KOTLIN_KAPT = "org.jetbrains.kotlin.kapt"
const val SPRING_BOOT = "org.springframework.boot"
const val SPRING_DEPENDENCY_MANAGEMENT = "io.spring.dependency-management"
6 changes: 5 additions & 1 deletion casper-user/build.gradle.kts
@@ -4,6 +4,7 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id(Plugin.KOTLIN_JVM) version PluginVersion.KOTLIN_VERSION
id(Plugin.KOTLIN_SPRING) version PluginVersion.KOTLIN_VERSION
id(Plugin.KOTLIN_JPA) version PluginVersion.KOTLIN_VERSION
id(Plugin.KOTLIN_KAPT)
id(Plugin.SPRING_BOOT) version PluginVersion.SPRING_BOOT_VERSION
id(Plugin.SPRING_DEPENDENCY_MANAGEMENT) version PluginVersion.SPRING_DEPENDENCY_MANAGEMENT_VERSION
@@ -78,8 +79,11 @@ dependencies {
// Sentry
implementation(Dependencies.SENTRY_SPRING_BOOT_STARTER)

//kafka
implementation(Dependencies.KAFKA)

// Spring Cloud Config
implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)
//implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)
}

protobuf {
UserPersistenceAdapter.kt
@@ -81,6 +81,14 @@ class UserPersistenceAdapter(
userRepository.deleteById(userId)
}

/**
* Deletes all users.
* Used only when an administrator resets all data.
*/
override fun deleteAll() {
userRepository.deleteAll()
}

/**
* Retrieves withdrawn users older than the specified number of days.
*
casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/port/out/DeleteUserPort.kt
@@ -22,4 +22,10 @@ interface DeleteUserPort {
* @return list of users subject to deletion
*/
fun findWithdrawnUsersOlderThan(days: Long): List<User>

/**
* Deletes all users.
* Used only when an administrator resets all data.
*/
fun deleteAll()
Comment on lines +26 to +30

🛠️ Refactor suggestion

Handle bulk deletion with a batch/DDL operation instead of JPA deleteAll.

deleteAll loads entities and deletes them one by one, which carries a heavy performance and locking cost. Implementing it with deleteAllInBatch/TRUNCATE (after reviewing FK safety) and exposing the deleted row count as a return value is also better for operations.

-interface DeleteUserPort {
+interface DeleteUserPort {
@@
-    fun deleteAll()
+    fun deleteAll(): Long
}
📝 Committable suggestion

Suggested change
/**
* Deletes all users.
* Used only when an administrator resets all data.
*/
fun deleteAll()
interface DeleteUserPort {
/**
* Deletes all users.
* Used only when an administrator resets all data.
*/
fun deleteAll(): Long
}
🤖 Prompt for AI Agents
In
casper-user/src/main/kotlin/hs/kr/entrydsm/user/domain/user/application/port/out/DeleteUserPort.kt
around lines 26 to 30, the deleteAll() API currently suggests deleting all users
via JPA entity deletes which is inefficient and causes locking; change the
contract to perform a bulk/DDL delete and return the number of deleted rows
(e.g., fun deleteAll(): Long), and implement it using
repository.deleteAllInBatch() or a native TRUNCATE (after reviewing FK
constraints and disabling/clearing dependent data as needed); ensure the
implementation avoids loading entities, executes in a single bulk operation, and
the port returns the deleted count for operational visibility.

}
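
If the port is changed to return the deleted count, the adapter implementation could look roughly like the sketch below, assuming UserRepository extends JpaRepository; deleteAllInBatch() issues a single bulk DELETE without loading entities:

override fun deleteAll(): Long {
    // count first so callers get visibility into how many rows were removed
    val deleted = userRepository.count()
    // single bulk DELETE statement; no entity loading or per-row lifecycle callbacks
    userRepository.deleteAllInBatch()
    return deleted
}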
ChangeReceiptCodeService.kt
@@ -4,8 +4,12 @@ import hs.kr.entrydsm.user.domain.user.application.port.`in`.ChangeReceiptCodeUs
import hs.kr.entrydsm.user.domain.user.application.port.out.QueryUserPort
import hs.kr.entrydsm.user.domain.user.application.port.out.SaveUserPort
import hs.kr.entrydsm.user.domain.user.exception.UserNotFoundException
import hs.kr.entrydsm.user.infrastructure.kafka.producer.UserEventProducer
import jakarta.transaction.Synchronization
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import org.springframework.transaction.support.TransactionSynchronization
import org.springframework.transaction.support.TransactionSynchronizationManager
import java.util.UUID

/**
@@ -14,12 +18,13 @@ import java.util.UUID
*
* @property queryUserPort port for querying users
* @property saveUserPort port for saving users
* @property userEventProducer publisher for user events
*/
@Transactional
@Service
class ChangeReceiptCodeService(
private val queryUserPort: QueryUserPort,
private val saveUserPort: SaveUserPort,
private val userEventProducer: UserEventProducer
) : ChangeReceiptCodeUseCase {
/**
* Changes the user's receipt code.
@@ -28,12 +33,43 @@ class ChangeReceiptCodeService(
* @param receiptCode the new receipt code
* @throws UserNotFoundException if the user does not exist
*/
@Transactional
override fun changeReceiptCode(
userId: UUID,
receiptCode: Long,
) {
val user = queryUserPort.findById(userId) ?: throw UserNotFoundException
val updateUser = user.changeReceiptCode(receiptCode)
saveUserPort.save(updateUser)
try {
val user = queryUserPort.findById(userId)

if (user == null) {
userEventProducer.sendReceiptCodeUpdateFailed(
receiptCode = receiptCode,
userId = userId,
reason = "User not found"
)
throw UserNotFoundException
}

val updatedUser = user.copy(receiptCode = receiptCode)
saveUserPort.save(updatedUser)

TransactionSynchronizationManager.registerSynchronization(object : TransactionSynchronization {
override fun afterCommit() {
userEventProducer.sendReceiptCodeUpdateCompleted(receiptCode, userId)
}
})


} catch (e: Exception) {

if (e !is UserNotFoundException) {
userEventProducer.sendReceiptCodeUpdateFailed(
receiptCode = receiptCode,
userId = userId,
reason = e.message ?: "Unknown error"
)
}
throw e // rethrow so the transaction rolls back
}
}
}
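
The PR registers a TransactionSynchronization by hand; an equivalent, more declarative sketch (not part of this PR) publishes a Spring application event inside the transaction and relays it to Kafka only after commit. The event and relay class names below are hypothetical; only UserEventProducer and its call sites come from the PR:

import org.springframework.stereotype.Component
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener
import java.util.UUID

// hypothetical event published by the service method while the transaction is still open
data class ReceiptCodeUpdatedEvent(val userId: UUID, val receiptCode: Long)

@Component
class ReceiptCodeEventRelay(
    private val userEventProducer: UserEventProducer,
) {
    // runs only after the surrounding transaction commits; a rollback never reaches Kafka
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    fun on(event: ReceiptCodeUpdatedEvent) {
        userEventProducer.sendReceiptCodeUpdateCompleted(event.receiptCode, event.userId)
    }
}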
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt
@@ -0,0 +1,69 @@
package hs.kr.entrydsm.user.infrastructure.kafka.configuration

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

/**
* Configuration class responsible for Kafka consumer settings.
*
* Configures the consumer used to receive application-creation events and
* includes the security settings required to connect to Confluent Cloud.
*
* @property kafkaProperty properties holding the Kafka connection details
*/
@EnableKafka
@Configuration
class KafkaConsumerConfig(
private val kafkaProperty: KafkaProperty
) {

/**
* Creates the Kafka listener container factory.
*
* Sets the concurrency level to 2 to support parallel message processing and
* the poll timeout to 500 ms for reasonable responsiveness.
*
* @return the configured ConcurrentKafkaListenerContainerFactory instance
*/
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
setConcurrency(2)
consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig())
containerProperties.pollTimeout = 500
}
Comment on lines +35 to +40

⚠️ Potential issue

JsonDeserializer is used but the container generics are <String, String>, risking runtime casting issues

If the value is deserialized from JSON, align the container/factory generics to <String, Any> or a concrete DTO.

Aligning them as follows is recommended:

-    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
-        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
+    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
+        return ConcurrentKafkaListenerContainerFactory<String, Any>().apply {
             setConcurrency(2)
-            consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig())
+            consumerFactory = DefaultKafkaConsumerFactory<String, Any>(consumerFactoryConfig())
             containerProperties.pollTimeout = 500
         }
     }

If needed, also set the ack mode explicitly (e.g., RECORD/BATCH).

+import org.springframework.kafka.listener.ContainerProperties
...
+            containerProperties.ackMode = ContainerProperties.AckMode.BATCH
📝 Committable suggestion

Suggested change
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
setConcurrency(2)
consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig())
containerProperties.pollTimeout = 500
}
import org.springframework.kafka.listener.ContainerProperties
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
return ConcurrentKafkaListenerContainerFactory<String, Any>().apply {
setConcurrency(2)
consumerFactory = DefaultKafkaConsumerFactory<String, Any>(consumerFactoryConfig())
containerProperties.pollTimeout = 500
containerProperties.ackMode = ContainerProperties.AckMode.BATCH
}
}
🤖 Prompt for AI Agents
In
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt
around lines 19 to 24, the ConcurrentKafkaListenerContainerFactory is declared
with generics <String, String> while the value uses Json deserialization which
risks runtime ClassCastExceptions; change the factory and method signature to
use <String, Any> (or the concrete DTO type you expect) and ensure
DefaultKafkaConsumerFactory is created/configured with a JsonDeserializer for
the value type (and matching generic), adjust any listener method signatures to
match the chosen value type, and explicitly set the container's AckMode (e.g.,
RECORD or BATCH) on containerProperties if required.

}

/**
* Builds the base Kafka consumer configuration.
*
* Includes the SASL security settings and deserialization settings for the
* Confluent Cloud connection, and uses the read_committed isolation level
* for transactional safety.
*
* @return the consumer configuration map
*/
private fun consumerFactoryConfig(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 5000,
JsonDeserializer.TRUSTED_PACKAGES to "*",
Comment on lines +59 to +60

🛠️ Refactor suggestion

max.poll.interval.ms=5000 (5 seconds) is far too low and can cause frequent rebalances

If processing takes more than 5 seconds, the consumer can be dropped from the group. Raise it to around the default (5 minutes, 300000) or externalize it as a property.

-            ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 5000,
+            ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 300_000,
🤖 Prompt for AI Agents
In
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt
around lines 35-36, the consumer property
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG is set to 5000 which is too low and
will cause frequent rebalances; change it to a higher value such as the default
300000 (5 minutes) or, better, externalize it as a configurable property (read
from application properties/env and fall back to 300000) so operators can tune
it without code changes.

⚠️ Potential issue

JsonDeserializer TRUSTED_PACKAGES="*" is a security vulnerability

Restrict the trusted packages to the service's own domain. If needed, list multiple packages separated by commas.

-            JsonDeserializer.TRUSTED_PACKAGES to "*",
+            JsonDeserializer.TRUSTED_PACKAGES to "hs.kr.entrydsm.user.**,hs.kr.entrydsm.common.**",

If multiple event types are consumed, keep the type headers; if a default type must be enforced:

+            JsonDeserializer.VALUE_DEFAULT_TYPE to "hs.kr.entrydsm.user.infrastructure.kafka.event.UserEvent",
📝 Committable suggestion

Suggested change
JsonDeserializer.TRUSTED_PACKAGES to "*",
JsonDeserializer.TRUSTED_PACKAGES to "hs.kr.entrydsm.user.**,hs.kr.entrydsm.common.**",
JsonDeserializer.VALUE_DEFAULT_TYPE to "hs.kr.entrydsm.user.infrastructure.kafka.event.UserEvent",
🤖 Prompt for AI Agents
In
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaConsumerConfig.kt
around line 36, JsonDeserializer.TRUSTED_PACKAGES is set to "*" which is a
security risk; replace the wildcard with a comma-separated list of only your
service/domain packages (e.g. "hs.kr.entrydsm.user" and any other internal
packages) to limit deserialization to trusted types, and if you must support
multiple event types keep the type headers enabled but configure a safe default
type or explicit allowed packages rather than using "*" to prevent arbitrary
class deserialization.

"security.protocol" to "SASL_PLAINTEXT",
"sasl.mechanism" to "SCRAM-SHA-512",
"sasl.jaas.config" to
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"${kafkaProperty.confluentApiKey}\" " +
"password=\"${kafkaProperty.confluentApiSecret}\";"
)
}
}
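
For reference, a listener wired to this factory might look like the sketch below; it assumes the CreateApplicationEvent added in this PR carries a user id and receipt code (the field names here are illustrative) and that the container generics are aligned with the JSON value type as suggested above:

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class CreateApplicationConsumer(
    private val changeReceiptCodeUseCase: ChangeReceiptCodeUseCase,
) {
    @KafkaListener(
        topics = [KafkaTopics.CREATE_APPLICATION],
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun consume(event: CreateApplicationEvent) {
        // hand the payload to the use case that updates the user's receipt code
        changeReceiptCodeUseCase.changeReceiptCode(event.userId, event.receiptCode)
    }
}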
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt
@@ -0,0 +1,105 @@
package hs.kr.entrydsm.user.infrastructure.kafka.configuration

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer

/**
* Configuration class responsible for Kafka producer settings.
*
* Configures separate KafkaTemplate and ProducerFactory beans for user deletion,
* full-table deletion, and user event publishing, and includes the security
* settings required to connect to Confluent Cloud.
*
* @property kafkaProperty properties holding the Kafka connection details
*/
@Configuration
class KafkaProducerConfig(
private val kafkaProperty: KafkaProperty
) {

/**
* Creates the producer factory for full-table deletion events.
*
* @return DefaultKafkaProducerFactory for Unit-typed messages
*/
@Bean
fun deleteAllTableProducerFactory(): DefaultKafkaProducerFactory<String, Unit> {
return DefaultKafkaProducerFactory(producerConfig())
}

/**
* Creates the KafkaTemplate for publishing full-table deletion events.
*
* @return KafkaTemplate for Unit-typed messages
*/
@Bean
fun deleteAllTableKafkaTemplate(): KafkaTemplate<String, Unit> {
return KafkaTemplate(deleteAllTableProducerFactory())
}

/**
* Creates the producer factory for user deletion events.
*
* @return DefaultKafkaProducerFactory for Long-typed messages
*/
@Bean
fun deleteUserProducerFactory(): DefaultKafkaProducerFactory<String, Long> {
return DefaultKafkaProducerFactory(producerConfig())
}

/**
* Creates the KafkaTemplate for publishing user deletion events.
*
* @return KafkaTemplate for Long-typed messages
*/
@Bean
fun deleteUserKafkaTemplate(): KafkaTemplate<String, Long> {
return KafkaTemplate(deleteUserProducerFactory())
}

/**
* Creates the producer factory for user events.
*
* @return DefaultKafkaProducerFactory for Any-typed messages
*/
@Bean
fun userEventProducerFactory(): DefaultKafkaProducerFactory<String, Any> {
return DefaultKafkaProducerFactory(producerConfig())
}

/**
* Creates the KafkaTemplate for publishing user events.
*
* @return KafkaTemplate for Any-typed messages
*/
@Bean
fun userEventKafkaTemplate(): KafkaTemplate<String, Any> {
return KafkaTemplate(userEventProducerFactory())
}

/**
* Builds the base Kafka producer configuration.
*
* Includes the SASL security settings and serialization settings for the Confluent Cloud connection.
*
* @return the producer configuration map
*/
private fun producerConfig(): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
"security.protocol" to "SASL_PLAINTEXT",
"sasl.mechanism" to "SCRAM-SHA-512",
"sasl.jaas.config" to
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"${kafkaProperty.confluentApiKey}\" " +
"password=\"${kafkaProperty.confluentApiSecret}\";"
Comment on lines +97 to +102

💡 Verification agent

🧩 Analysis chain

Security settings (protocol/mechanism) may not match the environment; defaulting to TLS is recommended

For a Confluent environment use the SASL_SSL/PLAIN combination; even with a self-managed SCRAM setup, TLS is recommended. The current configuration transmits in plaintext.

-            "security.protocol" to "SASL_PLAINTEXT",
-            "sasl.mechanism" to "SCRAM-SHA-512",
-            "sasl.jaas.config" to
-                    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
+            "security.protocol" to "SASL_SSL",
+            "sasl.mechanism" to "PLAIN",
+            "sasl.jaas.config" to
+                    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                     "username=\"${kafkaProperty.confluentApiKey}\" " +
                     "password=\"${kafkaProperty.confluentApiSecret}\";"

Also review the truststore settings appropriate for the environment.


🏁 Script executed:

#!/bin/bash
# Inspect the top of KafkaProducerConfig.kt for context
sed -n '1,100p' casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt

Length of output: 2319


KafkaProducerConfig.kt – change the security settings to TLS-based SASL_SSL/PLAIN
The existing SASL_PLAINTEXT + SCRAM-SHA-512 configuration is vulnerable because it transmits in plaintext. In a Confluent Cloud environment, switch to the SASL_SSL protocol with the PLAIN mechanism as shown below, and also review the SSL truststore and hostname verification settings:

- "security.protocol" to "SASL_PLAINTEXT",
- "sasl.mechanism" to "SCRAM-SHA-512",
- "sasl.jaas.config" to
-     "org.apache.kafka.common.security.scram.ScramLoginModule required " +
+ "security.protocol" to "SASL_SSL",
+ "sasl.mechanism" to "PLAIN",
+ "sasl.jaas.config" to
+     "org.apache.kafka.common.security.plain.PlainLoginModule required " +
      "username=\"${kafkaProperty.confluentApiKey}\" " +
      "password=\"${kafkaProperty.confluentApiSecret}\";"
📝 Committable suggestion

Suggested change
"security.protocol" to "SASL_PLAINTEXT",
"sasl.mechanism" to "SCRAM-SHA-512",
"sasl.jaas.config" to
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"${kafkaProperty.confluentApiKey}\" " +
"password=\"${kafkaProperty.confluentApiSecret}\";"
"security.protocol" to "SASL_SSL",
"sasl.mechanism" to "PLAIN",
"sasl.jaas.config" to
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"${kafkaProperty.confluentApiKey}\" " +
"password=\"${kafkaProperty.confluentApiSecret}\";"
🤖 Prompt for AI Agents
In
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt
around lines 52 to 57, change the insecure plaintext SCRAM config to use
TLS-based SASL_SSL with the PLAIN mechanism: set "security.protocol" to
"SASL_SSL", set "sasl.mechanism" to "PLAIN", and update the JAAS config
accordingly to use the PLAIN login module and include the Confluent API
key/secret; also verify SSL truststore and hostname verification settings
(truststore path/password or system properties) are properly configured for
Confluent Cloud.

)
}
Comment on lines +92 to +104

🛠️ Refactor suggestion

Producer durability/correctness defaults need strengthening (acks/idempotence, etc.)

For operational safety, at least acks=all and enable.idempotence=true are recommended. Also tune the batching/compression defaults.

         return mapOf(
             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
             ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
             ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
+            ProducerConfig.ACKS_CONFIG to "all",
+            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+            ProducerConfig.RETRIES_CONFIG to Int.MAX_VALUE,
+            ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG to 120_000,
+            ProducerConfig.LINGER_MS_CONFIG to 10,
+            ProducerConfig.COMPRESSION_TYPE_CONFIG to "zstd",
             "security.protocol" to "SASL_PLAINTEXT",
             "sasl.mechanism" to "SCRAM-SHA-512",

Note: choose COMPRESSION_TYPE from gzip/snappy/zstd according to the infrastructure standard.

📝 Committable suggestion

Suggested change
private fun producerConfig(): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
"security.protocol" to "SASL_PLAINTEXT",
"sasl.mechanism" to "SCRAM-SHA-512",
"sasl.jaas.config" to
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"${kafkaProperty.confluentApiKey}\" " +
"password=\"${kafkaProperty.confluentApiSecret}\";"
)
}
private fun producerConfig(): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
ProducerConfig.ACKS_CONFIG to "all",
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
ProducerConfig.RETRIES_CONFIG to Int.MAX_VALUE,
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG to 120_000,
ProducerConfig.LINGER_MS_CONFIG to 10,
ProducerConfig.COMPRESSION_TYPE_CONFIG to "zstd",
"security.protocol" to "SASL_PLAINTEXT",
"sasl.mechanism" to "SCRAM-SHA-512",
"sasl.jaas.config" to
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"${kafkaProperty.confluentApiKey}\" " +
"password=\"${kafkaProperty.confluentApiSecret}\";"
)
}
🤖 Prompt for AI Agents
In
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProducerConfig.kt
around lines 47 to 59, the producer config needs stronger durability and
correctness defaults: add ProducerConfig.ACKS_CONFIG set to "all",
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG set to true, increase
ProducerConfig.RETRIES_CONFIG to a large value (or Integer.MAX_VALUE), set
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1 (or 5 depending on
throughput vs ordering tradeoff), and add compression via
ProducerConfig.COMPRESSION_TYPE_CONFIG (choose gzip/snappy/zstd per infra
standard); also tune ProducerConfig.LINGER_MS_CONFIG and
ProducerConfig.BATCH_SIZE_CONFIG to sensible defaults for your workload. Ensure
these keys and values are added to the returned map alongside the existing
entries.

}
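
The userEventKafkaTemplate above presumably backs the UserEventProducer called from ChangeReceiptCodeService. A rough sketch of such an implementation, with method signatures inferred from the call sites and the payload shape left as a placeholder map, could be:

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
import java.util.UUID

@Component
class UserEventProducerImpl(
    private val userEventKafkaTemplate: KafkaTemplate<String, Any>,
) : UserEventProducer {
    override fun sendReceiptCodeUpdateCompleted(receiptCode: Long, userId: UUID) {
        // key by user id so events for the same user stay ordered within a partition
        userEventKafkaTemplate.send(
            KafkaTopics.USER_RECEIPT_CODE_UPDATE_COMPLETED,
            userId.toString(),
            mapOf("userId" to userId, "receiptCode" to receiptCode),
        )
    }

    override fun sendReceiptCodeUpdateFailed(receiptCode: Long, userId: UUID, reason: String) {
        userEventKafkaTemplate.send(
            KafkaTopics.USER_RECEIPT_CODE_UPDATE_FAILED,
            userId.toString(),
            mapOf("userId" to userId, "receiptCode" to receiptCode, "reason" to reason),
        )
    }
}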
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaProperty.kt
@@ -0,0 +1,22 @@
package hs.kr.entrydsm.user.infrastructure.kafka.configuration

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConfigurationPropertiesBinding

/**
* Configuration properties class for the Kafka connection.
*
* Binds values from the kafka section of application.yml and manages the
* information needed to connect to the Confluent Cloud Kafka cluster.
*
* @property serverAddress Kafka broker server address
* @property confluentApiKey API key for accessing Confluent Cloud
* @property confluentApiSecret API secret for accessing Confluent Cloud
*/
@ConfigurationPropertiesBinding
@ConfigurationProperties("kafka")
class KafkaProperty(
val serverAddress: String,
val confluentApiKey: String,
val confluentApiSecret: String
)
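
One detail to verify: a constructor-bound @ConfigurationProperties class still has to be registered with the application context. A minimal sketch of one way to do that, assuming it is not already covered by a @ConfigurationPropertiesScan elsewhere (not shown in this PR):

import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Configuration

@Configuration
@EnableConfigurationProperties(KafkaProperty::class)
class KafkaPropertyConfig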
casper-user/src/main/kotlin/hs/kr/entrydsm/user/infrastructure/kafka/configuration/KafkaTopics.kt
@@ -0,0 +1,42 @@
package hs.kr.entrydsm.user.infrastructure.kafka.configuration

/**
* Constant object that manages Kafka topic names.
*
* Centralizes the topic names used for event communication between microservices
* so that renaming a topic stays consistent.
*/
object KafkaTopics {
/**
* User deletion event topic.
* Used to notify other services when a user withdraws.
*/
const val DELETE_USER = "delete-user"

/**
* Full-table deletion event topic.
* Used to notify other services when an administrator resets all data.
*/
const val DELETE_ALL_TABLE = "delete-all-table"

/**
* Application creation event topic.
* Used to update the user's receipt code when an application is created in the application service.
*/
const val CREATE_APPLICATION = "create-application"

// Choreography events

/**
* User receipt code update completed event topic.
* Event announcing that the receipt code update finished successfully in the user service.
*/
const val USER_RECEIPT_CODE_UPDATE_COMPLETED = "user-receipt-code-update-completed"

/**
* User receipt code update failed event topic.
* Event announcing that the receipt code update failed in the user service.
* When the application service receives this event, it runs a compensating transaction.
*/
const val USER_RECEIPT_CODE_UPDATE_FAILED = "user-receipt-code-update-failed"
}