diff --git a/mcp/src/server/events.ts b/mcp/src/server/events.ts index f1b7034b..ed7d013b 100644 --- a/mcp/src/server/events.ts +++ b/mcp/src/server/events.ts @@ -47,6 +47,9 @@ class EventBus { /** * Emit an event to all subscribers. + * Assigns a sequence number and immediately notifies subscribers. + * For SQLite-backed stores, use nextSequence() + notify() instead + * to ensure the sequence is persisted before subscribers see it. */ emit( type: AFSEventType, @@ -61,6 +64,22 @@ class EventBus { payload, }; + this.notify(event); + return event; + } + + /** + * Get the next unique sequence number. + */ + nextSequence(): number { + return ++globalSequence; + } + + /** + * Notify all subscribers of an event. + * The event must already have a valid sequence assigned. + */ + notify(event: AFSEvent): void { // Notify global subscribers for (const handler of this.handlers) { try { @@ -71,7 +90,7 @@ class EventBus { } // Notify session-specific subscribers - const sessionHandlers = this.sessionHandlers.get(sessionId); + const sessionHandlers = this.sessionHandlers.get(event.sessionId); if (sessionHandlers) { for (const handler of sessionHandlers) { try { @@ -81,8 +100,6 @@ class EventBus { } } } - - return event; } /** @@ -188,6 +205,15 @@ class UserEventBus { payload, }; + this.notifyForUser(userId, event); + return event; + } + + /** + * Notify all subscribers for a specific user of an event. + * The event must already have a valid sequence assigned. + */ + notifyForUser(userId: string, event: AFSEvent): void { // Notify user-specific global subscribers const userHandlers = this.userHandlers.get(userId); if (userHandlers) { @@ -203,7 +229,7 @@ class UserEventBus { // Notify user-specific session subscribers const userSessions = this.userSessionHandlers.get(userId); if (userSessions) { - const sessionHandlers = userSessions.get(sessionId); + const sessionHandlers = userSessions.get(event.sessionId); if (sessionHandlers) { for (const handler of sessionHandlers) { try { @@ -214,8 +240,6 @@ class UserEventBus { } } } - - return event; } /** diff --git a/mcp/src/server/sqlite.ts b/mcp/src/server/sqlite.ts index c15ae5f1..4f9950ce 100644 --- a/mcp/src/server/sqlite.ts +++ b/mcp/src/server/sqlite.ts @@ -18,6 +18,7 @@ import type { Annotation, AnnotationStatus, ThreadMessage, + ActionRequest, Organization, User, UserRole, @@ -305,14 +306,43 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString(); stmts.pruneOldEvents.run(cutoff); - function persistEvent(event: AFSEvent): void { - stmts.insertEvent.run({ - type: event.type, - timestamp: event.timestamp, - sessionId: event.sessionId, - sequence: event.sequence, - payload: JSON.stringify(event.payload), - }); + /** + * Persist an event to SQLite and notify subscribers. + * Assigns a unique sequence number with retry on UNIQUE constraint violation. + * Persists BEFORE notifying so SSE subscribers see the correct sequence. + */ + function emitAndPersist( + type: AFSEventType, + sessionId: string, + payload: Annotation | Session | ThreadMessage | ActionRequest + ): void { + const MAX_RETRIES = 5; + for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { + const event: AFSEvent = { + type, + timestamp: new Date().toISOString(), + sessionId, + sequence: eventBus.nextSequence(), + payload, + }; + try { + stmts.insertEvent.run({ + type: event.type, + timestamp: event.timestamp, + sessionId: event.sessionId, + sequence: event.sequence, + payload: JSON.stringify(event.payload), + }); + eventBus.notify(event); + return; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + if (message.includes("UNIQUE constraint failed: events.sequence") && attempt < MAX_RETRIES - 1) { + continue; + } + throw err; + } + } } return { @@ -335,8 +365,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { metadata: null, }); - const event = eventBus.emit("session.created", session.id, session); - persistEvent(event); + emitAndPersist("session.created", session.id, session); return session; }, @@ -366,8 +395,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const session = this.getSession(id); if (session) { const eventType: AFSEventType = status === "closed" ? "session.closed" : "session.updated"; - const event = eventBus.emit(eventType, id, session); - persistEvent(event); + emitAndPersist(eventType, id, session); } return session; }, @@ -425,8 +453,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { authorId: annotation.authorId ?? null, }); - const event = eventBus.emit("annotation.created", sessionId, annotation); - persistEvent(event); + emitAndPersist("annotation.created", sessionId, annotation); return annotation; }, @@ -457,8 +484,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const updated = this.getAnnotation(id); if (updated && existing.sessionId) { - const event = eventBus.emit("annotation.updated", existing.sessionId, updated); - persistEvent(event); + emitAndPersist("annotation.updated", existing.sessionId, updated); } return updated; }, @@ -495,8 +521,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const updated = this.updateAnnotation(annotationId, { thread }); if (updated && existing.sessionId) { - const event = eventBus.emit("thread.message", existing.sessionId, message); - persistEvent(event); + emitAndPersist("thread.message", existing.sessionId, message); } return updated; @@ -519,8 +544,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { stmts.deleteAnnotation.run(id); if (existing.sessionId) { - const event = eventBus.emit("annotation.deleted", existing.sessionId, existing); - persistEvent(event); + emitAndPersist("annotation.deleted", existing.sessionId, existing); } return existing; @@ -659,15 +683,44 @@ export function createTenantStore(dbPath?: string): TenantStore { const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString(); tenantStmts.pruneOldEvents.run(cutoff); - function persistEventForUser(event: AFSEvent, userId: string): void { - tenantStmts.insertEvent.run({ - type: event.type, - timestamp: event.timestamp, - sessionId: event.sessionId, - sequence: event.sequence, - payload: JSON.stringify(event.payload), - userId, - }); + /** + * Persist a user-scoped event to SQLite and notify subscribers. + * Assigns a unique sequence number with retry on UNIQUE constraint violation. + */ + function emitAndPersistForUser( + userId: string, + type: AFSEventType, + sessionId: string, + payload: Annotation | Session | ThreadMessage | ActionRequest + ): void { + const MAX_RETRIES = 5; + for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { + const event: AFSEvent = { + type, + timestamp: new Date().toISOString(), + sessionId, + sequence: eventBus.nextSequence(), + payload, + }; + try { + tenantStmts.insertEvent.run({ + type: event.type, + timestamp: event.timestamp, + sessionId: event.sessionId, + sequence: event.sequence, + payload: JSON.stringify(event.payload), + userId, + }); + eventBus.notify(event); + return; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + if (message.includes("UNIQUE constraint failed: events.sequence") && attempt < MAX_RETRIES - 1) { + continue; + } + throw err; + } + } } return { @@ -801,8 +854,7 @@ export function createTenantStore(dbPath?: string): TenantStore { userId, }); - const event = eventBus.emit("session.created", session.id, session); - persistEventForUser(event, userId); + emitAndPersistForUser(userId, "session.created", session.id, session); return session; },