From 78b6f8ae2a4dcbb454b8fda6573372f8d6309161 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Wed, 4 Mar 2026 17:49:05 -0500 Subject: [PATCH 01/10] Add provider-agnostic channel bridge core runtime --- .../channels/core/channelBridgeWorker.test.ts | 1742 +++++++++++++++++ .../src/channels/core/channelBridgeWorker.ts | 858 ++++++++ docs/README.md | 1 + docs/channel-bridge.md | 57 + 4 files changed, 2658 insertions(+) create mode 100644 apps/cli/src/channels/core/channelBridgeWorker.test.ts create mode 100644 apps/cli/src/channels/core/channelBridgeWorker.ts create mode 100644 docs/channel-bridge.md diff --git a/apps/cli/src/channels/core/channelBridgeWorker.test.ts b/apps/cli/src/channels/core/channelBridgeWorker.test.ts new file mode 100644 index 000000000..9b591fbaa --- /dev/null +++ b/apps/cli/src/channels/core/channelBridgeWorker.test.ts @@ -0,0 +1,1742 @@ +import { describe, expect, it } from 'vitest'; + +import { + createInMemoryChannelBindingStore, + executeChannelBridgeTick, + startChannelBridgeWorker, + createChannelBridgeInboundDeduper, + type ChannelBridgeAdapter, + type ChannelBindingStore, + type ChannelBridgeDeps, + type ChannelBridgeInboundMessage, +} from './channelBridgeWorker'; + +interface SentConversationMessage { + conversationId: string; + threadId: string | null; + text: string; +} + +interface SentSessionMessage { + sessionId: string; + text: string; + sentFrom: string; + providerId: string; + conversationId: string; + threadId: string | null; +} + +interface WarningRecord { + message: string; + error?: unknown; +} + +interface DepsHarness { + deps: ChannelBridgeDeps; + sentToSession: SentSessionMessage[]; + warnings: WarningRecord[]; +} + +interface DeferredPromise { + promise: Promise; + resolve: (value: T | PromiseLike) => void; + reject: (reason?: unknown) => void; +} + +function createDepsHarness(options?: { + sessions?: Array<{ sessionId: string; label: string | null }>; + listSessions?: ChannelBridgeDeps['listSessions']; + resolveSessionIdOrPrefix?: ChannelBridgeDeps['resolveSessionIdOrPrefix']; + sendUserMessageToSession?: ChannelBridgeDeps['sendUserMessageToSession']; + resolveLatestSessionSeq?: ChannelBridgeDeps['resolveLatestSessionSeq']; + fetchAgentMessagesAfterSeq?: ChannelBridgeDeps['fetchAgentMessagesAfterSeq']; + authorizeCommand?: ChannelBridgeDeps['authorizeCommand']; +}): DepsHarness { + const sentToSession: SentSessionMessage[] = []; + const warnings: WarningRecord[] = []; + const deps: ChannelBridgeDeps = { + listSessions: options?.listSessions ?? (async () => options?.sessions ?? []), + resolveSessionIdOrPrefix: + options?.resolveSessionIdOrPrefix ?? + (async () => ({ ok: false as const, code: 'session_not_found' as const })), + sendUserMessageToSession: + options?.sendUserMessageToSession ?? + (async (params) => { + sentToSession.push({ ...params }); + }), + resolveLatestSessionSeq: options?.resolveLatestSessionSeq ?? (async () => 0), + fetchAgentMessagesAfterSeq: options?.fetchAgentMessagesAfterSeq ?? (async () => []), + authorizeCommand: options?.authorizeCommand, + onWarning: (message, error) => { + warnings.push({ message, error }); + }, + }; + return { deps, sentToSession, warnings }; +} + +function createAdapterHarness(providerId: string = 'telegram'): { + adapter: ChannelBridgeAdapter; + pushInbound: (event: ChannelBridgeInboundMessage) => void; + sent: SentConversationMessage[]; + failPullOnce: (error: Error) => void; + stopCalls: () => number; + pendingInboundCount: () => number; +} { + const queue: ChannelBridgeInboundMessage[] = []; + const sent: SentConversationMessage[] = []; + let pullError: Error | null = null; + let stopCallCount = 0; + + return { + adapter: { + providerId, + pullInboundMessages: async () => { + if (pullError) { + const error = pullError; + pullError = null; + throw error; + } + const items = queue.slice(); + queue.length = 0; + return items; + }, + sendMessage: async (params) => { + sent.push({ + conversationId: params.conversationId, + threadId: params.threadId, + text: params.text, + }); + }, + stop: async () => { + stopCallCount += 1; + }, + }, + pushInbound: (event) => { + queue.push(event); + }, + sent, + failPullOnce: (error) => { + pullError = error; + }, + stopCalls: () => stopCallCount, + pendingInboundCount: () => queue.length, + }; +} + +function createDeferredPromise(): DeferredPromise { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +async function waitFor(condition: () => boolean, timeoutMs: number = 2_000): Promise { + const startedAt = Date.now(); + while (!condition()) { + if (Date.now() - startedAt > timeoutMs) { + throw new Error('Timed out waiting for condition'); + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } +} + +describe('executeChannelBridgeTick', () => { + it('supports /attach then forwards inbound user messages into the bound session', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps, sentToSession } = createDepsHarness({ + resolveSessionIdOrPrefix: async (idOrPrefix: string) => { + if (idOrPrefix === 'abc123') { + return { ok: true as const, sessionId: 'sess-abc123' }; + } + return { ok: false as const, code: 'session_not_found' as const }; + }, + resolveLatestSessionSeq: async () => 41, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '88', + text: '/attach abc123', + messageId: 'm1', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + const [binding] = await store.listBindings(); + expect(binding).toMatchObject({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '88', + sessionId: 'sess-abc123', + lastForwardedSeq: 41, + }); + expect(harness.sent.some((row) => row.text.includes('Attached'))).toBe(true); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '88', + text: 'Ship it', + messageId: 'm2', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(sentToSession).toEqual([ + { + sessionId: 'sess-abc123', + text: 'Ship it', + sentFrom: 'telegram', + providerId: 'telegram', + conversationId: '-1001', + threadId: '88', + }, + ]); + }); + + it('includes previous session id when /attach replaces an existing binding', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '88', + sessionId: 'sess-old', + lastForwardedSeq: 12, + }); + + const { deps } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-new' }), + resolveLatestSessionSeq: async () => 41, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '88', + text: '/attach sess-new', + messageId: 'm-attach-replace', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('replaced previous session sess-old'))).toBe(true); + }); + + it('supports /sessions and /detach command flow', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + sessionId: 'sess-old', + lastForwardedSeq: 3, + }); + + const { deps } = createDepsHarness({ + sessions: [ + { sessionId: 'sess-1', label: 'build-docs' }, + { sessionId: 'sess-2', label: null }, + ], + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/sessions', + messageId: 'm-sessions', + }); + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/detach', + messageId: 'm-detach', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('Active sessions'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Detached'))).toBe(true); + + const remaining = await store.listBindings(); + expect(remaining).toHaveLength(0); + }); + + it('warns and replies when /sessions fails to list sessions', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps, warnings } = createDepsHarness({ + listSessions: async () => { + throw new Error('list unavailable'); + }, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/sessions', + messageId: 'm-sessions-fail', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(warnings.some((row) => row.message.includes('Failed to list sessions for /sessions command'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Failed to retrieve sessions'))).toBe(true); + }); + + it('supports /session command for attached and non-attached conversations', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps } = createDepsHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + sessionId: 'sess-bound', + lastForwardedSeq: 3, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/session', + messageId: 'm-session-bound', + }); + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '100', + text: '/session', + messageId: 'm-session-unbound', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('Attached session: sess-bound'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('No session is attached here'))).toBe(true); + }); + + it('warns and replies when /session cannot read binding from store', async () => { + const baseStore = createInMemoryChannelBindingStore(); + const store: ChannelBindingStore = { + ...baseStore, + getBinding: async () => { + throw new Error('binding read failed'); + }, + }; + const harness = createAdapterHarness(); + const { deps, warnings } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/session', + messageId: 'm-session-store-fail', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(warnings.some((row) => row.message.includes('Failed to read binding for /session command'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Failed to read current session binding'))).toBe(true); + }); + + it('warns and replies when /detach fails to remove a binding from store', async () => { + const baseStore = createInMemoryChannelBindingStore(); + const store: ChannelBindingStore = { + ...baseStore, + removeBinding: async () => { + throw new Error('binding remove failed'); + }, + }; + const harness = createAdapterHarness(); + const { deps, warnings } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/detach', + messageId: 'm-detach-store-fail', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(warnings.some((row) => row.message.includes('Failed to remove binding for /detach command'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Failed to detach current session binding'))).toBe(true); + }); + + it('supports /help and /start command aliases', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps, sentToSession } = createDepsHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + sessionId: 'sess-bound', + lastForwardedSeq: 3, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/help', + messageId: 'm-help', + }); + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/start', + messageId: 'm-start', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + const helpReplies = harness.sent.filter((row) => row.text.includes('Happier bridge commands:')); + expect(helpReplies).toHaveLength(2); + for (const reply of helpReplies) { + expect(reply.text.includes('/help - show command help')).toBe(true); + expect(reply.text.includes('/start - alias for /help')).toBe(true); + } + expect(sentToSession).toHaveLength(0); + }); + + it('replies for unknown slash commands instead of forwarding them', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps, sentToSession } = createDepsHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + sessionId: 'sess-bound', + lastForwardedSeq: 3, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/bogus-command', + messageId: 'm-unknown-command', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('Unknown command: /bogus-command'))).toBe(true); + expect(sentToSession).toHaveLength(0); + }); + + it('replies for malformed slash command tokens instead of forwarding them', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps, sentToSession } = createDepsHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + sessionId: 'sess-bound', + lastForwardedSeq: 3, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/@', + messageId: 'm-malformed-command', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('Unknown command. Use /help'))).toBe(true); + expect(sentToSession).toHaveLength(0); + }); + + it('replies with no-binding hint for non-command inbound text when conversation is unbound', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps, sentToSession } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: 'unbound-room', + threadId: null, + text: 'hello from unbound thread', + messageId: 'unbound-non-command', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('No session is attached here'))).toBe(true); + expect(sentToSession).toHaveLength(0); + }); + + it('warns and replies when non-command forwarding cannot read binding from store', async () => { + const baseStore = createInMemoryChannelBindingStore(); + const store: ChannelBindingStore = { + ...baseStore, + getBinding: async () => { + throw new Error('binding read failed'); + }, + }; + const harness = createAdapterHarness(); + const { deps, warnings, sentToSession } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: 'bound-room', + threadId: null, + text: 'hello from channel', + messageId: 'non-command-getbinding-fail', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(sentToSession).toHaveLength(0); + expect(warnings.some((row) => row.message.includes('Failed to read binding for inbound message forwarding'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Failed to read current session binding'))).toBe(true); + }); + + it('indicates when /sessions output is truncated', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps } = createDepsHarness({ + sessions: Array.from({ length: 21 }, (_, index) => ({ + sessionId: `sess-${index + 1}`, + label: null, + })), + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1001', + threadId: '99', + text: '/sessions', + messageId: 'm-sessions-truncated', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('…and 1 more.'))).toBe(true); + }); + + it('forwards agent replies to the bound conversation and advances cursor', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-1005', + threadId: null, + sessionId: 'sess-a', + lastForwardedSeq: 9, + }); + + const { deps } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async ({ afterSeq }: { afterSeq: number }) => { + if (afterSeq === 9) { + return [ + { seq: 10, text: 'First agent reply' }, + { seq: 11, text: 'Second agent reply' }, + ]; + } + return []; + }, + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent).toEqual([ + { conversationId: '-1005', threadId: null, text: 'First agent reply' }, + { conversationId: '-1005', threadId: null, text: 'Second agent reply' }, + ]); + + const [binding] = await store.listBindings(); + expect(binding?.lastForwardedSeq).toBe(11); + }); + + it('does not attach when latest session sequence cannot be resolved', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-invalid-seq' }), + resolveLatestSessionSeq: async () => { + throw new Error('sequence unavailable'); + }, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-invalid-seq', + messageId: 'attach-fails', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Failed to attach'))).toBe(true); + }); + + it('does not attach when session id/prefix resolution throws', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps, warnings } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => { + throw new Error('resolver unavailable'); + }, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-resolver-error', + messageId: 'attach-resolver-throws', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(warnings.some((row) => row.message.includes('Failed to resolve session by id/prefix for attach'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Failed to attach to session sess-resolver-error'))).toBe(true); + }); + + it('does not attach when latest session sequence resolves to an invalid value', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-invalid-seq-value' }), + resolveLatestSessionSeq: async () => Number.NaN, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-invalid-seq-value', + messageId: 'attach-invalid-seq-value', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Failed to attach'))).toBe(true); + }); + + it('does not attach when binding persistence fails', async () => { + const baseStore = createInMemoryChannelBindingStore(); + const store: ChannelBindingStore = { + ...baseStore, + upsertBinding: async () => { + throw new Error('binding upsert failed'); + }, + }; + const harness = createAdapterHarness(); + + const { deps, warnings } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-upsert-fail' }), + resolveLatestSessionSeq: async () => 10, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-upsert-fail', + messageId: 'attach-upsert-throws', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await baseStore.listBindings()).toHaveLength(0); + expect(warnings.some((row) => row.message.includes('Failed to persist binding during /attach'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('unable to persist binding'))).toBe(true); + }); + + it('returns ambiguous attach message even when resolver omits candidate ids', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ + ok: false as const, + code: 'session_id_ambiguous' as const, + candidates: [], + }), + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-ambiguous', + messageId: 'attach-ambiguous-empty-candidates', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Ambiguous session prefix'))).toBe(true); + }); + + it('returns unsupported attach message when resolver does not support attach by id/prefix', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ ok: false as const, code: 'unsupported' as const }), + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-unsupported', + messageId: 'attach-unsupported', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Attaching by session ID or prefix is not supported'))).toBe(true); + }); + + it('returns session-not-found attach message when resolver cannot find target', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps } = createDepsHarness({ + resolveSessionIdOrPrefix: async () => ({ ok: false as const, code: 'session_not_found' as const }), + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach sess-missing', + messageId: 'attach-session-not-found', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Session not found'))).toBe(true); + }); + + it('replies with usage hint when /attach is called without arguments', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1007', + threadId: null, + text: '/attach', + messageId: 'attach-no-args', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Usage: /attach'))).toBe(true); + }); + + it('enforces sender-scoped command authorization via deps hook', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps } = createDepsHarness({ + authorizeCommand: async ({ commandName, actor }) => { + if (commandName === 'attach' && actor.senderId === 'blocked-user') { + return { allowed: false as const, message: 'Not authorized for attach.' }; + } + return true; + }, + resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-new' }), + resolveLatestSessionSeq: async () => 5, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1008', + threadId: null, + senderId: 'blocked-user', + text: '/attach sess-new', + messageId: 'attach-authz-blocked', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(await store.listBindings()).toHaveLength(0); + expect(harness.sent.some((row) => row.text.includes('Not authorized for attach.'))).toBe(true); + }); + + it('denies command and warns when authorizeCommand hook throws', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + const { deps, warnings } = createDepsHarness({ + authorizeCommand: async () => { + throw new Error('auth service unavailable'); + }, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-1009', + threadId: null, + text: '/sessions', + messageId: 'authz-throws', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('Unable to authorize this command right now.'))).toBe(true); + expect(warnings.some((row) => row.message.includes('Authorization check failed'))).toBe(true); + }); + + it('normalizes inbound provider identity to adapter provider and warns on mismatch', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness('telegram'); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'chat-42', + threadId: null, + sessionId: 'sess-42', + lastForwardedSeq: 0, + }); + + const { deps, sentToSession, warnings } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'discord', + conversationId: 'chat-42', + threadId: null, + text: 'Hello from spoofed provider', + messageId: 'm-spoof', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(sentToSession).toHaveLength(1); + expect(sentToSession[0]).toMatchObject({ + providerId: 'telegram', + sentFrom: 'telegram', + conversationId: 'chat-42', + threadId: null, + }); + expect(warnings.some((row) => row.message.includes('Inbound provider mismatch'))).toBe(true); + }); + + it('continues processing other adapters when one adapter pull fails', async () => { + const store = createInMemoryChannelBindingStore(); + const failing = createAdapterHarness('telegram'); + const healthy = createAdapterHarness('discord'); + + await store.upsertBinding({ + providerId: 'discord', + conversationId: 'discord-room', + threadId: null, + sessionId: 'sess-discord', + lastForwardedSeq: 0, + }); + + const { deps, sentToSession, warnings } = createDepsHarness(); + + failing.failPullOnce(new Error('telegram pull failed')); + healthy.pushInbound({ + providerId: 'discord', + conversationId: 'discord-room', + threadId: null, + text: 'still processed', + messageId: 'discord-message', + }); + + await executeChannelBridgeTick({ + store, + adapters: [failing.adapter, healthy.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(sentToSession).toEqual([ + { + sessionId: 'sess-discord', + text: 'still processed', + sentFrom: 'discord', + providerId: 'discord', + conversationId: 'discord-room', + threadId: null, + }, + ]); + expect(warnings.some((row) => row.message.includes('Failed to pull inbound messages for adapter telegram'))).toBe(true); + }); + + it('warns and ignores duplicate adapter provider ids', async () => { + const store = createInMemoryChannelBindingStore(); + const first = createAdapterHarness('telegram'); + const second = createAdapterHarness('telegram'); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'room-1', + threadId: null, + sessionId: 'sess-dup', + lastForwardedSeq: 0, + }); + + const { deps, sentToSession, warnings } = createDepsHarness(); + + first.pushInbound({ + providerId: 'telegram', + conversationId: 'room-1', + threadId: null, + text: 'from first adapter', + messageId: 'dup-1', + }); + second.pushInbound({ + providerId: 'telegram', + conversationId: 'room-1', + threadId: null, + text: 'from second adapter', + messageId: 'dup-2', + }); + + await executeChannelBridgeTick({ + store, + adapters: [first.adapter, second.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(sentToSession).toHaveLength(1); + expect(sentToSession[0]?.text).toBe('from first adapter'); + expect(warnings.some((row) => row.message.includes('Duplicate adapter providerId detected: telegram'))).toBe(true); + }); + + it('warns and replies when forwarding inbound text into session fails', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'failing-room', + threadId: null, + sessionId: 'sess-fail', + lastForwardedSeq: 0, + }); + + const { deps, warnings } = createDepsHarness({ + sendUserMessageToSession: async () => { + throw new Error('session unavailable'); + }, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: 'failing-room', + threadId: null, + text: 'hello', + messageId: 'send-fail-msg', + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(warnings.some((row) => row.message.includes('Failed to forward channel message into session'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('Failed to send message to session sess-fail.'))).toBe(true); + }); + + it('persists cursor after successful sends when a later outbound row fails', async () => { + const store = createInMemoryChannelBindingStore(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-2002', + threadId: null, + sessionId: 'sess-partial', + lastForwardedSeq: 9, + }); + + let sendCalls = 0; + const adapter: ChannelBridgeAdapter = { + providerId: 'telegram', + pullInboundMessages: async () => [], + sendMessage: async () => { + sendCalls += 1; + if (sendCalls >= 2) { + throw new Error('simulated send failure'); + } + }, + stop: async () => {}, + }; + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [ + { seq: 10, text: 'first row' }, + { seq: 11, text: 'second row fails' }, + ], + }); + + await executeChannelBridgeTick({ + store, + adapters: [adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + const [binding] = await store.listBindings(); + expect(binding?.lastForwardedSeq).toBe(10); + expect(warnings.some((row) => row.message.includes('Failed to forward agent output to channel'))).toBe(true); + }); + + it('warns when fetching outbound agent rows fails', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-2003', + threadId: null, + sessionId: 'sess-fetch-fail', + lastForwardedSeq: 3, + }); + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => { + throw new Error('transcript read failed'); + }, + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent).toHaveLength(0); + expect(warnings.some((row) => row.message.includes('Failed to forward agent output to channel'))).toBe(true); + }); + + it('deduplicates repeated inbound messages across direct executeChannelBridgeTick calls', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'direct-dedupe-room', + threadId: null, + sessionId: 'sess-direct-dedupe', + lastForwardedSeq: 0, + }); + + const { deps, sentToSession } = createDepsHarness(); + const repeated = { + providerId: 'telegram' as const, + conversationId: 'direct-dedupe-room', + threadId: null, + text: 'same payload', + messageId: 'direct-dedupe-id-1', + }; + + const deduper = createChannelBridgeInboundDeduper(); + + harness.pushInbound(repeated); + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: deduper, + }); + + harness.pushInbound(repeated); + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: deduper, + }); + + expect(sentToSession).toHaveLength(1); + }); + + it('does not dedupe messages when inbound messageId is empty', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'empty-id-room', + threadId: null, + sessionId: 'sess-empty-id', + lastForwardedSeq: 0, + }); + + const { deps, sentToSession } = createDepsHarness(); + const messageWithEmptyId = { + providerId: 'telegram' as const, + conversationId: 'empty-id-room', + threadId: null, + text: 'same payload', + messageId: ' ', + }; + + const deduper = createChannelBridgeInboundDeduper(); + + harness.pushInbound(messageWithEmptyId); + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: deduper, + }); + + harness.pushInbound(messageWithEmptyId); + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: deduper, + }); + + expect(sentToSession).toHaveLength(2); + }); + + it('skips invalid outbound seq rows and warns without stalling valid cursor updates', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-3001', + threadId: null, + sessionId: 'sess-invalid-seq', + lastForwardedSeq: 9, + }); + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [ + { seq: Number.NaN, text: 'invalid row' }, + { seq: 10, text: 'valid row' }, + ], + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + const [binding] = await store.listBindings(); + expect(binding?.lastForwardedSeq).toBe(10); + expect(harness.sent.some((row) => row.text.includes('valid row'))).toBe(true); + expect(harness.sent.some((row) => row.text.includes('invalid row'))).toBe(false); + expect(warnings.some((row) => row.message.includes('Skipped agent output row with invalid seq'))).toBe(true); + }); + + it('forwards outbound agent rows in ascending seq order even when source rows are unsorted', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-3002', + threadId: null, + sessionId: 'sess-unsorted-seq', + lastForwardedSeq: 9, + }); + + const { deps } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [ + { seq: 12, text: 'third by seq' }, + { seq: 10, text: 'first by seq' }, + { seq: 11, text: 'second by seq' }, + ], + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.map((row) => row.text)).toEqual([ + 'first by seq', + 'second by seq', + 'third by seq', + ]); + + const [binding] = await store.listBindings(); + expect(binding?.lastForwardedSeq).toBe(12); + }); + + it('skips outbound rows at or below the current cursor', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: '-3003', + threadId: null, + sessionId: 'sess-cursor-guard', + lastForwardedSeq: 10, + }); + + const { deps } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [ + { seq: 9, text: 'already forwarded before cursor' }, + { seq: 10, text: 'exactly at cursor' }, + { seq: 11, text: 'new row after cursor' }, + ], + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.map((row) => row.text)).toEqual(['new row after cursor']); + + const [binding] = await store.listBindings(); + expect(binding?.lastForwardedSeq).toBe(11); + }); + + it('warns and stops forwarding remaining rows when cursor persistence fails after send', async () => { + const baseStore = createInMemoryChannelBindingStore(); + const store: ChannelBindingStore = { + ...baseStore, + updateLastForwardedSeq: async () => { + throw new Error('cursor store unavailable'); + }, + }; + const harness = createAdapterHarness(); + + await baseStore.upsertBinding({ + providerId: 'telegram', + conversationId: '-3004', + threadId: null, + sessionId: 'sess-cursor-persist-fail', + lastForwardedSeq: 9, + }); + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [ + { seq: 10, text: 'first send succeeds' }, + { seq: 11, text: 'should not send after cursor failure' }, + ], + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.map((row) => row.text)).toEqual(['first send succeeds']); + expect(warnings.some((row) => row.message.includes('Failed to persist channel bridge cursor'))).toBe(true); + expect(warnings.some((row) => row.message.includes('Failed to forward agent output to channel'))).toBe(false); + + const [binding] = await baseStore.listBindings(); + expect(binding?.lastForwardedSeq).toBe(9); + }); + + it('warns when binding provider has no active adapter for outbound forwarding', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness('discord'); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'orphaned-room', + threadId: null, + sessionId: 'sess-orphaned', + lastForwardedSeq: 0, + }); + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [{ seq: 1, text: 'ignored' }], + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(warnings.some((row) => row.message.includes('No adapter registered for binding providerId=telegram'))).toBe(true); + }); + + it('warns once per binding when missing adapter warnings are tracked across ticks', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness('discord'); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'orphaned-room', + threadId: null, + sessionId: 'sess-orphaned', + lastForwardedSeq: 0, + }); + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [{ seq: 1, text: 'ignored' }], + }); + const warnedMissingAdapterBindings = new Set(); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + warnedMissingAdapterBindings, + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + warnedMissingAdapterBindings, + }); + + const missingAdapterWarnings = warnings.filter((row) => + row.message.includes('No adapter registered for binding providerId=telegram'), + ); + expect(missingAdapterWarnings).toHaveLength(1); + }); + + it('warns and exits tick when listing bindings fails', async () => { + const baseStore = createInMemoryChannelBindingStore(); + const store: ChannelBindingStore = { + ...baseStore, + listBindings: async () => { + throw new Error('list bindings unavailable'); + }, + }; + const harness = createAdapterHarness(); + const { deps, warnings } = createDepsHarness(); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(warnings.some((row) => row.message.includes('Failed to list bindings for outbound forwarding'))).toBe(true); + }); +}); + +describe('startChannelBridgeWorker', () => { + it('runs the first tick on startup', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps } = createDepsHarness({ + sessions: [{ sessionId: 'sess-1', label: 'demo' }], + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: '-2001', + threadId: null, + text: '/sessions', + messageId: 'startup-sessions', + }); + + const worker = startChannelBridgeWorker({ + store, + adapters: [harness.adapter], + deps, + tickMs: 60_000, + }); + + try { + await waitFor(() => harness.sent.length > 0); + expect(harness.sent.some((row) => row.text.includes('Active sessions'))).toBe(true); + } finally { + await worker.stop(); + } + }); + + it('deduplicates inbound messages across runtime ticks', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'dedupe-room', + threadId: null, + sessionId: 'sess-dedupe', + lastForwardedSeq: 0, + }); + + const { deps, sentToSession } = createDepsHarness(); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: 'dedupe-room', + threadId: null, + text: 'duplicate payload', + messageId: 'dedupe-id-1', + }); + + const worker = startChannelBridgeWorker({ + store, + adapters: [harness.adapter], + deps, + tickMs: 60_000, + }); + + try { + await waitFor(() => harness.pendingInboundCount() === 0 && sentToSession.length === 1); + expect(sentToSession).toHaveLength(1); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: 'dedupe-room', + threadId: null, + text: 'duplicate payload', + messageId: 'dedupe-id-1', + }); + worker.trigger(); + + await waitFor(() => harness.pendingInboundCount() === 0 && sentToSession.length === 1); + expect(sentToSession).toHaveLength(1); + } finally { + await worker.stop(); + } + }); + + it('stops idempotently and waits for in-flight tick before stopping adapters', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + let startedTick = false; + + await store.upsertBinding({ + providerId: 'telegram', + conversationId: 'stop-room', + threadId: null, + sessionId: 'sess-stop', + lastForwardedSeq: 0, + }); + + const gate = createDeferredPromise(); + const { deps } = createDepsHarness({ + sendUserMessageToSession: async () => { + startedTick = true; + await gate.promise; + }, + }); + + harness.pushInbound({ + providerId: 'telegram', + conversationId: 'stop-room', + threadId: null, + text: 'block until released', + messageId: 'stop-message', + }); + + const worker = startChannelBridgeWorker({ + store, + adapters: [harness.adapter], + deps, + tickMs: 60_000, + }); + + try { + await waitFor(() => startedTick); + + const stopFirst = worker.stop(); + const stopSecond = worker.stop(); + expect(harness.stopCalls()).toBe(0); + + gate.resolve(); + await stopFirst; + await stopSecond; + + expect(harness.stopCalls()).toBe(1); + } finally { + gate.resolve(); + await worker.stop(); + } + }); + + it('continues stopping remaining adapters if one stop fails', async () => { + const store = createInMemoryChannelBindingStore(); + let secondaryStopped = false; + const adapters: ChannelBridgeAdapter[] = [ + { + providerId: 'telegram', + pullInboundMessages: async () => [], + sendMessage: async () => {}, + stop: async () => { + throw new Error('primary stop failed'); + }, + }, + { + providerId: 'discord', + pullInboundMessages: async () => [], + sendMessage: async () => {}, + stop: async () => { + secondaryStopped = true; + }, + }, + ]; + + const { deps, warnings } = createDepsHarness(); + const worker = startChannelBridgeWorker({ + store, + adapters, + deps, + tickMs: 60_000, + }); + + try { + await worker.stop(); + + expect(secondaryStopped).toBe(true); + expect(warnings.some((row) => row.message.includes('Failed to stop channel adapter telegram'))).toBe(true); + } finally { + await worker.stop(); + } + }); + + it('stops distinct adapter instances even when provider ids are duplicated', async () => { + const store = createInMemoryChannelBindingStore(); + const firstStop = { calls: 0 }; + const duplicateStop = { calls: 0 }; + + const adapters: ChannelBridgeAdapter[] = [ + { + providerId: 'telegram', + pullInboundMessages: async () => [], + sendMessage: async () => {}, + stop: async () => { + firstStop.calls += 1; + }, + }, + { + providerId: 'telegram', + pullInboundMessages: async () => [], + sendMessage: async () => {}, + stop: async () => { + duplicateStop.calls += 1; + }, + }, + ]; + + const { deps } = createDepsHarness(); + const worker = startChannelBridgeWorker({ + store, + adapters, + deps, + tickMs: 60_000, + }); + + try { + await worker.stop(); + expect(firstStop.calls).toBe(1); + expect(duplicateStop.calls).toBe(1); + } finally { + await worker.stop(); + } + }); + + it('stops shared adapter references once during shutdown', async () => { + const store = createInMemoryChannelBindingStore(); + const stopCounter = { calls: 0 }; + + const sharedAdapter: ChannelBridgeAdapter = { + providerId: 'telegram', + pullInboundMessages: async () => [], + sendMessage: async () => {}, + stop: async () => { + stopCounter.calls += 1; + }, + }; + + const { deps } = createDepsHarness(); + const worker = startChannelBridgeWorker({ + store, + adapters: [sharedAdapter, sharedAdapter], + deps, + tickMs: 60_000, + }); + + try { + await worker.stop(); + expect(stopCounter.calls).toBe(1); + } finally { + await worker.stop(); + } + }); +}); diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts new file mode 100644 index 000000000..e4578bcfa --- /dev/null +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -0,0 +1,858 @@ +/** + * Provider-agnostic channel bridge worker. + * + * Responsibilities: + * - Pull inbound channel messages from adapters + * - Handle slash commands (`/sessions`, `/attach`, `/detach`, `/session`, `/help`) + * - Forward non-command inbound messages into attached Happier sessions + * - Forward agent output back into the mapped channel conversation + * + * Cursor semantics: + * - `lastForwardedSeq` tracks the highest transcript sequence that has been delivered + * to the channel for a given binding. + * - `fetchAgentMessagesAfterSeq` is treated as an exclusive cursor (`seq > afterSeq`). + * - `updateLastForwardedSeq` must persist the maximum forwarded sequence. + */ +import { startSingleFlightIntervalLoop, type SingleFlightIntervalLoopHandle } from '@/daemon/lifecycle/singleFlightIntervalLoop'; + +/** + * Logical channel conversation reference. + * + * For thread-capable providers, `threadId` identifies the topic/thread; for non-threaded + * conversations it is `null`. + */ +export type ChannelBridgeConversationRef = Readonly<{ + providerId: string; + conversationId: string; + threadId: string | null; +}>; + +/** + * Inbound message event produced by an adapter. + * + * `messageId` is used for best-effort duplicate suppression in the worker runtime. + */ +export type ChannelBridgeInboundMessage = ChannelBridgeConversationRef & Readonly<{ + senderId?: string | null; + text: string; + messageId: string; +}>; + +export type ChannelBridgeActorContext = Readonly<{ + providerId: string; + conversationId: string; + threadId: string | null; + senderId: string | null; +}>; + +/** + * Adapter contract for a specific provider (Telegram, Discord, etc.). + * + * Expectations: + * - `pullInboundMessages` should return available inbound items without throwing for + * normal empty states (return `[]` instead). + * - `sendMessage` should deliver text into a target conversation/thread. + * - `stop` is optional and should tear down adapter resources. + */ +export type ChannelBridgeAdapter = Readonly<{ + providerId: string; + pullInboundMessages: () => Promise; + sendMessage: (params: Readonly<{ conversationId: string; threadId: string | null; text: string }>) => Promise; + stop?: () => void | Promise; +}>; + +/** + * Persisted conversation -> session mapping and agent cursor state. + */ +export type ChannelSessionBinding = ChannelBridgeConversationRef & Readonly<{ + sessionId: string; + lastForwardedSeq: number; + createdAtMs: number; + updatedAtMs: number; +}>; + +/** + * Resolution result for `/attach `. + */ +export type ResolveSessionIdResult = + | Readonly<{ ok: true; sessionId: string }> + | Readonly<{ ok: false; code: 'session_not_found' | 'session_id_ambiguous' | 'unsupported'; candidates?: string[] }>; + +/** + * Bridge dependencies supplied by runtime integration. + * + * - `resolveLatestSessionSeq` should return the latest valid non-negative transcript cursor. + * - `fetchAgentMessagesAfterSeq` should return rows with `seq > afterSeq`. + * Results may be unsorted; the worker enforces ascending `seq` delivery before forwarding. + * - `onWarning` receives non-fatal operational issues; worker continues best-effort. + */ +export type ChannelBridgeDeps = Readonly<{ + listSessions: () => Promise>>; + resolveSessionIdOrPrefix: (idOrPrefix: string) => Promise; + sendUserMessageToSession: (params: Readonly<{ + sessionId: string; + text: string; + sentFrom: string; + providerId: string; + conversationId: string; + threadId: string | null; + }>) => Promise; + resolveLatestSessionSeq: (sessionId: string) => Promise; + fetchAgentMessagesAfterSeq: (params: Readonly<{ sessionId: string; afterSeq: number }>) => Promise>>; + authorizeCommand?: (params: Readonly<{ commandName: string; actor: ChannelBridgeActorContext }>) => Promise>; + onWarning?: (message: string, error?: unknown) => void; +}>; + +/** + * Binding persistence contract used by the bridge worker. + * + * `updateLastForwardedSeq` is monotonic: implementations should keep the highest cursor. + */ +export type ChannelBindingStore = Readonly<{ + listBindings: () => Promise; + getBinding: (ref: ChannelBridgeConversationRef) => Promise; + upsertBinding: (binding: Readonly<{ + providerId: string; + conversationId: string; + threadId: string | null; + sessionId: string; + lastForwardedSeq: number; + }>) => Promise; + updateLastForwardedSeq: (ref: ChannelBridgeConversationRef, seq: number) => Promise; + removeBinding: (ref: ChannelBridgeConversationRef) => Promise; +}>; + +export type ChannelBridgeWorkerHandle = Readonly<{ + /** Stops the worker. Idempotent; safe to call multiple times. */ + stop: () => Promise; + /** Requests an immediate tick; no-op once `stop()` has been called. */ + trigger: () => void; +}>; + +/** + * Key encoding for in-memory binding map. + * + * Uses JSON array encoding to avoid delimiter collision risks. + */ +function bindingKey(ref: ChannelBridgeConversationRef): string { + return JSON.stringify([ref.providerId, ref.conversationId, ref.threadId]); +} + +function toNonNegativeInt(value: unknown): number | null { + if (typeof value !== 'number' || !Number.isFinite(value)) return null; + const parsed = Math.trunc(value); + if (parsed < 0) return null; + return parsed; +} + +type ChannelBridgeInboundDeduper = Readonly<{ + isDuplicate: (message: ChannelBridgeInboundMessage) => boolean; +}>; + +/** + * Create an inbound deduper for channel messages. + * + * Use this to isolate dedupe state across independent bridge instances sharing the same process. + */ +export function createChannelBridgeInboundDeduper(now: () => number = () => Date.now()): ChannelBridgeInboundDeduper { + const recent = new Map(); + const ttlMs = 5 * 60 * 1000; + const maxEntries = 20_000; + const minPruneIntervalMs = 1_000; + let lastPrunedAtMs = 0; + + const prune = (currentNow: number) => { + if (recent.size <= maxEntries && currentNow - lastPrunedAtMs < minPruneIntervalMs) { + return; + } + lastPrunedAtMs = currentNow; + + for (const [key, seenAtMs] of recent) { + if (currentNow - seenAtMs > ttlMs) { + recent.delete(key); + } + } + while (recent.size > maxEntries) { + const [oldest] = recent.keys(); + if (oldest === undefined) break; + recent.delete(oldest); + } + }; + + return { + isDuplicate: (message) => { + const normalizedMessageId = String(message.messageId).trim(); + if (normalizedMessageId.length === 0) { + return false; + } + + const key = JSON.stringify([message.providerId, message.conversationId, message.threadId, normalizedMessageId]); + const currentNow = now(); + prune(currentNow); + if (recent.has(key)) return true; + recent.set(key, currentNow); + return false; + }, + }; +} + +/** + * Create an in-memory binding store. + * + * `now` is injectable for deterministic tests. + */ +export function createInMemoryChannelBindingStore(now: () => number = () => Date.now()): ChannelBindingStore { + const byKey = new Map(); + + return { + listBindings: async () => Array.from(byKey.values()), + getBinding: async (ref) => byKey.get(bindingKey(ref)) ?? null, + upsertBinding: async (binding) => { + const key = bindingKey(binding); + const existing = byKey.get(key); + const next: ChannelSessionBinding = { + providerId: binding.providerId, + conversationId: binding.conversationId, + threadId: binding.threadId, + sessionId: binding.sessionId, + lastForwardedSeq: Math.max(0, Math.trunc(binding.lastForwardedSeq)), + createdAtMs: existing?.createdAtMs ?? now(), + updatedAtMs: now(), + }; + byKey.set(key, next); + return next; + }, + updateLastForwardedSeq: async (ref, seq) => { + const key = bindingKey(ref); + const existing = byKey.get(key); + if (!existing) return; + const nextSeq = Math.max(existing.lastForwardedSeq, Math.max(0, Math.trunc(seq))); + byKey.set(key, { + ...existing, + lastForwardedSeq: nextSeq, + updatedAtMs: now(), + }); + }, + removeBinding: async (ref) => byKey.delete(bindingKey(ref)), + }; +} + +function parseSlashCommand(text: string): Readonly<{ name: string; args: string[] }> | null { + const trimmed = text.trim(); + if (!trimmed.startsWith('/')) return null; + const [rawName, ...args] = trimmed.slice(1).split(/\s+/g); + const normalized = String(rawName ?? '').trim().toLowerCase(); + if (!normalized) return null; + const name = normalized.split('@')[0]!.trim(); + if (!name) return null; + return { name, args }; +} + +function formatSessionsMessage(rows: Array>): string { + if (rows.length === 0) { + return 'No active sessions found.'; + } + const limit = 20; + const truncated = rows.length > limit; + const body = rows + .slice(0, limit) + .map((row) => `• ${row.sessionId}${row.label ? ` (${row.label})` : ''}`) + .join('\n'); + const suffix = truncated ? `\n…and ${rows.length - limit} more.` : ''; + return `Active sessions:\n${body}${suffix}`; +} + +async function replyToConversation( + adapter: ChannelBridgeAdapter, + conversation: Readonly<{ conversationId: string; threadId: string | null }>, + text: string, +): Promise { + await adapter.sendMessage({ + conversationId: conversation.conversationId, + threadId: conversation.threadId, + text, + }); +} + +async function authorizeCommand(params: Readonly<{ + commandName: string; + event: ChannelBridgeInboundMessage; + deps: ChannelBridgeDeps; +}>): Promise> { + const authorize = params.deps.authorizeCommand; + if (!authorize) { + return { allowed: true, message: null }; + } + + const senderRaw = params.event.senderId; + const senderId = typeof senderRaw === 'string' && senderRaw.trim().length > 0 ? senderRaw.trim() : null; + const actor: ChannelBridgeActorContext = { + providerId: params.event.providerId, + conversationId: params.event.conversationId, + threadId: params.event.threadId, + senderId, + }; + + try { + const result = await authorize({ + commandName: params.commandName, + actor, + }); + if (typeof result === 'boolean') { + return { + allowed: result, + message: null, + }; + } + + const allowed = Boolean(result.allowed); + const message = typeof result.message === 'string' && result.message.trim().length > 0 ? result.message.trim() : null; + return { allowed, message }; + } catch (error) { + params.deps.onWarning?.(`Authorization check failed for command /${params.commandName}`, error); + return { + allowed: false, + message: 'Unable to authorize this command right now.', + }; + } +} + +async function handleCommand(params: Readonly<{ + command: Readonly<{ name: string; args: string[] }>; + event: ChannelBridgeInboundMessage; + adapter: ChannelBridgeAdapter; + store: ChannelBindingStore; + deps: ChannelBridgeDeps; +}>): Promise { + const { command, event, adapter, store, deps } = params; + const ref: ChannelBridgeConversationRef = { + providerId: event.providerId, + conversationId: event.conversationId, + threadId: event.threadId, + }; + + if (command.name !== 'help' && command.name !== 'start') { + const authz = await authorizeCommand({ + commandName: command.name, + event, + deps, + }); + if (!authz.allowed) { + await replyToConversation( + adapter, + ref, + authz.message ?? 'You are not authorized to run this command here.', + ); + return true; + } + } + + if (command.name === 'help' || command.name === 'start') { + await replyToConversation( + adapter, + ref, + [ + 'Happier bridge commands:', + '/sessions - list active sessions', + '/attach - bind this DM/topic', + '/detach - unbind this DM/topic', + '/session - show current binding', + '/help - show command help', + '/start - alias for /help', + ].join('\n'), + ); + return true; + } + + if (command.name === 'sessions') { + let sessions: Array>; + try { + sessions = await deps.listSessions(); + } catch (error) { + deps.onWarning?.('Failed to list sessions for /sessions command', error); + await replyToConversation(adapter, ref, 'Failed to retrieve sessions. Please try again later.'); + return true; + } + await replyToConversation(adapter, ref, formatSessionsMessage(sessions)); + return true; + } + + if (command.name === 'session') { + let existing: ChannelSessionBinding | null; + try { + existing = await store.getBinding(ref); + } catch (error) { + deps.onWarning?.('Failed to read binding for /session command', error); + await replyToConversation(adapter, ref, 'Failed to read current session binding. Please try again later.'); + return true; + } + + if (!existing) { + await replyToConversation(adapter, ref, 'No session is attached here. Use /attach .'); + return true; + } + await replyToConversation(adapter, ref, `Attached session: ${existing.sessionId}`); + return true; + } + + if (command.name === 'attach') { + const idOrPrefix = String(command.args[0] ?? '').trim(); + if (!idOrPrefix) { + await replyToConversation(adapter, ref, 'Usage: /attach '); + return true; + } + + let resolved: ResolveSessionIdResult; + try { + resolved = await deps.resolveSessionIdOrPrefix(idOrPrefix); + } catch (error) { + deps.onWarning?.('Failed to resolve session by id/prefix for attach', error); + await replyToConversation( + adapter, + ref, + `Failed to attach to session ${idOrPrefix}: unable to resolve session identifier.`, + ); + return true; + } + + if (!resolved.ok) { + if (resolved.code === 'session_id_ambiguous') { + if (resolved.candidates && resolved.candidates.length > 0) { + await replyToConversation( + adapter, + ref, + `Ambiguous session prefix. Candidates:\n${resolved.candidates.map((id) => `• ${id}`).join('\n')}`, + ); + return true; + } + + await replyToConversation(adapter, ref, 'Ambiguous session prefix. Use /sessions to list active sessions.'); + return true; + } + + if (resolved.code === 'unsupported') { + await replyToConversation(adapter, ref, 'Attaching by session ID or prefix is not supported in this environment.'); + return true; + } + + await replyToConversation(adapter, ref, 'Session not found. Use /sessions to list active sessions.'); + return true; + } + + let latestSeq: number; + try { + const resolvedSeq = toNonNegativeInt(await deps.resolveLatestSessionSeq(resolved.sessionId)); + if (resolvedSeq === null) { + await replyToConversation( + adapter, + ref, + `Failed to attach to session ${resolved.sessionId}: unable to resolve latest sequence cursor.`, + ); + return true; + } + latestSeq = resolvedSeq; + } catch (error) { + deps.onWarning?.('Failed to resolve latest session sequence for attach', error); + await replyToConversation( + adapter, + ref, + `Failed to attach to session ${resolved.sessionId}: unable to resolve latest sequence cursor.`, + ); + return true; + } + + let previousBinding: ChannelSessionBinding | null; + try { + previousBinding = await store.getBinding(ref); + } catch (error) { + deps.onWarning?.('Failed to read existing binding during /attach', error); + await replyToConversation(adapter, ref, 'Failed to read current binding before attach. Please try again later.'); + return true; + } + const previousSessionId = previousBinding?.sessionId ?? null; + + try { + await store.upsertBinding({ + providerId: ref.providerId, + conversationId: ref.conversationId, + threadId: ref.threadId, + sessionId: resolved.sessionId, + lastForwardedSeq: latestSeq, + }); + } catch (error) { + deps.onWarning?.('Failed to persist binding during /attach', error); + await replyToConversation(adapter, ref, `Failed to attach to session ${resolved.sessionId}: unable to persist binding.`); + return true; + } + + const switchedFrom = + previousSessionId && previousSessionId !== resolved.sessionId + ? ` (replaced previous session ${previousSessionId})` + : ''; + await replyToConversation(adapter, ref, `Attached this conversation to session ${resolved.sessionId}${switchedFrom}.`); + return true; + } + + if (command.name === 'detach') { + let removed = false; + try { + removed = await store.removeBinding(ref); + } catch (error) { + deps.onWarning?.('Failed to remove binding for /detach command', error); + await replyToConversation(adapter, ref, 'Failed to detach current session binding. Please try again later.'); + return true; + } + + if (removed) { + await replyToConversation(adapter, ref, 'Detached this conversation from Happier session.'); + } else { + await replyToConversation(adapter, ref, 'No session was attached here.'); + } + return true; + } + + await replyToConversation(adapter, ref, `Unknown command: /${command.name}. Use /help for supported commands.`); + return true; +} + +/** + * Execute one bridge tick. + * + * Flow: + * 1) Pull inbound messages per adapter + * 2) Handle commands or forward user text to attached session + * 3) Fetch agent output after each binding cursor and send to channel + * 4) Advance cursors monotonically + * + * Deduper behavior: + * - `inboundDeduper` is required to make dedupe-state ownership explicit. + * - Use `createChannelBridgeInboundDeduper()` to construct per-worker dedupe state. + */ +export async function executeChannelBridgeTick(params: Readonly<{ + store: ChannelBindingStore; + adapters: readonly ChannelBridgeAdapter[]; + deps: ChannelBridgeDeps; + inboundDeduper: Readonly<{ + isDuplicate: (message: ChannelBridgeInboundMessage) => boolean; + }>; + warnedMissingAdapterBindings?: Set; +}>): Promise { + const activeAdapters: ChannelBridgeAdapter[] = []; + const adapterByProvider = new Map(); + for (const adapter of params.adapters) { + if (adapterByProvider.has(adapter.providerId)) { + params.deps.onWarning?.(`Duplicate adapter providerId detected: ${adapter.providerId}; ignoring later adapter instance.`); + continue; + } + adapterByProvider.set(adapter.providerId, adapter); + activeAdapters.push(adapter); + } + + const deduper = params.inboundDeduper; + + for (const adapter of activeAdapters) { + let inbound: ChannelBridgeInboundMessage[]; + try { + inbound = await adapter.pullInboundMessages(); + } catch (error) { + params.deps.onWarning?.(`Failed to pull inbound messages for adapter ${adapter.providerId}`, error); + continue; + } + + for (const rawEvent of inbound) { + const event: ChannelBridgeInboundMessage = + rawEvent.providerId === adapter.providerId + ? rawEvent + : { + ...rawEvent, + providerId: adapter.providerId, + }; + + if (rawEvent.providerId !== adapter.providerId) { + params.deps.onWarning?.( + `Inbound provider mismatch; using adapter providerId=${adapter.providerId} instead of event providerId=${rawEvent.providerId}`, + ); + } + + if (deduper.isDuplicate(event)) { + continue; + } + + try { + const command = parseSlashCommand(event.text); + if (command) { + await handleCommand({ + command, + event, + adapter, + store: params.store, + deps: params.deps, + }); + continue; + } + + if (event.text.trim().startsWith('/')) { + await replyToConversation(adapter, { + conversationId: event.conversationId, + threadId: event.threadId, + }, 'Unknown command. Use /help for supported commands.'); + continue; + } + + const ref: ChannelBridgeConversationRef = { + providerId: adapter.providerId, + conversationId: event.conversationId, + threadId: event.threadId, + }; + let binding: ChannelSessionBinding | null; + try { + binding = await params.store.getBinding(ref); + } catch (error) { + params.deps.onWarning?.( + `Failed to read binding for inbound message forwarding (provider=${adapter.providerId} conversation=${event.conversationId} thread=${event.threadId ?? 'null'})`, + error, + ); + await replyToConversation( + adapter, + ref, + 'Failed to read current session binding. Please try again later.', + ); + continue; + } + + if (!binding) { + await replyToConversation( + adapter, + ref, + 'No session is attached here. Use /attach first.', + ); + continue; + } + + try { + await params.deps.sendUserMessageToSession({ + sessionId: binding.sessionId, + text: event.text, + sentFrom: adapter.providerId, + providerId: adapter.providerId, + conversationId: event.conversationId, + threadId: event.threadId, + }); + } catch (error) { + params.deps.onWarning?.( + `Failed to forward channel message into session ${binding.sessionId} (provider=${adapter.providerId} conversation=${event.conversationId} thread=${event.threadId ?? 'null'} messageId=${event.messageId})`, + error, + ); + await replyToConversation( + adapter, + ref, + `Failed to send message to session ${binding.sessionId}.`, + ); + } + } catch (error) { + params.deps.onWarning?.(`Failed to process inbound message for adapter ${adapter.providerId}`, error); + } + } + } + + let bindings: ChannelSessionBinding[]; + try { + bindings = await params.store.listBindings(); + } catch (error) { + params.deps.onWarning?.('Failed to list bindings for outbound forwarding', error); + return; + } + + const warnedMissingAdapterBindings = params.warnedMissingAdapterBindings; + for (const binding of bindings) { + const missingBindingWarningKey = bindingKey(binding); + const adapter = adapterByProvider.get(binding.providerId); + if (!adapter) { + if (!warnedMissingAdapterBindings || !warnedMissingAdapterBindings.has(missingBindingWarningKey)) { + params.deps.onWarning?.( + `No adapter registered for binding providerId=${binding.providerId} conversationId=${binding.conversationId}; skipping outbound forwarding`, + ); + warnedMissingAdapterBindings?.add(missingBindingWarningKey); + } + continue; + } + + warnedMissingAdapterBindings?.delete(missingBindingWarningKey); + + try { + const messages = await params.deps.fetchAgentMessagesAfterSeq({ + sessionId: binding.sessionId, + afterSeq: binding.lastForwardedSeq, + }); + + const orderedMessages: Array> = []; + for (const row of messages) { + const parsedSeq = toNonNegativeInt(row.seq); + if (parsedSeq === null) { + params.deps.onWarning?.( + `Skipped agent output row with invalid seq for session ${binding.sessionId}`, + ); + continue; + } + orderedMessages.push({ + seq: parsedSeq, + text: row.text, + }); + } + + orderedMessages.sort((left, right) => left.seq - right.seq); + + let maxSeq = binding.lastForwardedSeq; + const persistCursor = async (nextSeq: number): Promise => { + try { + maxSeq = nextSeq; + await params.store.updateLastForwardedSeq(binding, maxSeq); + return true; + } catch (error) { + params.deps.onWarning?.( + `Failed to persist channel bridge cursor for session=${binding.sessionId} provider=${binding.providerId} conversation=${binding.conversationId} seq=${nextSeq}`, + error, + ); + return false; + } + }; + + for (const row of orderedMessages) { + const parsedSeq = row.seq; + if (parsedSeq <= maxSeq) { + continue; + } + + const nextSeq = parsedSeq; + const text = String(row.text).trim(); + if (!text) { + const persisted = await persistCursor(nextSeq); + if (!persisted) { + break; + } + continue; + } + await adapter.sendMessage({ + conversationId: binding.conversationId, + threadId: binding.threadId, + text, + }); + const persisted = await persistCursor(nextSeq); + if (!persisted) { + break; + } + } + } catch (error) { + params.deps.onWarning?.( + `Failed to forward agent output to channel for session=${binding.sessionId} provider=${binding.providerId} conversation=${binding.conversationId}`, + error, + ); + } + } +} + +/** + * Start the bridge worker loop. + * + * - `tickMs` is clamped to a minimum of 250ms (default 2500ms) + * - Uses single-flight scheduling: only one tick runs at a time + * - `trigger()` requests an immediate tick + * - `stop()` is idempotent, drains in-flight tick, then stops adapters + * - Adapter shutdown deduplicates by object identity, not `providerId` + */ +export function startChannelBridgeWorker(params: Readonly<{ + store: ChannelBindingStore; + adapters: readonly ChannelBridgeAdapter[]; + deps: ChannelBridgeDeps; + tickMs?: number; +}>): ChannelBridgeWorkerHandle { + const tickMs = + typeof params.tickMs === 'number' && Number.isFinite(params.tickMs) && params.tickMs > 0 + ? Math.max(250, Math.trunc(params.tickMs)) + : 2_500; + + const inboundDeduper = createChannelBridgeInboundDeduper(); + const warnedMissingAdapterBindings = new Set(); + let inFlightTick: Promise | null = null; + + const runTick = async (): Promise => { + const tickRun = executeChannelBridgeTick({ + store: params.store, + adapters: params.adapters, + deps: params.deps, + inboundDeduper, + warnedMissingAdapterBindings, + }); + inFlightTick = tickRun; + try { + await tickRun; + } finally { + if (inFlightTick === tickRun) { + inFlightTick = null; + } + } + }; + + let loop: SingleFlightIntervalLoopHandle | null = startSingleFlightIntervalLoop({ + intervalMs: tickMs, + task: runTick, + onError: (error) => { + params.deps.onWarning?.('Channel bridge tick failed', error); + }, + }); + + loop.trigger(); + let stopPromise: Promise | null = null; + + return { + stop: async () => { + if (stopPromise) { + await stopPromise; + return; + } + + stopPromise = (async () => { + const activeLoop = loop; + loop = null; + activeLoop?.stop(); + + const currentTick = inFlightTick; + if (currentTick) { + try { + await currentTick; + } catch { + // Tick failures are already surfaced via loop onError while running. + // During shutdown we only drain the in-flight tick before adapter stop. + } + } + + const adaptersToStop: ChannelBridgeAdapter[] = []; + const seenAdapters = new Set(); + for (const adapter of params.adapters) { + if (seenAdapters.has(adapter)) { + continue; + } + seenAdapters.add(adapter); + adaptersToStop.push(adapter); + } + + const stopResults = await Promise.allSettled( + adaptersToStop.map(async (adapter) => { + if (typeof adapter.stop !== 'function') return; + await adapter.stop(); + }), + ); + + stopResults.forEach((result, index) => { + if (result.status === 'rejected') { + const providerId = adaptersToStop[index]?.providerId ?? 'unknown'; + params.deps.onWarning?.(`Failed to stop channel adapter ${providerId} during shutdown`, result.reason); + } + }); + })(); + + await stopPromise; + }, + trigger: () => loop?.trigger(), + }; +} diff --git a/docs/README.md b/docs/README.md index e6e7dec40..3d95e11fe 100644 --- a/docs/README.md +++ b/docs/README.md @@ -9,6 +9,7 @@ This folder documents how Happier works internally, with a focus on protocol, ba - backend-architecture.md: Internal backend structure, data flow, and key subsystems. - deployment.md: How to deploy the backend and required infrastructure. - cli-architecture.md: CLI and daemon architecture and how they interact with the server. +- channel-bridge.md: Provider-agnostic bridge architecture and adapter contract. - issue-triage.md: How the GitHub issue triage workflows are wired to maintainer tooling. ## Conventions diff --git a/docs/channel-bridge.md b/docs/channel-bridge.md new file mode 100644 index 000000000..c7cce7e03 --- /dev/null +++ b/docs/channel-bridge.md @@ -0,0 +1,57 @@ +# Channel Bridge Core + +The channel bridge core is a provider-agnostic runtime that maps external channel conversations to Happier sessions. + +Core implementation in this PR: + +- Worker/runtime loop: `apps/cli/src/channels/core/channelBridgeWorker.ts` + +## Core responsibilities + +- receive inbound messages from adapter implementations +- parse shared control commands (`/sessions`, `/attach`, `/session`, `/detach`, `/help`, `/start` as alias of `/help`) +- maintain conversation-to-session bindings by `(providerId, conversationId, threadId|null)` +- forward bound user text into the target session +- fetch assistant output after a cursor and forward back to channel conversations +- track per-binding cursor (`lastForwardedSeq`) to avoid replaying older assistant rows + +## Binding model + +Bindings are keyed by provider + conversation + optional thread/topic. + +- `providerId`: adapter namespace (for example, channel family) +- `conversationId`: channel-specific room/chat identifier +- `threadId`: optional sub-thread/topic identifier +- `sessionId`: Happier session bound to that conversation key +- `lastForwardedSeq`: last assistant transcript row sent through bridge + +## Tick loop behavior + +Each tick performs: + +1. pull inbound messages from each adapter +2. handle control commands (`/sessions`, `/attach`, `/session`, `/detach`, `/help`, `/start`) +3. route non-command user text to bound sessions +4. list current bindings +5. fetch assistant rows after each binding cursor +6. forward assistant messages back through the adapter +7. advance binding cursor after successful forwarding + +The worker uses single-flight scheduling in `startChannelBridgeWorker` so only one tick executes at a time. + +## Adapter contract + +Adapters plug into the core using a small interface: + +- `providerId` +- `pullInboundMessages()` +- `sendMessage({ conversationId, threadId, text })` +- optional `stop()` lifecycle hook + +This keeps command semantics, binding behavior, and session forwarding logic centralized in the core. + +## Out of scope for this PR + +This document covers core bridge runtime behavior only. + +Provider-specific transport details, bridge runtime bootstrapping, scoped config sources, and server-relay deployment modes are covered in follow-up stacked PRs. From 28eb9b94399dad2f147b3b5d854701df0c0bd1eb Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 13:51:19 -0500 Subject: [PATCH 02/10] test(cli): assert duplicate adapter warning in worker lifecycle --- apps/cli/src/channels/core/channelBridgeWorker.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.test.ts b/apps/cli/src/channels/core/channelBridgeWorker.test.ts index 9b591fbaa..1675223ac 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.test.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.test.ts @@ -1724,7 +1724,7 @@ describe('startChannelBridgeWorker', () => { }, }; - const { deps } = createDepsHarness(); + const { deps, warnings } = createDepsHarness(); const worker = startChannelBridgeWorker({ store, adapters: [sharedAdapter, sharedAdapter], @@ -1735,6 +1735,7 @@ describe('startChannelBridgeWorker', () => { try { await worker.stop(); expect(stopCounter.calls).toBe(1); + expect(warnings.some((row) => row.message.includes('Duplicate adapter providerId detected: telegram'))).toBe(true); } finally { await worker.stop(); } From 972a51c1ca10dafcea0f14baedcdcbb3b57e4ac1 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 14:04:35 -0500 Subject: [PATCH 03/10] fix(cli): normalize in-memory channel bridge cursor updates --- .../channels/core/channelBridgeWorker.test.ts | 26 +++++++++++++++++++ .../src/channels/core/channelBridgeWorker.ts | 7 +++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.test.ts b/apps/cli/src/channels/core/channelBridgeWorker.test.ts index 1675223ac..86b62e430 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.test.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.test.ts @@ -143,6 +143,32 @@ async function waitFor(condition: () => boolean, timeoutMs: number = 2_000): Pro } } +describe('createInMemoryChannelBindingStore', () => { + it('normalizes non-finite cursor values and ignores invalid cursor updates', async () => { + const store = createInMemoryChannelBindingStore(); + const ref = { + providerId: 'telegram', + conversationId: '-100-cursor', + threadId: null, + } as const; + + await store.upsertBinding({ + ...ref, + sessionId: 'sess-cursor', + lastForwardedSeq: Number.NaN, + }); + + const created = await store.getBinding(ref); + expect(created?.lastForwardedSeq).toBe(0); + + await store.updateLastForwardedSeq(ref, 7); + await store.updateLastForwardedSeq(ref, Number.NaN); + + const updated = await store.getBinding(ref); + expect(updated?.lastForwardedSeq).toBe(7); + }); +}); + describe('executeChannelBridgeTick', () => { it('supports /attach then forwards inbound user messages into the bound session', async () => { const store = createInMemoryChannelBindingStore(); diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index e4578bcfa..3b00c9582 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -210,12 +210,13 @@ export function createInMemoryChannelBindingStore(now: () => number = () => Date upsertBinding: async (binding) => { const key = bindingKey(binding); const existing = byKey.get(key); + const normalizedLastForwardedSeq = toNonNegativeInt(binding.lastForwardedSeq) ?? 0; const next: ChannelSessionBinding = { providerId: binding.providerId, conversationId: binding.conversationId, threadId: binding.threadId, sessionId: binding.sessionId, - lastForwardedSeq: Math.max(0, Math.trunc(binding.lastForwardedSeq)), + lastForwardedSeq: normalizedLastForwardedSeq, createdAtMs: existing?.createdAtMs ?? now(), updatedAtMs: now(), }; @@ -226,7 +227,9 @@ export function createInMemoryChannelBindingStore(now: () => number = () => Date const key = bindingKey(ref); const existing = byKey.get(key); if (!existing) return; - const nextSeq = Math.max(existing.lastForwardedSeq, Math.max(0, Math.trunc(seq))); + const parsedSeq = toNonNegativeInt(seq); + if (parsedSeq === null) return; + const nextSeq = Math.max(existing.lastForwardedSeq, parsedSeq); byKey.set(key, { ...existing, lastForwardedSeq: nextSeq, From e0e1f5a2b3254e804ac51969f874c6f5402040e9 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 14:15:29 -0500 Subject: [PATCH 04/10] fix(cli): harden attach seq warning and worker stop race --- .../channels/core/channelBridgeWorker.test.ts | 47 ++++++++++++++++++- .../src/channels/core/channelBridgeWorker.ts | 11 +++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.test.ts b/apps/cli/src/channels/core/channelBridgeWorker.test.ts index 86b62e430..b7f034a80 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.test.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.test.ts @@ -742,7 +742,7 @@ describe('executeChannelBridgeTick', () => { const store = createInMemoryChannelBindingStore(); const harness = createAdapterHarness(); - const { deps } = createDepsHarness({ + const { deps, warnings } = createDepsHarness({ resolveSessionIdOrPrefix: async () => ({ ok: true as const, sessionId: 'sess-invalid-seq-value' }), resolveLatestSessionSeq: async () => Number.NaN, }); @@ -764,6 +764,7 @@ describe('executeChannelBridgeTick', () => { expect(await store.listBindings()).toHaveLength(0); expect(harness.sent.some((row) => row.text.includes('Failed to attach'))).toBe(true); + expect(warnings.some((row) => row.message.includes('resolveLatestSessionSeq returned an invalid value'))).toBe(true); }); it('does not attach when binding persistence fails', async () => { @@ -1656,6 +1657,50 @@ describe('startChannelBridgeWorker', () => { } }); + it('waits for startup-triggered tick when stop is called immediately', async () => { + const store = createInMemoryChannelBindingStore(); + const gate = createDeferredPromise(); + let startedTick = false; + let stopCallCount = 0; + + const adapter: ChannelBridgeAdapter = { + providerId: 'telegram', + pullInboundMessages: async () => { + startedTick = true; + await gate.promise; + return []; + }, + sendMessage: async () => {}, + stop: async () => { + stopCallCount += 1; + }, + }; + + const { deps } = createDepsHarness(); + const worker = startChannelBridgeWorker({ + store, + adapters: [adapter], + deps, + tickMs: 60_000, + }); + + try { + const stopPromise = worker.stop(); + + await Promise.resolve(); + await waitFor(() => startedTick); + expect(stopCallCount).toBe(0); + + gate.resolve(); + await stopPromise; + + expect(stopCallCount).toBe(1); + } finally { + gate.resolve(); + await worker.stop(); + } + }); + it('continues stopping remaining adapters if one stop fails', async () => { const store = createInMemoryChannelBindingStore(); let secondaryStopped = false; diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index 3b00c9582..d088c997e 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -446,6 +446,9 @@ async function handleCommand(params: Readonly<{ try { const resolvedSeq = toNonNegativeInt(await deps.resolveLatestSessionSeq(resolved.sessionId)); if (resolvedSeq === null) { + deps.onWarning?.( + `resolveLatestSessionSeq returned an invalid value for session ${resolved.sessionId}; expected a non-negative integer`, + ); await replyToConversation( adapter, ref, @@ -530,6 +533,12 @@ async function handleCommand(params: Readonly<{ * Deduper behavior: * - `inboundDeduper` is required to make dedupe-state ownership explicit. * - Use `createChannelBridgeInboundDeduper()` to construct per-worker dedupe state. + * + * Missing-adapter warning deduplication: + * - `warnedMissingAdapterBindings` is optional. When omitted, missing-adapter + * warnings are emitted per binding on each call. + * - Pass a stable `Set` across calls to dedupe warning spam in + * long-running loops (as `startChannelBridgeWorker` does). */ export async function executeChannelBridgeTick(params: Readonly<{ store: ChannelBindingStore; @@ -819,6 +828,8 @@ export function startChannelBridgeWorker(params: Readonly<{ loop = null; activeLoop?.stop(); + await Promise.resolve(); + const currentTick = inFlightTick; if (currentTick) { try { From f4749d89ee6576f40e71df355a059a3bfd59964d Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 14:52:07 -0500 Subject: [PATCH 05/10] test(channel-bridge): address minor and nitpick review feedback --- .../channels/core/channelBridgeWorker.test.ts | 20 ++++++++++++++++--- .../src/channels/core/channelBridgeWorker.ts | 2 +- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.test.ts b/apps/cli/src/channels/core/channelBridgeWorker.test.ts index b7f034a80..9a156b7f7 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.test.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.test.ts @@ -9,7 +9,7 @@ import { type ChannelBindingStore, type ChannelBridgeDeps, type ChannelBridgeInboundMessage, -} from './channelBridgeWorker'; +} from '@/channels/core/channelBridgeWorker'; interface SentConversationMessage { conversationId: string; @@ -348,7 +348,7 @@ describe('executeChannelBridgeTick', () => { expect(harness.sent.some((row) => row.text.includes('Failed to retrieve sessions'))).toBe(true); }); - it('supports /session command for attached and non-attached conversations', async () => { + it('supports /session command for attached conversations', async () => { const store = createInMemoryChannelBindingStore(); const harness = createAdapterHarness(); const { deps } = createDepsHarness(); @@ -368,6 +368,21 @@ describe('executeChannelBridgeTick', () => { text: '/session', messageId: 'm-session-bound', }); + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + }); + + expect(harness.sent.some((row) => row.text.includes('Attached session: sess-bound'))).toBe(true); + }); + + it('supports /session command for non-attached conversations', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness(); + const { deps } = createDepsHarness(); + harness.pushInbound({ providerId: 'telegram', conversationId: '-1001', @@ -383,7 +398,6 @@ describe('executeChannelBridgeTick', () => { inboundDeduper: createChannelBridgeInboundDeduper(), }); - expect(harness.sent.some((row) => row.text.includes('Attached session: sess-bound'))).toBe(true); expect(harness.sent.some((row) => row.text.includes('No session is attached here'))).toBe(true); }); diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index d088c997e..2552df06c 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -736,7 +736,7 @@ export async function executeChannelBridgeTick(params: Readonly<{ } const nextSeq = parsedSeq; - const text = String(row.text).trim(); + const text = String(row.text ?? '').trim(); if (!text) { const persisted = await persistCursor(nextSeq); if (!persisted) { From bff019b7930eb60f95d1eb069f3fc055f99cf168 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 15:10:32 -0500 Subject: [PATCH 06/10] fix(channel-bridge): address latest PR109 review issues --- .../channels/core/channelBridgeWorker.test.ts | 59 +++++++++++++ .../src/channels/core/channelBridgeWorker.ts | 83 ++++++++++++++----- 2 files changed, 123 insertions(+), 19 deletions(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.test.ts b/apps/cli/src/channels/core/channelBridgeWorker.test.ts index 9a156b7f7..36e899965 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.test.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.test.ts @@ -1515,6 +1515,65 @@ describe('executeChannelBridgeTick', () => { expect(missingAdapterWarnings).toHaveLength(1); }); + it('prunes stale missing-adapter warning keys when bindings are removed', async () => { + const store = createInMemoryChannelBindingStore(); + const harness = createAdapterHarness('discord'); + + const bindingRef = { + providerId: 'telegram', + conversationId: 'orphaned-room', + threadId: null, + } as const; + + await store.upsertBinding({ + ...bindingRef, + sessionId: 'sess-orphaned', + lastForwardedSeq: 0, + }); + + const { deps, warnings } = createDepsHarness({ + fetchAgentMessagesAfterSeq: async () => [{ seq: 1, text: 'ignored' }], + }); + const warnedMissingAdapterBindings = new Set(); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + warnedMissingAdapterBindings, + }); + + await store.removeBinding(bindingRef); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + warnedMissingAdapterBindings, + }); + + await store.upsertBinding({ + ...bindingRef, + sessionId: 'sess-orphaned-2', + lastForwardedSeq: 0, + }); + + await executeChannelBridgeTick({ + store, + adapters: [harness.adapter], + deps, + inboundDeduper: createChannelBridgeInboundDeduper(), + warnedMissingAdapterBindings, + }); + + const missingAdapterWarnings = warnings.filter((row) => + row.message.includes('No adapter registered for binding providerId=telegram'), + ); + expect(missingAdapterWarnings).toHaveLength(2); + }); + it('warns and exits tick when listing bindings fails', async () => { const baseStore = createInMemoryChannelBindingStore(); const store: ChannelBindingStore = { diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index 2552df06c..02b5355a2 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -145,6 +145,26 @@ function toNonNegativeInt(value: unknown): number | null { return parsed; } +const EXTERNAL_IO_TIMEOUT_MS = 30_000; + +async function withTimeout(promise: Promise, timeoutMs: number, label: string): Promise { + let timeoutHandle: ReturnType | null = null; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`${label} timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }), + ]); + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } +} + type ChannelBridgeInboundDeduper = Readonly<{ isDuplicate: (message: ChannelBridgeInboundMessage) => boolean; }>; @@ -244,7 +264,7 @@ function parseSlashCommand(text: string): Readonly<{ name: string; args: string[ const trimmed = text.trim(); if (!trimmed.startsWith('/')) return null; const [rawName, ...args] = trimmed.slice(1).split(/\s+/g); - const normalized = String(rawName ?? '').trim().toLowerCase(); + const normalized = String(rawName).trim().toLowerCase(); if (!normalized) return null; const name = normalized.split('@')[0]!.trim(); if (!name) return null; @@ -565,7 +585,11 @@ export async function executeChannelBridgeTick(params: Readonly<{ for (const adapter of activeAdapters) { let inbound: ChannelBridgeInboundMessage[]; try { - inbound = await adapter.pullInboundMessages(); + inbound = await withTimeout( + adapter.pullInboundMessages(), + EXTERNAL_IO_TIMEOUT_MS, + `pullInboundMessages(${adapter.providerId})`, + ); } catch (error) { params.deps.onWarning?.(`Failed to pull inbound messages for adapter ${adapter.providerId}`, error); continue; @@ -642,14 +666,18 @@ export async function executeChannelBridgeTick(params: Readonly<{ } try { - await params.deps.sendUserMessageToSession({ - sessionId: binding.sessionId, - text: event.text, - sentFrom: adapter.providerId, - providerId: adapter.providerId, - conversationId: event.conversationId, - threadId: event.threadId, - }); + await withTimeout( + params.deps.sendUserMessageToSession({ + sessionId: binding.sessionId, + text: event.text, + sentFrom: adapter.providerId, + providerId: adapter.providerId, + conversationId: event.conversationId, + threadId: event.threadId, + }), + EXTERNAL_IO_TIMEOUT_MS, + `sendUserMessageToSession(${binding.sessionId})`, + ); } catch (error) { params.deps.onWarning?.( `Failed to forward channel message into session ${binding.sessionId} (provider=${adapter.providerId} conversation=${event.conversationId} thread=${event.threadId ?? 'null'} messageId=${event.messageId})`, @@ -676,6 +704,15 @@ export async function executeChannelBridgeTick(params: Readonly<{ } const warnedMissingAdapterBindings = params.warnedMissingAdapterBindings; + if (warnedMissingAdapterBindings) { + const activeBindingKeys = new Set(bindings.map((binding) => bindingKey(binding))); + for (const warnedKey of warnedMissingAdapterBindings) { + if (!activeBindingKeys.has(warnedKey)) { + warnedMissingAdapterBindings.delete(warnedKey); + } + } + } + for (const binding of bindings) { const missingBindingWarningKey = bindingKey(binding); const adapter = adapterByProvider.get(binding.providerId); @@ -692,10 +729,14 @@ export async function executeChannelBridgeTick(params: Readonly<{ warnedMissingAdapterBindings?.delete(missingBindingWarningKey); try { - const messages = await params.deps.fetchAgentMessagesAfterSeq({ - sessionId: binding.sessionId, - afterSeq: binding.lastForwardedSeq, - }); + const messages = await withTimeout( + params.deps.fetchAgentMessagesAfterSeq({ + sessionId: binding.sessionId, + afterSeq: binding.lastForwardedSeq, + }), + EXTERNAL_IO_TIMEOUT_MS, + `fetchAgentMessagesAfterSeq(${binding.sessionId})`, + ); const orderedMessages: Array> = []; for (const row of messages) { @@ -744,11 +785,15 @@ export async function executeChannelBridgeTick(params: Readonly<{ } continue; } - await adapter.sendMessage({ - conversationId: binding.conversationId, - threadId: binding.threadId, - text, - }); + await withTimeout( + adapter.sendMessage({ + conversationId: binding.conversationId, + threadId: binding.threadId, + text, + }), + EXTERNAL_IO_TIMEOUT_MS, + `sendMessage(${adapter.providerId})`, + ); const persisted = await persistCursor(nextSeq); if (!persisted) { break; From 3bdc6e47016fbb8e6e18351e5ab1b1cc01e9516f Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 15:21:35 -0500 Subject: [PATCH 07/10] fix(channel-bridge): handle late timeout rejections safely --- apps/cli/src/channels/core/channelBridgeWorker.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index 02b5355a2..bd0d9401a 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -52,6 +52,8 @@ export type ChannelBridgeActorContext = Readonly<{ * - `pullInboundMessages` should return available inbound items without throwing for * normal empty states (return `[]` instead). * - `sendMessage` should deliver text into a target conversation/thread. + * - `sendMessage` should tolerate at-least-once delivery attempts. Timeout races may + * trigger retries, so provider adapters should be idempotent when possible. * - `stop` is optional and should tear down adapter resources. */ export type ChannelBridgeAdapter = Readonly<{ @@ -148,6 +150,10 @@ function toNonNegativeInt(value: unknown): number | null { const EXTERNAL_IO_TIMEOUT_MS = 30_000; async function withTimeout(promise: Promise, timeoutMs: number, label: string): Promise { + // Promise.race does not cancel the underlying operation. Attach a no-op rejection + // handler so late failures do not surface as unhandled rejections after timeout. + void promise.catch(() => undefined); + let timeoutHandle: ReturnType | null = null; try { return await Promise.race([ From 78898fc34cafb4ce19294beb35229f17cb3274e4 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 15:28:32 -0500 Subject: [PATCH 08/10] fix(channel-bridge): bound command reply send operations --- apps/cli/src/channels/core/channelBridgeWorker.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index bd0d9401a..539d1342b 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -296,11 +296,15 @@ async function replyToConversation( conversation: Readonly<{ conversationId: string; threadId: string | null }>, text: string, ): Promise { - await adapter.sendMessage({ - conversationId: conversation.conversationId, - threadId: conversation.threadId, - text, - }); + await withTimeout( + adapter.sendMessage({ + conversationId: conversation.conversationId, + threadId: conversation.threadId, + text, + }), + EXTERNAL_IO_TIMEOUT_MS, + `replyToConversation(${adapter.providerId}:${conversation.conversationId})`, + ); } async function authorizeCommand(params: Readonly<{ From 47715843cc5cd6d28b95241548354abbb1ff7667 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 15:33:02 -0500 Subject: [PATCH 09/10] fix(channel-bridge): make external I/O timeout configurable --- .../src/channels/core/channelBridgeWorker.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index 539d1342b..85030de32 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -147,7 +147,23 @@ function toNonNegativeInt(value: unknown): number | null { return parsed; } -const EXTERNAL_IO_TIMEOUT_MS = 30_000; +const DEFAULT_EXTERNAL_IO_TIMEOUT_MS = 30_000; + +function resolveExternalIoTimeoutMs(): number { + const raw = (process.env.HAPPIER_CHANNEL_BRIDGE_IO_TIMEOUT_MS ?? '').trim(); + if (!raw) { + return DEFAULT_EXTERNAL_IO_TIMEOUT_MS; + } + + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_EXTERNAL_IO_TIMEOUT_MS; + } + + return parsed; +} + +const EXTERNAL_IO_TIMEOUT_MS = resolveExternalIoTimeoutMs(); async function withTimeout(promise: Promise, timeoutMs: number, label: string): Promise { // Promise.race does not cancel the underlying operation. Attach a no-op rejection From 4d2649a2b7f8d7e7015b56fd8626c1e0bc76bce9 Mon Sep 17 00:00:00 2001 From: Chris Denneen Date: Thu, 5 Mar 2026 15:42:36 -0500 Subject: [PATCH 10/10] chore(channel-bridge): remove unreachable outbound text fallback --- apps/cli/src/channels/core/channelBridgeWorker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/cli/src/channels/core/channelBridgeWorker.ts b/apps/cli/src/channels/core/channelBridgeWorker.ts index 85030de32..ac8addbb7 100644 --- a/apps/cli/src/channels/core/channelBridgeWorker.ts +++ b/apps/cli/src/channels/core/channelBridgeWorker.ts @@ -803,7 +803,7 @@ export async function executeChannelBridgeTick(params: Readonly<{ } const nextSeq = parsedSeq; - const text = String(row.text ?? '').trim(); + const text = String(row.text).trim(); if (!text) { const persisted = await persistCursor(nextSeq); if (!persisted) {