Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion external/mcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"@microsoft/teams.apps": "2.0.5",
"@microsoft/teams.common": "2.0.5",
"@microsoft/teams.dev": "2.0.5",
"@modelcontextprotocol/sdk": "^1.9.0"
"@modelcontextprotocol/sdk": "^1.25.0"
},
"peerDependenciesMeta": {
"@microsoft/teams.dev": {
Expand Down
5 changes: 4 additions & 1 deletion external/mcp/src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';

export type ServerTransport = SSEServerTransport | StreamableHTTPServerTransport;

export interface IConnection {
readonly id: number;
readonly transport: SSEServerTransport;
readonly transport: ServerTransport;
readonly createdAt: Date;
}
155 changes: 143 additions & 12 deletions external/mcp/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ServerOptions } from '@modelcontextprotocol/sdk/server/index.js';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { CallToolResult } from '@modelcontextprotocol/sdk/types.js';

import { jsonSchemaToZod } from 'json-schema-to-zod';
Expand Down Expand Up @@ -41,6 +42,28 @@ export type McpSSETransportOptions = {
readonly path?: string;
};

/**
* MCP transport options for streamable-http
*/
export type McpStreamableHTTPTransportOptions = {
/**
* the transport type
*/
readonly type: 'streamable-http';

/**
* the url path
* @default /mcp
*/
readonly path?: string;

/**
* whether to use stateful sessions
* @default true
*/
readonly stateful?: boolean;
};

/**
* MCP transport options for stdio
*/
Expand All @@ -61,6 +84,14 @@ export type McpStdioTransportOptions = {
readonly stdout?: Writable;
};

/**
* Union type for all MCP transport options
*/
export type McpTransportOptions =
| McpSSETransportOptions
| McpStreamableHTTPTransportOptions
| McpStdioTransportOptions;

export type McpPluginOptions = ServerOptions & {
/**
* the MCP server name
Expand All @@ -83,7 +114,7 @@ export type McpPluginOptions = ServerOptions & {
* the transport or transport options
* @default sse
*/
readonly transport?: McpSSETransportOptions | McpStdioTransportOptions;
readonly transport?: McpTransportOptions;

/**
* the url to use for the local
Expand Down Expand Up @@ -121,9 +152,10 @@ export class McpPlugin implements IPlugin {
protected id: number = -1;
protected inspector: string;
protected connections: Record<number, IConnection> = {};
protected transport: McpSSETransportOptions | McpStdioTransportOptions = {
protected transport: McpTransportOptions = {
type: 'sse',
};
protected httpSessions: Map<string, StreamableHTTPServerTransport> = new Map();

constructor(options: McpServer | McpPluginOptions = {}) {
this.inspector =
Expand Down Expand Up @@ -197,20 +229,31 @@ export class McpPlugin implements IPlugin {
url: this.inspector,
});

if (this.transport.type === 'sse') {
return this.onInitSSE(this.httpPlugin, this.transport);
switch (this.transport.type) {
case 'sse':
return this.onInitSSE(this.httpPlugin, this.transport);
case 'streamable-http':
return this.onInitStreamableHTTP(this.httpPlugin, this.transport);
case 'stdio':
return this.onInitStdio(this.transport);
}

return this.onInitStdio(this.transport);
}

onStart({ port }: IPluginStartEvent) {
if (this.transport.type === 'sse') {
this.logger.info(
`listening at http://localhost:${port}${this.transport.path || '/mcp'}`,
);
} else {
this.logger.info('listening on stdin');
switch (this.transport.type) {
case 'sse':
this.logger.info(
`listening at http://localhost:${port}${this.transport.path || '/mcp'} (SSE)`,
);
break;
case 'streamable-http':
this.logger.info(
`listening at http://localhost:${port}${this.transport.path || '/mcp'} (Streamable HTTP)`,
);
break;
case 'stdio':
this.logger.info('listening on stdin');
break;
}
}

Expand Down Expand Up @@ -251,6 +294,94 @@ export class McpPlugin implements IPlugin {
});
}

protected onInitStreamableHTTP(
http: HttpPlugin,
options: McpStreamableHTTPTransportOptions
) {
const path = options.path || '/mcp';
const stateful = options.stateful !== false; // default to true

// POST handler - main request handler for MCP messages
http.post(path, async (req, res) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;

// Check for existing session
if (sessionId && this.httpSessions.has(sessionId)) {
const transport = this.httpSessions.get(sessionId)!;
await transport.handleRequest(req, res);
return;
}

// Create new session for stateful mode or handle stateless request
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: stateful ? undefined : () => undefined,
});

// For stateful sessions, store the transport
if (stateful) {
transport.onclose = () => {
const sid = transport.sessionId;
if (sid) {
this.httpSessions.delete(sid);
this.logger.debug(`Session ${sid} closed`);
}
};
}

// Connect to the MCP server
await this.server.connect(transport);

// Handle the initial request
await transport.handleRequest(req, res);

// Store session after handling first request (sessionId is set)
if (stateful && transport.sessionId) {
this.httpSessions.set(transport.sessionId, transport);
this.logger.debug(`Session ${transport.sessionId} created`);
}
});

// GET handler - for SSE stream (reconnection in stateful mode)
http.get(path, async (req, res) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;

if (!sessionId || !this.httpSessions.has(sessionId)) {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}

const transport = this.httpSessions.get(sessionId)!;
await transport.handleRequest(req, res);
});

// DELETE handler - for session termination
http.delete(path, async (req, res) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;

if (!sessionId || !this.httpSessions.has(sessionId)) {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}

const transport = this.httpSessions.get(sessionId)!;
await transport.handleRequest(req, res);
});
}

protected onToolCall(name: string, prompt: IChatPrompt) {
return async (args: any): Promise<CallToolResult> => {
try {
Expand Down
10 changes: 6 additions & 4 deletions packages/ai/src/local-memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ export class LocalMemory implements IMemory {
}

while (
len > (this.options.max || 100) ||
(this.messages[0].role === 'model' && this.messages[0].function_calls?.length) ||
this.messages[0].role === 'function'
len > 0 &&
(len > (this.options.max || 100) ||
(this.messages[0]?.role === 'model' && this.messages[0]?.function_calls?.length) ||
this.messages[0]?.role === 'function')
) {
const removed = this.pop();

Expand Down Expand Up @@ -87,8 +88,9 @@ export class LocalMemory implements IMemory {

let last = this.messages[end];

while ((last.role === 'model' && last.function_calls?.length) || last.role === 'function') {
while (last && ((last.role === 'model' && last.function_calls?.length) || last.role === 'function')) {
end++;
if (end >= this.messages.length) break;
last = this.messages[end];
}

Expand Down