Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 81 additions & 8 deletions mcp/src/server/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,78 @@ function initializeStore(): AFSStore {
// In-Memory Store (fallback)
// -----------------------------------------------------------------------------

function createMemoryStore(): AFSStore {
/** Configuration for memory store event retention */
interface MemoryStoreConfig {
/** Maximum number of events to retain (default: 10000) */
maxEvents: number;
/** Maximum age of events in milliseconds (default: 1 hour) */
maxEventAgeMs: number;
/** How often to run cleanup in milliseconds (default: 5 minutes) */
cleanupIntervalMs: number;
}

/** Default memory store configuration */
const DEFAULT_MEMORY_CONFIG: MemoryStoreConfig = {
maxEvents: parseInt(process.env.AGENTATION_MAX_EVENTS || "10000", 10),
maxEventAgeMs: parseInt(process.env.AGENTATION_EVENT_TTL_MS || String(60 * 60 * 1000), 10), // 1 hour
cleanupIntervalMs: parseInt(process.env.AGENTATION_CLEANUP_INTERVAL_MS || String(5 * 60 * 1000), 10), // 5 minutes
};

function createMemoryStore(config: MemoryStoreConfig = DEFAULT_MEMORY_CONFIG): AFSStore {
const sessions = new Map<string, Session>();
const annotations = new Map<string, Annotation>();
const events: AFSEvent[] = [];
let cleanupTimer: ReturnType<typeof setInterval> | null = null;

/**
* Clean up old events based on count and age limits.
* This prevents unbounded memory growth in long-running processes.
*/
function cleanupEvents(): void {
const now = Date.now();
const cutoffTime = now - config.maxEventAgeMs;
let removedByAge = 0;
let removedByCount = 0;

// Remove events older than maxEventAgeMs
while (events.length > 0) {
const oldestEvent = events[0];
const eventTime = new Date(oldestEvent.timestamp).getTime();
if (eventTime < cutoffTime) {
events.shift();
removedByAge++;
} else {
break;
}
}

// If still over limit, remove oldest events
while (events.length > config.maxEvents) {
events.shift();
removedByCount++;
}

if (removedByAge > 0 || removedByCount > 0) {
process.stderr.write(
`[Store] Event cleanup: removed ${removedByAge} expired, ${removedByCount} over limit, ${events.length} remaining\n`
);
}
}

/**
* Add an event and trigger cleanup if needed.
*/
function addEvent(event: AFSEvent): void {
addEvent(event);

// Quick check: if significantly over limit, trigger immediate cleanup
if (events.length > config.maxEvents * 1.2) {
cleanupEvents();
}
}

// Start periodic cleanup
cleanupTimer = setInterval(cleanupEvents, config.cleanupIntervalMs);

function generateId(): string {
return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
Expand All @@ -85,7 +153,7 @@ function createMemoryStore(): AFSStore {
sessions.set(session.id, session);

const event = eventBus.emit("session.created", session.id, session);
events.push(event);
addEvent(event);

return session;
},
Expand Down Expand Up @@ -117,7 +185,7 @@ function createMemoryStore(): AFSStore {

const eventType = status === "closed" ? "session.closed" : "session.updated";
const event = eventBus.emit(eventType, id, session);
events.push(event);
addEvent(event);

return session;
},
Expand All @@ -144,7 +212,7 @@ function createMemoryStore(): AFSStore {
annotations.set(annotation.id, annotation);

const event = eventBus.emit("annotation.created", sessionId, annotation);
events.push(event);
addEvent(event);

return annotation;
},
Expand All @@ -164,7 +232,7 @@ function createMemoryStore(): AFSStore {

if (annotation.sessionId) {
const event = eventBus.emit("annotation.updated", annotation.sessionId, annotation);
events.push(event);
addEvent(event);
}

return annotation;
Expand All @@ -188,7 +256,7 @@ function createMemoryStore(): AFSStore {

if (annotation.sessionId) {
const event = eventBus.emit("annotation.updated", annotation.sessionId, annotation);
events.push(event);
addEvent(event);
}

return annotation;
Expand Down Expand Up @@ -217,7 +285,7 @@ function createMemoryStore(): AFSStore {

if (annotation.sessionId) {
const event = eventBus.emit("thread.message", annotation.sessionId, message);
events.push(event);
addEvent(event);
}

return annotation;
Expand All @@ -243,7 +311,7 @@ function createMemoryStore(): AFSStore {

if (annotation.sessionId) {
const event = eventBus.emit("annotation.deleted", annotation.sessionId, annotation);
events.push(event);
addEvent(event);
}

return annotation;
Expand All @@ -256,6 +324,11 @@ function createMemoryStore(): AFSStore {
},

close(): void {
// Stop periodic cleanup
if (cleanupTimer) {
clearInterval(cleanupTimer);
cleanupTimer = null;
}
sessions.clear();
annotations.clear();
events.length = 0;
Expand Down