diff --git a/build.gradle b/build.gradle index b6fe6761..83107669 100644 --- a/build.gradle +++ b/build.gradle @@ -81,6 +81,9 @@ dependencies { // Redis implementation 'org.springframework.boot:spring-boot-starter-data-redis' + + // Guava (동시성 제어용) + implementation 'com.google.guava:guava:33.4.0-jre' } tasks.named('test') { diff --git a/src/main/java/org/devkor/apu/saerok_server/domain/notification/application/NotificationBatchService.java b/src/main/java/org/devkor/apu/saerok_server/domain/notification/application/NotificationBatchService.java index 355eda93..103d120a 100644 --- a/src/main/java/org/devkor/apu/saerok_server/domain/notification/application/NotificationBatchService.java +++ b/src/main/java/org/devkor/apu/saerok_server/domain/notification/application/NotificationBatchService.java @@ -1,5 +1,6 @@ package org.devkor.apu.saerok_server.domain.notification.application; +import com.google.common.util.concurrent.Striped; import lombok.RequiredArgsConstructor; import org.devkor.apu.saerok_server.domain.notification.application.model.batch.*; import org.devkor.apu.saerok_server.domain.notification.application.model.payload.ActionNotificationPayload; @@ -9,6 +10,7 @@ import java.util.List; import java.util.Optional; +import java.util.concurrent.locks.Lock; /** * 알림 배치 관리 서비스. @@ -20,6 +22,7 @@ public class NotificationBatchService { private final NotificationBatchStore batchStore; private final NotificationBatchConfig batchConfig; + private final Striped stripedLocks = Striped.lock(256); /** * 배치에 알림 추가. @@ -40,7 +43,9 @@ public BatchResult addToBatch(ActionNotificationPayload payload) { BatchActor actor = BatchActor.of(payload.actorId(), payload.actorName()); - synchronized (this.getLockKey(key)) { + Lock lock = stripedLocks.get(key.toRedisKey()); + lock.lock(); + try { Optional existingBatch = batchStore.findBatch(key); if (existingBatch.isPresent()) { @@ -65,6 +70,8 @@ public BatchResult addToBatch(ActionNotificationPayload payload) { return BatchResult.created(newBatch); } + } finally { + lock.unlock(); } } @@ -75,12 +82,4 @@ public List findExpiredBatches() { public void deleteBatch(BatchKey key) { batchStore.deleteBatch(key); } - - /** - * 동시성 제어를 위한 락 키 생성. - * 같은 배치 키에 대한 동시 접근을 막기 위해 String 인터닝 활용. - */ - private String getLockKey(BatchKey key) { - return key.toRedisKey().intern(); - } } diff --git a/src/main/java/org/devkor/apu/saerok_server/domain/notification/infra/redis/RedisNotificationBatchStore.java b/src/main/java/org/devkor/apu/saerok_server/domain/notification/infra/redis/RedisNotificationBatchStore.java index 70fec9bb..2f2c6450 100644 --- a/src/main/java/org/devkor/apu/saerok_server/domain/notification/infra/redis/RedisNotificationBatchStore.java +++ b/src/main/java/org/devkor/apu/saerok_server/domain/notification/infra/redis/RedisNotificationBatchStore.java @@ -8,15 +8,15 @@ import org.devkor.apu.saerok_server.domain.notification.application.model.batch.NotificationBatch; import org.devkor.apu.saerok_server.domain.notification.application.store.NotificationBatchStore; import org.devkor.apu.saerok_server.global.core.config.feature.NotificationBatchConfig; -import org.springframework.data.redis.core.Cursor; -import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.time.Duration; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Set; /** * Redis 기반 알림 배치 저장소 구현체. @@ -26,8 +26,13 @@ @RequiredArgsConstructor public class RedisNotificationBatchStore implements NotificationBatchStore { - private static final String KEY_PREFIX = "notification:batch:"; - private static final String KEY_PATTERN = KEY_PREFIX + "*"; + /** + * 만료 시간 인덱스용 Sorted Set. + * score: 만료 시간 타임스탬프 (밀리초) + * member: 배치 데이터 Redis 키 + */ + private static final String EXPIRY_INDEX = "notification:batch:expiry_index"; + private static final ZoneId KST = ZoneId.of("Asia/Seoul"); private final StringRedisTemplate redisTemplate; private final ObjectMapper objectMapper; @@ -62,6 +67,13 @@ public void saveBatch(NotificationBatch batch) { redisTemplate.opsForValue().set(redisKey, json, Duration.ofSeconds(batchConfig.getTtlSeconds())); + long expiryTimestamp = batch.getExpiresAt() + .atZone(KST) + .toInstant() + .toEpochMilli(); + + redisTemplate.opsForZSet().add(EXPIRY_INDEX, redisKey, expiryTimestamp); + } catch (JsonProcessingException e) { log.error("Redis에서 배치 데이터 직렬화에 실패했습니다: {}", batch.getKey(), e); throw new IllegalStateException("Redis에 배치 저장하는 것에 실패했습니다", e); @@ -72,24 +84,33 @@ public void saveBatch(NotificationBatch batch) { public void deleteBatch(BatchKey key) { String redisKey = key.toRedisKey(); redisTemplate.delete(redisKey); + redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey); } + /** + * Sorted Set을 사용한 만료 배치 조회. + */ @Override public List findExpiredBatches() { List expiredBatches = new ArrayList<>(); - ScanOptions scanOptions = ScanOptions.scanOptions() - .match(KEY_PATTERN) - .count(100) - .build(); + try { + long now = System.currentTimeMillis(); + + // Sorted Set에서 score가 현재 시간 이하인 키들만 조회 + Set expiredKeys = redisTemplate.opsForZSet() + .rangeByScore(EXPIRY_INDEX, 0, now, 0, batchConfig.getMaxBatchesPerTick()); + + if (expiredKeys == null || expiredKeys.isEmpty()) { + return expiredBatches; + } - try (Cursor cursor = redisTemplate.scan(scanOptions)) { - while (cursor.hasNext()) { - String redisKey = cursor.next(); + // 만료된 키들의 데이터 조회 + for (String redisKey : expiredKeys) { String json = redisTemplate.opsForValue().get(redisKey); if (json == null) { - // 키가 스캔 후 만료되었을 수 있음 + redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey); continue; } @@ -99,14 +120,16 @@ public List findExpiredBatches() { if (batch.isExpired()) { expiredBatches.add(batch); + redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey); } } catch (JsonProcessingException e) { log.error("Redis 키 역직렬화에 실패했습니다: {}", redisKey, e); + redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey); } } } catch (Exception e) { - log.error("만료된 배치 스캔에 실패했습니다.", e); + log.error("만료된 배치 조회에 실패했습니다.", e); } return expiredBatches; diff --git a/src/main/java/org/devkor/apu/saerok_server/global/core/config/feature/NotificationBatchConfig.java b/src/main/java/org/devkor/apu/saerok_server/global/core/config/feature/NotificationBatchConfig.java index ea27a96a..4fdc012c 100644 --- a/src/main/java/org/devkor/apu/saerok_server/global/core/config/feature/NotificationBatchConfig.java +++ b/src/main/java/org/devkor/apu/saerok_server/global/core/config/feature/NotificationBatchConfig.java @@ -19,6 +19,7 @@ public class NotificationBatchConfig { private int initialWindowSeconds = 30; private int maxWindowSeconds = 60; private int ttlSeconds = 90; + private int maxBatchesPerTick = 300; // 한 번의 스케줄러 틱에서 처리할 최대 배치 수 @PostConstruct void validateConfig() { @@ -40,5 +41,8 @@ void validateConfig() { ttlSeconds, maxWindowSeconds) ); } + if (maxBatchesPerTick <= 0) { + throw new IllegalStateException("notification-batch.max-batches-per-tick은 양수여야합니다"); + } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 278d68f6..1af059ae 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -44,4 +44,5 @@ notification-batch: enabled: true initial-window-seconds: 30 max-window-seconds: 60 - ttl-seconds: 90 \ No newline at end of file + ttl-seconds: 90 + max-batches-per-tick: 300 \ No newline at end of file