From 3d07215158679c6c722e0792df71e46d63afcb73 Mon Sep 17 00:00:00 2001 From: "." Date: Mon, 9 Mar 2026 00:20:50 +0800 Subject: [PATCH] fix: reuse TCP connection instead of reconnecting on every tool call Previously each callTool invocation opened a new TCP connection and re-ran the MCP initialize handshake. For robot control workloads with frequent tool calls this added significant latency. This change introduces a persistent connection pool keyed by host:port that initializes once and multiplexes subsequent requests with incrementing IDs. Connections auto-recover on close/error. --- index.ts | 208 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 141 insertions(+), 67 deletions(-) diff --git a/index.ts b/index.ts index bba3593..96e8bbf 100644 --- a/index.ts +++ b/index.ts @@ -14,6 +14,132 @@ interface McpToolDef { inputSchema: Record; } +interface McpConnection { + client: net.Socket; + nextId: number; + pending: Map void; reject: (reason: Error) => void }>; + buf: string; + initialized: boolean; +} + +const connections = new Map(); + +function getConnectionKey(host: string, port: number): string { + return `${host}:${port}`; +} + +function getOrCreateConnection(host: string, port: number): Promise { + const key = getConnectionKey(host, port); + const existing = connections.get(key); + if (existing && !existing.client.destroyed) { + return Promise.resolve(existing); + } + + return new Promise((resolve, reject) => { + const conn: McpConnection = { + client: net.createConnection(port, host), + nextId: 1, + pending: new Map(), + buf: "", + initialized: false, + }; + + const onConnect = () => { + const id = conn.nextId++; + conn.client.write( + JSON.stringify({ + jsonrpc: "2.0", + id, + method: "initialize", + params: { + protocolVersion: "2024-11-05", + capabilities: {}, + clientInfo: { name: "openclaw-dimos", version: "0.0.1" }, + }, + }) + "\n", + ); + conn.pending.set(id, { + resolve: () => { + conn.initialized = true; + connections.set(key, conn); + resolve(conn); + }, + reject, + }); + }; + + conn.client.on("connect", onConnect); + + conn.client.on("data", (d) => { + conn.buf += d.toString(); + const lines = conn.buf.split("\n"); + conn.buf = lines.pop() || ""; + for (const line of lines) { + if (!line.trim()) continue; + try { + const msg = JSON.parse(line) as { id?: number; result?: unknown; error?: { message: string } }; + if (msg.id != null) { + const p = conn.pending.get(msg.id); + if (p) { + conn.pending.delete(msg.id); + if (msg.error) { + p.reject(new Error(msg.error.message)); + } else { + p.resolve(msg.result); + } + } + } + } catch { + // ignore malformed JSON lines + } + } + }); + + conn.client.on("error", (err) => { + connections.delete(key); + for (const p of conn.pending.values()) { + p.reject(err); + } + conn.pending.clear(); + reject(err); + }); + + conn.client.on("close", () => { + connections.delete(key); + const closeErr = new Error("Connection closed"); + for (const p of conn.pending.values()) { + p.reject(closeErr); + } + conn.pending.clear(); + }); + }); +} + +async function sendRequest(conn: McpConnection, method: string, params: Record): Promise { + return new Promise((resolve, reject) => { + const id = conn.nextId++; + const timer = setTimeout(() => { + conn.pending.delete(id); + reject(new Error(`MCP request '${method}' timed out`)); + }, CALL_TIMEOUT_MS); + + conn.pending.set(id, { + resolve: (value) => { + clearTimeout(timer); + resolve(value); + }, + reject: (err) => { + clearTimeout(timer); + reject(err); + }, + }); + + conn.client.write( + JSON.stringify({ jsonrpc: "2.0", id, method, params }) + "\n", + ); + }); +} + function getHost(pluginConfig?: Record): string { if (pluginConfig && typeof pluginConfig.mcpHost === "string" && pluginConfig.mcpHost) { return pluginConfig.mcpHost; @@ -103,79 +229,27 @@ client.on('error',e=>{process.stderr.write(e.message);process.exit(1);}); return JSON.parse(result); } -/** Call an MCP tool via direct TCP connection to the DimOS server. */ +/** Call an MCP tool via persistent TCP connection to the DimOS server. */ async function callTool( host: string, port: number, name: string, args: Record, ): Promise { - return new Promise((resolve, reject) => { - const client = net.createConnection(port, host, () => { - client.write( - JSON.stringify({ - jsonrpc: "2.0", - id: 1, - method: "initialize", - params: { - protocolVersion: "2024-11-05", - capabilities: {}, - clientInfo: { name: "openclaw-dimos", version: "0.0.1" }, - }, - }) + "\n", - ); - }); - - let buf = ""; - let phase: "init" | "call" | "done" = "init"; - - const timer = setTimeout(() => { - client.destroy(); - reject(new Error("MCP tool call timed out")); - }, CALL_TIMEOUT_MS); - - client.on("data", (d) => { - buf += d.toString(); - const lines = buf.split("\n"); - buf = lines.pop() || ""; - for (const line of lines) { - if (!line.trim()) continue; - const msg = JSON.parse(line); - if (phase === "init" && msg.id === 1) { - phase = "call"; - client.write( - JSON.stringify({ - jsonrpc: "2.0", - id: 2, - method: "tools/call", - params: { name, arguments: args }, - }) + "\n", - ); - } else if (phase === "call" && msg.id === 2) { - phase = "done"; - clearTimeout(timer); - if (msg.error) { - resolve(`Error: ${msg.error.message}`); - } else { - const content = msg.result?.content; - const text = Array.isArray(content) - ? content - .filter((c: { type: string }) => c.type === "text") - .map((c: { text: string }) => c.text) - .join("\n") - : JSON.stringify(content); - resolve(text || "OK"); - } - client.end(); - } - } - }); - - client.on("error", (err) => { - clearTimeout(timer); - reject(err); - }); - }); + const conn = await getOrCreateConnection(host, port); + const result = (await sendRequest(conn, "tools/call", { name, arguments: args })) as { + content?: Array<{ type: string; text?: string }>; + }; + const content = result?.content; + if (Array.isArray(content)) { + return ( + content + .filter((c) => c.type === "text") + .map((c) => c.text) + .join("\n") || "OK" + ); + } + return JSON.stringify(content) || "OK"; } export default {