Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ dependencies {

// Redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis'

// Guava (동시성 제어용)
implementation 'com.google.guava:guava:33.4.0-jre'
}

tasks.named('test') {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.devkor.apu.saerok_server.domain.notification.application;

import com.google.common.util.concurrent.Striped;
import lombok.RequiredArgsConstructor;
import org.devkor.apu.saerok_server.domain.notification.application.model.batch.*;
import org.devkor.apu.saerok_server.domain.notification.application.model.payload.ActionNotificationPayload;
Expand All @@ -9,6 +10,7 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Lock;

/**
* 알림 배치 관리 서비스.
Expand All @@ -20,6 +22,7 @@ public class NotificationBatchService {

private final NotificationBatchStore batchStore;
private final NotificationBatchConfig batchConfig;
private final Striped<Lock> stripedLocks = Striped.lock(256);

/**
* 배치에 알림 추가.
Expand All @@ -40,7 +43,9 @@ public BatchResult addToBatch(ActionNotificationPayload payload) {

BatchActor actor = BatchActor.of(payload.actorId(), payload.actorName());

synchronized (this.getLockKey(key)) {
Lock lock = stripedLocks.get(key.toRedisKey());
lock.lock();
try {
Optional<NotificationBatch> existingBatch = batchStore.findBatch(key);

if (existingBatch.isPresent()) {
Expand All @@ -65,6 +70,8 @@ public BatchResult addToBatch(ActionNotificationPayload payload) {

return BatchResult.created(newBatch);
}
} finally {
lock.unlock();
}
}

Expand All @@ -75,12 +82,4 @@ public List<NotificationBatch> findExpiredBatches() {
public void deleteBatch(BatchKey key) {
batchStore.deleteBatch(key);
}

/**
* 동시성 제어를 위한 락 키 생성.
* 같은 배치 키에 대한 동시 접근을 막기 위해 String 인터닝 활용.
*/
private String getLockKey(BatchKey key) {
return key.toRedisKey().intern();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import org.devkor.apu.saerok_server.domain.notification.application.model.batch.NotificationBatch;
import org.devkor.apu.saerok_server.domain.notification.application.store.NotificationBatchStore;
import org.devkor.apu.saerok_server.global.core.config.feature.NotificationBatchConfig;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* Redis 기반 알림 배치 저장소 구현체.
Expand All @@ -26,8 +26,13 @@
@RequiredArgsConstructor
public class RedisNotificationBatchStore implements NotificationBatchStore {

private static final String KEY_PREFIX = "notification:batch:";
private static final String KEY_PATTERN = KEY_PREFIX + "*";
/**
* 만료 시간 인덱스용 Sorted Set.
* score: 만료 시간 타임스탬프 (밀리초)
* member: 배치 데이터 Redis 키
*/
private static final String EXPIRY_INDEX = "notification:batch:expiry_index";
private static final ZoneId KST = ZoneId.of("Asia/Seoul");

private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -62,6 +67,13 @@ public void saveBatch(NotificationBatch batch) {

redisTemplate.opsForValue().set(redisKey, json, Duration.ofSeconds(batchConfig.getTtlSeconds()));

long expiryTimestamp = batch.getExpiresAt()
.atZone(KST)
.toInstant()
.toEpochMilli();

redisTemplate.opsForZSet().add(EXPIRY_INDEX, redisKey, expiryTimestamp);

} catch (JsonProcessingException e) {
log.error("Redis에서 배치 데이터 직렬화에 실패했습니다: {}", batch.getKey(), e);
throw new IllegalStateException("Redis에 배치 저장하는 것에 실패했습니다", e);
Expand All @@ -72,24 +84,33 @@ public void saveBatch(NotificationBatch batch) {
public void deleteBatch(BatchKey key) {
String redisKey = key.toRedisKey();
redisTemplate.delete(redisKey);
redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey);
}

/**
* Sorted Set을 사용한 만료 배치 조회.
*/
@Override
public List<NotificationBatch> findExpiredBatches() {
List<NotificationBatch> expiredBatches = new ArrayList<>();

ScanOptions scanOptions = ScanOptions.scanOptions()
.match(KEY_PATTERN)
.count(100)
.build();
try {
long now = System.currentTimeMillis();

// Sorted Set에서 score가 현재 시간 이하인 키들만 조회
Set<String> expiredKeys = redisTemplate.opsForZSet()
.rangeByScore(EXPIRY_INDEX, 0, now, 0, batchConfig.getMaxBatchesPerTick());

if (expiredKeys == null || expiredKeys.isEmpty()) {
return expiredBatches;
}

try (Cursor<String> cursor = redisTemplate.scan(scanOptions)) {
while (cursor.hasNext()) {
String redisKey = cursor.next();
// 만료된 키들의 데이터 조회
for (String redisKey : expiredKeys) {
String json = redisTemplate.opsForValue().get(redisKey);

if (json == null) {
// 키가 스캔 후 만료되었을 수 있음
redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey);
continue;
}

Expand All @@ -99,14 +120,16 @@ public List<NotificationBatch> findExpiredBatches() {

if (batch.isExpired()) {
expiredBatches.add(batch);
redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey);
}

} catch (JsonProcessingException e) {
log.error("Redis 키 역직렬화에 실패했습니다: {}", redisKey, e);
redisTemplate.opsForZSet().remove(EXPIRY_INDEX, redisKey);
}
}
} catch (Exception e) {
log.error("만료된 배치 스캔에 실패했습니다.", e);
log.error("만료된 배치 조회에 실패했습니다.", e);
}

return expiredBatches;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class NotificationBatchConfig {
private int initialWindowSeconds = 30;
private int maxWindowSeconds = 60;
private int ttlSeconds = 90;
private int maxBatchesPerTick = 300; // 한 번의 스케줄러 틱에서 처리할 최대 배치 수

@PostConstruct
void validateConfig() {
Expand All @@ -40,5 +41,8 @@ void validateConfig() {
ttlSeconds, maxWindowSeconds)
);
}
if (maxBatchesPerTick <= 0) {
throw new IllegalStateException("notification-batch.max-batches-per-tick은 양수여야합니다");
}
}
}
3 changes: 2 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ notification-batch:
enabled: true
initial-window-seconds: 30
max-window-seconds: 60
ttl-seconds: 90
ttl-seconds: 90
max-batches-per-tick: 300