Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 141 additions & 67 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,132 @@ interface McpToolDef {
inputSchema: Record<string, unknown>;
}

interface McpConnection {
client: net.Socket;
nextId: number;
pending: Map<number, { resolve: (value: unknown) => void; reject: (reason: Error) => void }>;
buf: string;
initialized: boolean;
}

const connections = new Map<string, McpConnection>();

function getConnectionKey(host: string, port: number): string {
return `${host}:${port}`;
}

function getOrCreateConnection(host: string, port: number): Promise<McpConnection> {
const key = getConnectionKey(host, port);
const existing = connections.get(key);
if (existing && !existing.client.destroyed) {
return Promise.resolve(existing);
}

return new Promise((resolve, reject) => {
const conn: McpConnection = {
client: net.createConnection(port, host),
nextId: 1,
pending: new Map(),
buf: "",
initialized: false,
};

const onConnect = () => {
const id = conn.nextId++;
conn.client.write(
JSON.stringify({
jsonrpc: "2.0",
id,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "openclaw-dimos", version: "0.0.1" },
},
}) + "\n",
);
conn.pending.set(id, {
resolve: () => {
conn.initialized = true;
connections.set(key, conn);
resolve(conn);
},
reject,
});
};

conn.client.on("connect", onConnect);

conn.client.on("data", (d) => {
conn.buf += d.toString();
const lines = conn.buf.split("\n");
conn.buf = lines.pop() || "";
for (const line of lines) {
if (!line.trim()) continue;
try {
const msg = JSON.parse(line) as { id?: number; result?: unknown; error?: { message: string } };
if (msg.id != null) {
const p = conn.pending.get(msg.id);
if (p) {
conn.pending.delete(msg.id);
if (msg.error) {
p.reject(new Error(msg.error.message));
} else {
p.resolve(msg.result);
}
}
}
} catch {
// ignore malformed JSON lines
}
}
});

conn.client.on("error", (err) => {
connections.delete(key);
for (const p of conn.pending.values()) {
p.reject(err);
}
conn.pending.clear();
reject(err);
});

conn.client.on("close", () => {
connections.delete(key);
const closeErr = new Error("Connection closed");
for (const p of conn.pending.values()) {
p.reject(closeErr);
}
conn.pending.clear();
});
});
}

async function sendRequest(conn: McpConnection, method: string, params: Record<string, unknown>): Promise<unknown> {
return new Promise((resolve, reject) => {
const id = conn.nextId++;
const timer = setTimeout(() => {
conn.pending.delete(id);
reject(new Error(`MCP request '${method}' timed out`));
}, CALL_TIMEOUT_MS);

conn.pending.set(id, {
resolve: (value) => {
clearTimeout(timer);
resolve(value);
},
reject: (err) => {
clearTimeout(timer);
reject(err);
},
});

conn.client.write(
JSON.stringify({ jsonrpc: "2.0", id, method, params }) + "\n",
);
});
}

function getHost(pluginConfig?: Record<string, unknown>): string {
if (pluginConfig && typeof pluginConfig.mcpHost === "string" && pluginConfig.mcpHost) {
return pluginConfig.mcpHost;
Expand Down Expand Up @@ -103,79 +229,27 @@ client.on('error',e=>{process.stderr.write(e.message);process.exit(1);});
return JSON.parse(result);
}

/** Call an MCP tool via direct TCP connection to the DimOS server. */
/** Call an MCP tool via persistent TCP connection to the DimOS server. */
async function callTool(
host: string,
port: number,
name: string,
args: Record<string, unknown>,
): Promise<string> {
return new Promise((resolve, reject) => {
const client = net.createConnection(port, host, () => {
client.write(
JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: {
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "openclaw-dimos", version: "0.0.1" },
},
}) + "\n",
);
});

let buf = "";
let phase: "init" | "call" | "done" = "init";

const timer = setTimeout(() => {
client.destroy();
reject(new Error("MCP tool call timed out"));
}, CALL_TIMEOUT_MS);

client.on("data", (d) => {
buf += d.toString();
const lines = buf.split("\n");
buf = lines.pop() || "";
for (const line of lines) {
if (!line.trim()) continue;
const msg = JSON.parse(line);
if (phase === "init" && msg.id === 1) {
phase = "call";
client.write(
JSON.stringify({
jsonrpc: "2.0",
id: 2,
method: "tools/call",
params: { name, arguments: args },
}) + "\n",
);
} else if (phase === "call" && msg.id === 2) {
phase = "done";
clearTimeout(timer);
if (msg.error) {
resolve(`Error: ${msg.error.message}`);
} else {
const content = msg.result?.content;
const text = Array.isArray(content)
? content
.filter((c: { type: string }) => c.type === "text")
.map((c: { text: string }) => c.text)
.join("\n")
: JSON.stringify(content);
resolve(text || "OK");
}
client.end();
}
}
});

client.on("error", (err) => {
clearTimeout(timer);
reject(err);
});
});
const conn = await getOrCreateConnection(host, port);
const result = (await sendRequest(conn, "tools/call", { name, arguments: args })) as {
content?: Array<{ type: string; text?: string }>;
};
const content = result?.content;
if (Array.isArray(content)) {
return (
content
.filter((c) => c.type === "text")
.map((c) => c.text)
.join("\n") || "OK"
);
}
return JSON.stringify(content) || "OK";
}

export default {
Expand Down