Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions mcp/src/server/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class EventBus {

/**
* Emit an event to all subscribers.
* Assigns a sequence number and immediately notifies subscribers.
* For SQLite-backed stores, use nextSequence() + notify() instead
* to ensure the sequence is persisted before subscribers see it.
*/
emit(
type: AFSEventType,
Expand All @@ -61,6 +64,22 @@ class EventBus {
payload,
};

this.notify(event);
return event;
}

/**
* Get the next unique sequence number.
*/
nextSequence(): number {
return ++globalSequence;
}

/**
* Notify all subscribers of an event.
* The event must already have a valid sequence assigned.
*/
notify(event: AFSEvent): void {
// Notify global subscribers
for (const handler of this.handlers) {
try {
Expand All @@ -71,7 +90,7 @@ class EventBus {
}

// Notify session-specific subscribers
const sessionHandlers = this.sessionHandlers.get(sessionId);
const sessionHandlers = this.sessionHandlers.get(event.sessionId);
if (sessionHandlers) {
for (const handler of sessionHandlers) {
try {
Expand All @@ -81,8 +100,6 @@ class EventBus {
}
}
}

return event;
}

/**
Expand Down Expand Up @@ -188,6 +205,15 @@ class UserEventBus {
payload,
};

this.notifyForUser(userId, event);
return event;
}

/**
* Notify all subscribers for a specific user of an event.
* The event must already have a valid sequence assigned.
*/
notifyForUser(userId: string, event: AFSEvent): void {
// Notify user-specific global subscribers
const userHandlers = this.userHandlers.get(userId);
if (userHandlers) {
Expand All @@ -203,7 +229,7 @@ class UserEventBus {
// Notify user-specific session subscribers
const userSessions = this.userSessionHandlers.get(userId);
if (userSessions) {
const sessionHandlers = userSessions.get(sessionId);
const sessionHandlers = userSessions.get(event.sessionId);
if (sessionHandlers) {
for (const handler of sessionHandlers) {
try {
Expand All @@ -214,8 +240,6 @@ class UserEventBus {
}
}
}

return event;
}

/**
Expand Down
114 changes: 83 additions & 31 deletions mcp/src/server/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
Annotation,
AnnotationStatus,
ThreadMessage,
ActionRequest,
Organization,
User,
UserRole,
Expand Down Expand Up @@ -305,14 +306,43 @@ export function createSQLiteStore(dbPath?: string): AFSStore {
const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString();
stmts.pruneOldEvents.run(cutoff);

function persistEvent(event: AFSEvent): void {
stmts.insertEvent.run({
type: event.type,
timestamp: event.timestamp,
sessionId: event.sessionId,
sequence: event.sequence,
payload: JSON.stringify(event.payload),
});
/**
* Persist an event to SQLite and notify subscribers.
* Assigns a unique sequence number with retry on UNIQUE constraint violation.
* Persists BEFORE notifying so SSE subscribers see the correct sequence.
*/
function emitAndPersist(
type: AFSEventType,
sessionId: string,
payload: Annotation | Session | ThreadMessage | ActionRequest
): void {
const MAX_RETRIES = 5;
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
const event: AFSEvent = {
type,
timestamp: new Date().toISOString(),
sessionId,
sequence: eventBus.nextSequence(),
payload,
};
try {
stmts.insertEvent.run({
type: event.type,
timestamp: event.timestamp,
sessionId: event.sessionId,
sequence: event.sequence,
payload: JSON.stringify(event.payload),
});
eventBus.notify(event);
return;
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
if (message.includes("UNIQUE constraint failed: events.sequence") && attempt < MAX_RETRIES - 1) {
continue;
}
throw err;
}
}
}

return {
Expand All @@ -335,8 +365,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore {
metadata: null,
});

const event = eventBus.emit("session.created", session.id, session);
persistEvent(event);
emitAndPersist("session.created", session.id, session);

return session;
},
Expand Down Expand Up @@ -366,8 +395,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore {
const session = this.getSession(id);
if (session) {
const eventType: AFSEventType = status === "closed" ? "session.closed" : "session.updated";
const event = eventBus.emit(eventType, id, session);
persistEvent(event);
emitAndPersist(eventType, id, session);
}
return session;
},
Expand Down Expand Up @@ -425,8 +453,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore {
authorId: annotation.authorId ?? null,
});

const event = eventBus.emit("annotation.created", sessionId, annotation);
persistEvent(event);
emitAndPersist("annotation.created", sessionId, annotation);

return annotation;
},
Expand Down Expand Up @@ -457,8 +484,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore {

const updated = this.getAnnotation(id);
if (updated && existing.sessionId) {
const event = eventBus.emit("annotation.updated", existing.sessionId, updated);
persistEvent(event);
emitAndPersist("annotation.updated", existing.sessionId, updated);
}
return updated;
},
Expand Down Expand Up @@ -495,8 +521,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore {
const updated = this.updateAnnotation(annotationId, { thread });

if (updated && existing.sessionId) {
const event = eventBus.emit("thread.message", existing.sessionId, message);
persistEvent(event);
emitAndPersist("thread.message", existing.sessionId, message);
}

return updated;
Expand All @@ -519,8 +544,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore {
stmts.deleteAnnotation.run(id);

if (existing.sessionId) {
const event = eventBus.emit("annotation.deleted", existing.sessionId, existing);
persistEvent(event);
emitAndPersist("annotation.deleted", existing.sessionId, existing);
}

return existing;
Expand Down Expand Up @@ -659,15 +683,44 @@ export function createTenantStore(dbPath?: string): TenantStore {
const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString();
tenantStmts.pruneOldEvents.run(cutoff);

function persistEventForUser(event: AFSEvent, userId: string): void {
tenantStmts.insertEvent.run({
type: event.type,
timestamp: event.timestamp,
sessionId: event.sessionId,
sequence: event.sequence,
payload: JSON.stringify(event.payload),
userId,
});
/**
* Persist a user-scoped event to SQLite and notify subscribers.
* Assigns a unique sequence number with retry on UNIQUE constraint violation.
*/
function emitAndPersistForUser(
userId: string,
type: AFSEventType,
sessionId: string,
payload: Annotation | Session | ThreadMessage | ActionRequest
): void {
const MAX_RETRIES = 5;
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
const event: AFSEvent = {
type,
timestamp: new Date().toISOString(),
sessionId,
sequence: eventBus.nextSequence(),
payload,
};
try {
tenantStmts.insertEvent.run({
type: event.type,
timestamp: event.timestamp,
sessionId: event.sessionId,
sequence: event.sequence,
payload: JSON.stringify(event.payload),
userId,
});
eventBus.notify(event);
return;
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
if (message.includes("UNIQUE constraint failed: events.sequence") && attempt < MAX_RETRIES - 1) {
continue;
}
throw err;
}
}
}

return {
Expand Down Expand Up @@ -801,8 +854,7 @@ export function createTenantStore(dbPath?: string): TenantStore {
userId,
});

const event = eventBus.emit("session.created", session.id, session);
persistEventForUser(event, userId);
emitAndPersistForUser(userId, "session.created", session.id, session);

return session;
},
Expand Down