From 4229afd617714602cbf1cac673e4d8cda42013e8 Mon Sep 17 00:00:00 2001 From: Alex Gorbatchev Date: Tue, 10 Mar 2026 06:59:40 -0700 Subject: [PATCH] Fix UNIQUE constraint failure on events.sequence during concurrent MCP tool calls When multiple MCP tool calls (e.g. dismiss, resolve) are invoked concurrently, the sequence counter could collide because emit() assigned the sequence and notified SSE subscribers before persisting to SQLite. This restructures the flow to persist first (with retry on UNIQUE violation), then notify subscribers, ensuring SSE clients always see the correct, persisted sequence number. Fixes #138 Co-Authored-By: Claude Opus 4.6 --- mcp/src/server/events.ts | 36 ++++++++++--- mcp/src/server/sqlite.ts | 114 ++++++++++++++++++++++++++++----------- 2 files changed, 113 insertions(+), 37 deletions(-) diff --git a/mcp/src/server/events.ts b/mcp/src/server/events.ts index f1b7034b..ed7d013b 100644 --- a/mcp/src/server/events.ts +++ b/mcp/src/server/events.ts @@ -47,6 +47,9 @@ class EventBus { /** * Emit an event to all subscribers. + * Assigns a sequence number and immediately notifies subscribers. + * For SQLite-backed stores, use nextSequence() + notify() instead + * to ensure the sequence is persisted before subscribers see it. */ emit( type: AFSEventType, @@ -61,6 +64,22 @@ class EventBus { payload, }; + this.notify(event); + return event; + } + + /** + * Get the next unique sequence number. + */ + nextSequence(): number { + return ++globalSequence; + } + + /** + * Notify all subscribers of an event. + * The event must already have a valid sequence assigned. + */ + notify(event: AFSEvent): void { // Notify global subscribers for (const handler of this.handlers) { try { @@ -71,7 +90,7 @@ class EventBus { } // Notify session-specific subscribers - const sessionHandlers = this.sessionHandlers.get(sessionId); + const sessionHandlers = this.sessionHandlers.get(event.sessionId); if (sessionHandlers) { for (const handler of sessionHandlers) { try { @@ -81,8 +100,6 @@ class EventBus { } } } - - return event; } /** @@ -188,6 +205,15 @@ class UserEventBus { payload, }; + this.notifyForUser(userId, event); + return event; + } + + /** + * Notify all subscribers for a specific user of an event. + * The event must already have a valid sequence assigned. + */ + notifyForUser(userId: string, event: AFSEvent): void { // Notify user-specific global subscribers const userHandlers = this.userHandlers.get(userId); if (userHandlers) { @@ -203,7 +229,7 @@ class UserEventBus { // Notify user-specific session subscribers const userSessions = this.userSessionHandlers.get(userId); if (userSessions) { - const sessionHandlers = userSessions.get(sessionId); + const sessionHandlers = userSessions.get(event.sessionId); if (sessionHandlers) { for (const handler of sessionHandlers) { try { @@ -214,8 +240,6 @@ class UserEventBus { } } } - - return event; } /** diff --git a/mcp/src/server/sqlite.ts b/mcp/src/server/sqlite.ts index c15ae5f1..4f9950ce 100644 --- a/mcp/src/server/sqlite.ts +++ b/mcp/src/server/sqlite.ts @@ -18,6 +18,7 @@ import type { Annotation, AnnotationStatus, ThreadMessage, + ActionRequest, Organization, User, UserRole, @@ -305,14 +306,43 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString(); stmts.pruneOldEvents.run(cutoff); - function persistEvent(event: AFSEvent): void { - stmts.insertEvent.run({ - type: event.type, - timestamp: event.timestamp, - sessionId: event.sessionId, - sequence: event.sequence, - payload: JSON.stringify(event.payload), - }); + /** + * Persist an event to SQLite and notify subscribers. + * Assigns a unique sequence number with retry on UNIQUE constraint violation. + * Persists BEFORE notifying so SSE subscribers see the correct sequence. + */ + function emitAndPersist( + type: AFSEventType, + sessionId: string, + payload: Annotation | Session | ThreadMessage | ActionRequest + ): void { + const MAX_RETRIES = 5; + for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { + const event: AFSEvent = { + type, + timestamp: new Date().toISOString(), + sessionId, + sequence: eventBus.nextSequence(), + payload, + }; + try { + stmts.insertEvent.run({ + type: event.type, + timestamp: event.timestamp, + sessionId: event.sessionId, + sequence: event.sequence, + payload: JSON.stringify(event.payload), + }); + eventBus.notify(event); + return; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + if (message.includes("UNIQUE constraint failed: events.sequence") && attempt < MAX_RETRIES - 1) { + continue; + } + throw err; + } + } } return { @@ -335,8 +365,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { metadata: null, }); - const event = eventBus.emit("session.created", session.id, session); - persistEvent(event); + emitAndPersist("session.created", session.id, session); return session; }, @@ -366,8 +395,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const session = this.getSession(id); if (session) { const eventType: AFSEventType = status === "closed" ? "session.closed" : "session.updated"; - const event = eventBus.emit(eventType, id, session); - persistEvent(event); + emitAndPersist(eventType, id, session); } return session; }, @@ -425,8 +453,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { authorId: annotation.authorId ?? null, }); - const event = eventBus.emit("annotation.created", sessionId, annotation); - persistEvent(event); + emitAndPersist("annotation.created", sessionId, annotation); return annotation; }, @@ -457,8 +484,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const updated = this.getAnnotation(id); if (updated && existing.sessionId) { - const event = eventBus.emit("annotation.updated", existing.sessionId, updated); - persistEvent(event); + emitAndPersist("annotation.updated", existing.sessionId, updated); } return updated; }, @@ -495,8 +521,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { const updated = this.updateAnnotation(annotationId, { thread }); if (updated && existing.sessionId) { - const event = eventBus.emit("thread.message", existing.sessionId, message); - persistEvent(event); + emitAndPersist("thread.message", existing.sessionId, message); } return updated; @@ -519,8 +544,7 @@ export function createSQLiteStore(dbPath?: string): AFSStore { stmts.deleteAnnotation.run(id); if (existing.sessionId) { - const event = eventBus.emit("annotation.deleted", existing.sessionId, existing); - persistEvent(event); + emitAndPersist("annotation.deleted", existing.sessionId, existing); } return existing; @@ -659,15 +683,44 @@ export function createTenantStore(dbPath?: string): TenantStore { const cutoff = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString(); tenantStmts.pruneOldEvents.run(cutoff); - function persistEventForUser(event: AFSEvent, userId: string): void { - tenantStmts.insertEvent.run({ - type: event.type, - timestamp: event.timestamp, - sessionId: event.sessionId, - sequence: event.sequence, - payload: JSON.stringify(event.payload), - userId, - }); + /** + * Persist a user-scoped event to SQLite and notify subscribers. + * Assigns a unique sequence number with retry on UNIQUE constraint violation. + */ + function emitAndPersistForUser( + userId: string, + type: AFSEventType, + sessionId: string, + payload: Annotation | Session | ThreadMessage | ActionRequest + ): void { + const MAX_RETRIES = 5; + for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { + const event: AFSEvent = { + type, + timestamp: new Date().toISOString(), + sessionId, + sequence: eventBus.nextSequence(), + payload, + }; + try { + tenantStmts.insertEvent.run({ + type: event.type, + timestamp: event.timestamp, + sessionId: event.sessionId, + sequence: event.sequence, + payload: JSON.stringify(event.payload), + userId, + }); + eventBus.notify(event); + return; + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + if (message.includes("UNIQUE constraint failed: events.sequence") && attempt < MAX_RETRIES - 1) { + continue; + } + throw err; + } + } } return { @@ -801,8 +854,7 @@ export function createTenantStore(dbPath?: string): TenantStore { userId, }); - const event = eventBus.emit("session.created", session.id, session); - persistEventForUser(event, userId); + emitAndPersistForUser(userId, "session.created", session.id, session); return session; },