diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml index 9654c1cb6..24e2f041f 100644 --- a/.github/workflows/maven-ci.yml +++ b/.github/workflows/maven-ci.yml @@ -48,10 +48,8 @@ jobs: include: - os: ubuntu-latest repo-cache-path: '~/.m2/repository' - mvnd-cache-path: '~/.mvnd' - os: windows-latest repo-cache-path: '~\.m2\repository' - mvnd-cache-path: '~\.mvnd' env: MVND_VERSION: '1.0.3' @@ -74,33 +72,21 @@ jobs: restore-keys: | maven-${{ runner.os }}- - - name: Cache mvnd [${{ runner.os }}] - id: cache-mvnd + - name: Cache mvnd [Linux] uses: actions/cache@v4 + if: runner.os == 'Linux' with: - path: ${{ matrix.mvnd-cache-path }} - key: mvnd-${{ env.MVND_VERSION }}-${{ runner.os }} + path: ~/.mvnd + key: mvnd-${{ env.MVND_VERSION }}-linux - name: Install mvnd [Linux] - if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Linux' + if: runner.os == 'Linux' shell: bash run: | curl -fsSL https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64.tar.gz | tar xz mkdir -p ~/.mvnd mv maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64/* ~/.mvnd/ - - name: Install mvnd [Windows] - if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Windows' - shell: pwsh - run: | - Invoke-WebRequest ` - -Uri "https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64.zip" ` - -OutFile "mvnd.zip" - mkdir -Path "~\.mvnd" -Force - Expand-Archive -Path "mvnd.zip" -DestinationPath "mvnd" -FORCE - Copy-Item -Path "mvnd\maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64\*" -Destination "~\.mvnd\" -Recurse - Remove-Item -Path "mvnd", "mvnd.zip" -Recurse -Force - - name: Build and Test with Coverage [Linux] if: runner.os == 'Linux' run: | @@ -109,8 +95,9 @@ jobs: - name: Build and Test with Coverage [Windows] if: runner.os == 'Windows' + shell: pwsh run: | - & "~\.mvnd\bin\mvnd.cmd" -B clean verify + mvn -B clean verify - name: Upload coverage reports to Codecov if: runner.os == 'Linux' diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index f67968a20..5c5a94087 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -19,17 +19,19 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** - * Wrapper for asynchronous MCP clients using Project Reactor. - * This implementation delegates to {@link McpAsyncClient} and provides - * reactive operations that return Mono types. + * Wrapper for asynchronous MCP clients using Project Reactor. This implementation delegates to + * {@link McpAsyncClient} and provides reactive operations that return Mono types. * - *
Example usage: - *
{@code
+ * Example usage:
{@code
* McpAsyncClient client = ... // created via McpClient.async()
* McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client);
* wrapper.initialize()
@@ -42,6 +44,7 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);
private final McpAsyncClient client;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Constructs a new asynchronous MCP client wrapper.
@@ -51,7 +54,36 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
*/
public McpAsyncClientWrapper(String name, McpAsyncClient client) {
super(name);
- this.client = client;
+ this.client = Objects.requireNonNull(client, "MCP client cannot be null");
+ }
+
+ /**
+ * Package-private constructor for use by McpClientBuilder.
+ *
+ * @param name unique identifier for this client
+ * @param builder the builder to construct the client with notification handlers
+ */
+ McpAsyncClientWrapper(String name, McpClientBuilder builder) {
+ super(name);
+ this.client = builder.buildClientForAsync(this);
+ }
+
+ /**
+ * Updates the cached tools map with new tools from the server. This method is called when the
+ * server sends a tools/list_changed notification. This method is thread-safe and can be
+ * called concurrently from notification handlers.
+ *
+ * @param tools the new list of tools from the server (empty list clears cache)
+ */
+ void updateCachedTools(List tools) {
+ if (closed.get() || tools == null) {
+ return;
+ }
+ // Build new map first, then atomically replace via volatile assignment
+ Map newTools =
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
+ logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
/**
@@ -64,6 +96,9 @@ public McpAsyncClientWrapper(String name, McpAsyncClient client) {
*/
@Override
public Mono initialize() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (initialized) {
return Mono.empty();
}
@@ -84,10 +119,19 @@ public Mono initialize() {
"MCP client '{}' discovered {} tools",
name,
result.tools().size());
- // Cache all tools
- result.tools().forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache all tools - build new map then atomically replace
+ Map newTools =
+ result.tools().stream()
+ .collect(
+ Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
+ })
+ .doOnSuccess(
+ v -> {
+ if (!closed.get()) {
+ initialized = true;
+ }
})
- .doOnSuccess(v -> initialized = true)
.doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e))
.then();
}
@@ -99,10 +143,12 @@ public Mono initialize() {
* initialized before calling this method.
*
* @return a Mono emitting the list of available tools
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono> listTools() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
@@ -120,10 +166,12 @@ public Mono> listTools() {
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono callTool(String toolName, Map arguments) {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
@@ -161,7 +209,7 @@ public Mono callTool(String toolName, Map();
}
}
- initialized = false;
- cachedTools.clear();
}
}
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
index 10a3ab489..2b50b7b1d 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
@@ -38,6 +38,8 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
/**
@@ -78,6 +80,7 @@ public class McpClientBuilder {
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(120);
private static final Duration DEFAULT_INIT_TIMEOUT = Duration.ofSeconds(30);
+ private static final Logger logger = LoggerFactory.getLogger(McpClientBuilder.class);
private final String name;
private TransportConfig transportConfig;
@@ -283,6 +286,9 @@ public McpClientBuilder initializationTimeout(Duration timeout) {
/**
* Builds an asynchronous MCP client wrapper.
*
+ * This method uses a constructor that passes the builder to the wrapper, allowing the
+ * wrapper to construct its own client with notification handlers.
+ *
* @return Mono emitting the async client wrapper
*/
public Mono buildAsync() {
@@ -290,34 +296,15 @@ public Mono buildAsync() {
return Mono.error(new IllegalStateException("Transport must be configured"));
}
- return Mono.fromCallable(
- () -> {
- McpClientTransport transport = transportConfig.createTransport();
-
- McpSchema.Implementation clientInfo =
- new McpSchema.Implementation(
- "agentscope-java",
- "AgentScope Java Framework",
- "1.0.9-SNAPSHOT");
-
- McpSchema.ClientCapabilities clientCapabilities =
- McpSchema.ClientCapabilities.builder().build();
-
- McpAsyncClient mcpClient =
- McpClient.async(transport)
- .requestTimeout(requestTimeout)
- .initializationTimeout(initializationTimeout)
- .clientInfo(clientInfo)
- .capabilities(clientCapabilities)
- .build();
-
- return new McpAsyncClientWrapper(name, mcpClient);
- });
+ return Mono.fromCallable(() -> new McpAsyncClientWrapper(name, this));
}
/**
* Builds a synchronous MCP client wrapper (blocking operations).
*
+ * This method uses a constructor that passes the builder to the wrapper, allowing the
+ * wrapper to construct its own client with notification handlers.
+ *
* @return synchronous client wrapper
*/
public McpClientWrapper buildSync() {
@@ -325,24 +312,116 @@ public McpClientWrapper buildSync() {
throw new IllegalStateException("Transport must be configured");
}
+ return new McpSyncClientWrapper(name, this);
+ }
+
+ // ==================== Internal Client Building Methods ====================
+
+ /**
+ * Builds an MCP async client with notification handlers for the specified wrapper.
+ *
+ * @param wrapper the wrapper that will receive notification callbacks
+ * @return the built MCP async client
+ */
+ McpAsyncClient buildClientForAsync(McpAsyncClientWrapper wrapper) {
+ if (transportConfig == null) {
+ throw new IllegalStateException("Transport must be configured");
+ }
+
+ McpClientTransport transport = transportConfig.createTransport();
+
+ McpSchema.Implementation clientInfo =
+ new McpSchema.Implementation(
+ "agentscope-java", "AgentScope Java Framework", "1.0.8-SNAPSHOT");
+
+ McpSchema.ClientCapabilities clientCapabilities =
+ McpSchema.ClientCapabilities.builder().build();
+
+ return McpClient.async(transport)
+ .requestTimeout(requestTimeout)
+ .initializationTimeout(initializationTimeout)
+ .clientInfo(clientInfo)
+ .capabilities(clientCapabilities)
+ .loggingConsumer(buildAsyncLoggingConsumer())
+ .toolsChangeConsumer(
+ tools -> {
+ wrapper.updateCachedTools(tools);
+ return Mono.empty();
+ })
+ .build();
+ }
+
+ /**
+ * Builds an MCP sync client with notification handlers for the specified wrapper.
+ *
+ * @param wrapper the wrapper that will receive notification callbacks
+ * @return the built MCP sync client
+ */
+ McpSyncClient buildClientForSync(McpSyncClientWrapper wrapper) {
+ if (transportConfig == null) {
+ throw new IllegalStateException("Transport must be configured");
+ }
+
McpClientTransport transport = transportConfig.createTransport();
McpSchema.Implementation clientInfo =
new McpSchema.Implementation(
- "agentscope-java", "AgentScope Java Framework", "1.0.9-SNAPSHOT");
+ "agentscope-java", "AgentScope Java Framework", "1.0.8-SNAPSHOT");
McpSchema.ClientCapabilities clientCapabilities =
McpSchema.ClientCapabilities.builder().build();
- McpSyncClient mcpClient =
- McpClient.sync(transport)
- .requestTimeout(requestTimeout)
- .initializationTimeout(initializationTimeout)
- .clientInfo(clientInfo)
- .capabilities(clientCapabilities)
- .build();
+ return McpClient.sync(transport)
+ .requestTimeout(requestTimeout)
+ .initializationTimeout(initializationTimeout)
+ .clientInfo(clientInfo)
+ .capabilities(clientCapabilities)
+ .loggingConsumer(buildSyncLoggingConsumer())
+ .toolsChangeConsumer(tools -> wrapper.updateCachedTools(tools))
+ .build();
+ }
+
+ /**
+ * Creates a logging function for async clients that maps MCP log notifications to SLF4J.
+ *
+ * @return the logging function
+ */
+ private java.util.function.Function<
+ io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification, Mono>
+ buildAsyncLoggingConsumer() {
+ return notification -> {
+ logNotification(notification);
+ return Mono.empty();
+ };
+ }
+
+ /**
+ * Creates a logging consumer for sync clients that maps MCP log notifications to SLF4J.
+ *
+ * @return the logging consumer
+ */
+ private Consumer
+ buildSyncLoggingConsumer() {
+ return notification -> logNotification(notification);
+ }
- return new McpSyncClientWrapper(name, mcpClient);
+ /**
+ * Logs an MCP notification to SLF4J at the appropriate level.
+ *
+ * @param notification the notification to log
+ */
+ private void logNotification(
+ io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification notification) {
+ String level = notification.level() != null ? notification.level().toString() : "info";
+ String loggerName = notification.logger() != null ? notification.logger() : "mcp";
+ String data = notification.data() != null ? notification.data() : "";
+
+ switch (level.toLowerCase()) {
+ case "error" -> logger.error("[MCP-{}] [{}] {}", name, loggerName, data);
+ case "warning" -> logger.warn("[MCP-{}] [{}] {}", name, loggerName, data);
+ case "debug" -> logger.debug("[MCP-{}] [{}] {}", name, loggerName, data);
+ default -> logger.info("[MCP-{}] [{}] {}", name, loggerName, data);
+ }
}
// ==================== Internal Transport Configuration Classes ====================
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java
index 473af292f..f65286adc 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java
@@ -43,7 +43,7 @@ public abstract class McpClientWrapper implements AutoCloseable {
protected final String name;
/** Cache of tools available from this MCP server */
- protected final Map cachedTools;
+ protected volatile Map cachedTools;
/** Flag indicating whether the client has been initialized */
protected volatile boolean initialized = false;
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index aaf16f589..952da75ca 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -19,18 +19,21 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
- * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types.
- * This implementation delegates to {@link McpSyncClient} and wraps blocking operations
- * in Reactor's boundedElastic scheduler to avoid blocking the event loop.
+ * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types. This
+ * implementation delegates to {@link McpSyncClient} and wraps blocking operations in Reactor's
+ * boundedElastic scheduler to avoid blocking the event loop.
*
- * Example usage:
- *
{@code
+ * Example usage:
{@code
* McpSyncClient client = ... // created via McpClient.sync()
* McpSyncClientWrapper wrapper = new McpSyncClientWrapper("my-mcp", client);
* wrapper.initialize()
@@ -43,6 +46,7 @@ public class McpSyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpSyncClientWrapper.class);
private final McpSyncClient client;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Constructs a new synchronous MCP client wrapper.
@@ -52,7 +56,36 @@ public class McpSyncClientWrapper extends McpClientWrapper {
*/
public McpSyncClientWrapper(String name, McpSyncClient client) {
super(name);
- this.client = client;
+ this.client = Objects.requireNonNull(client, "MCP client cannot be null");
+ }
+
+ /**
+ * Package-private constructor for use by McpClientBuilder.
+ *
+ * @param name unique identifier for this client
+ * @param builder the builder to construct the client with notification handlers
+ */
+ McpSyncClientWrapper(String name, McpClientBuilder builder) {
+ super(name);
+ this.client = builder.buildClientForSync(this);
+ }
+
+ /**
+ * Updates the cached tools map with new tools from the server. This method is called when the
+ * server sends a tools/list_changed notification. This method is thread-safe and can be
+ * called concurrently from notification handlers.
+ *
+ * @param tools the new list of tools from the server (empty list clears cache)
+ */
+ void updateCachedTools(List tools) {
+ if (closed.get() || tools == null) {
+ return;
+ }
+ // Build new map first, then atomically replace via volatile assignment
+ Map newTools =
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
+ logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
/**
@@ -66,14 +99,17 @@ public McpSyncClientWrapper(String name, McpSyncClient client) {
*/
@Override
public Mono initialize() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (initialized) {
return Mono.empty();
}
- logger.info("Initializing MCP sync client: {}", name);
-
return Mono.fromCallable(
() -> {
+ logger.info("Initializing MCP sync client: {}", name);
+
// Initialize the client (blocking)
McpSchema.InitializeResult result = client.initialize();
logger.debug(
@@ -88,9 +124,16 @@ public Mono initialize() {
name,
toolsResult.tools().size());
- toolsResult.tools().forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache all tools - build new map then atomically replace
+ Map newTools =
+ toolsResult.tools().stream()
+ .collect(
+ Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
- initialized = true;
+ if (!closed.get()) {
+ initialized = true;
+ }
return null;
})
.subscribeOn(Schedulers.boundedElastic())
@@ -105,10 +148,12 @@ public Mono initialize() {
* must be initialized before calling this method.
*
* @return a Mono emitting the list of available tools
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono> listTools() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
@@ -127,10 +172,12 @@ public Mono> listTools() {
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono callTool(String toolName, Map arguments) {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
@@ -172,7 +219,7 @@ public Mono callTool(String toolName, Map();
}
}
- initialized = false;
- cachedTools.clear();
}
}
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
index c65b580ec..0a86b3c1d 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
@@ -203,6 +203,198 @@ void testClose_Success() {
verify(mockClient, times(1)).closeGracefully();
}
+ // ==================== updateCachedTools Tests ====================
+
+ @Test
+ void testUpdateCachedTools_AddsNewTools() {
+ // Initially cache is empty
+ assertTrue(wrapper.cachedTools.isEmpty());
+
+ // Create mock tools
+ McpSchema.Tool tool1 =
+ new McpSchema.Tool(
+ "new-tool-1",
+ null,
+ "New tool 1",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ McpSchema.Tool tool2 =
+ new McpSchema.Tool(
+ "new-tool-2",
+ null,
+ "New tool 2",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+
+ // Call updateCachedTools (package-private, use reflection)
+ invokeUpdateCachedTools(List.of(tool1, tool2));
+
+ // Verify cache was updated
+ assertEquals(2, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("new-tool-1"));
+ assertNotNull(wrapper.getCachedTool("new-tool-2"));
+ assertEquals("New tool 1", wrapper.getCachedTool("new-tool-1").description());
+ assertEquals("New tool 2", wrapper.getCachedTool("new-tool-2").description());
+ }
+
+ @Test
+ void testUpdateCachedTools_EmptyListClearsCache() {
+ // First add some tools to cache
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Then simulate server sending empty tool list (all tools removed)
+ invokeUpdateCachedTools(List.of());
+
+ // Verify cache was cleared
+ assertTrue(wrapper.cachedTools.isEmpty());
+ assertNull(wrapper.getCachedTool("tool1"));
+ }
+
+ @Test
+ void testUpdateCachedTools_NullDoesNothing() {
+ // Add some tools to cache first
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ int initialSize = wrapper.cachedTools.size();
+
+ // Call with null should not modify cache
+ invokeUpdateCachedTools(null);
+
+ // Verify cache unchanged
+ assertEquals(initialSize, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("tool1"));
+ }
+
+ @Test
+ void testUpdateCachedTools_ReplacesExistingTools() {
+ // Initialize with existing tools
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Simulate server sending updated tool list
+ McpSchema.Tool updatedTool =
+ new McpSchema.Tool(
+ "tool1",
+ null,
+ "Updated description",
+ new McpSchema.JsonSchema("string", null, null, null, null, null),
+ null,
+ null,
+ null);
+ McpSchema.Tool newTool =
+ new McpSchema.Tool(
+ "new-tool",
+ null,
+ "Brand new tool",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+
+ invokeUpdateCachedTools(List.of(updatedTool, newTool));
+
+ // Verify cache was updated (not appended)
+ assertEquals(2, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("tool1"));
+ assertEquals("Updated description", wrapper.getCachedTool("tool1").description());
+ assertNotNull(wrapper.getCachedTool("new-tool"));
+ assertEquals("Brand new tool", wrapper.getCachedTool("new-tool").description());
+ }
+
+ @Test
+ void testUpdateCachedTools_MultipleTimes() {
+ // First update
+ McpSchema.Tool tool1 =
+ new McpSchema.Tool(
+ "tool-a",
+ null,
+ "Tool A",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ invokeUpdateCachedTools(List.of(tool1));
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Second update - replace
+ McpSchema.Tool tool2 =
+ new McpSchema.Tool(
+ "tool-b",
+ null,
+ "Tool B",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ invokeUpdateCachedTools(List.of(tool2));
+
+ // Verify only latest tools exist
+ assertEquals(1, wrapper.cachedTools.size());
+ assertNull(wrapper.getCachedTool("tool-a"));
+ assertNotNull(wrapper.getCachedTool("tool-b"));
+ }
+
+ // ==================== close() Idempotency Tests ====================
+
+ @Test
+ void testClose_Idempotent() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertTrue(wrapper.isInitialized());
+
+ when(mockClient.closeGracefully()).thenReturn(Mono.empty());
+
+ // Close multiple times
+ wrapper.close();
+ wrapper.close();
+ wrapper.close();
+
+ // Verify closeGracefully was called only once
+ verify(mockClient, times(1)).closeGracefully();
+ assertFalse(wrapper.isInitialized());
+ }
+
+ @Test
+ void testClose_WithoutInitialize() {
+ // Close without ever initializing
+ when(mockClient.closeGracefully()).thenReturn(Mono.empty());
+
+ wrapper.close();
+
+ // Should not throw, client should be nullified
+ assertFalse(wrapper.isInitialized());
+ assertTrue(wrapper.cachedTools.isEmpty());
+ }
+
+ // ==================== Null Client Error Path Tests ====================
+
+ // Removed: client is now final and cannot be null after construction
+
+ // ==================== Helper Methods ====================
+
+ /**
+ * Invokes the package-private updateCachedTools method using reflection.
+ *
+ * @param tools the list of tools to update
+ */
+ private void invokeUpdateCachedTools(List tools) {
+ try {
+ java.lang.reflect.Method method =
+ McpAsyncClientWrapper.class.getDeclaredMethod("updateCachedTools", List.class);
+ method.setAccessible(true);
+ method.invoke(wrapper, tools);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke updateCachedTools", e);
+ }
+ }
+
private void setupSuccessfulInitialization() {
McpSchema.Implementation serverInfo =
new McpSchema.Implementation("TestServer", "Test Server", "1.0.9-SNAPSHOT");
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
index 674904a99..117c3cdb9 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
@@ -282,15 +282,6 @@ void testClose_GracefulCloseFails() {
verify(mockClient, times(1)).close();
}
- @Test
- void testClose_NullClient() {
- McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null);
-
- nullWrapper.close();
-
- assertFalse(nullWrapper.isInitialized());
- }
-
@Test
void testClose_MultipleCallsSafe() {
setupSuccessfulInitialization();
@@ -323,6 +314,194 @@ void testCallTool_WithNullArguments() {
assertFalse(Boolean.TRUE.equals(result.isError()));
}
+ // ==================== updateCachedTools Tests ====================
+
+ @Test
+ void testUpdateCachedTools_AddsNewTools() {
+ // Initially cache is empty
+ assertTrue(wrapper.cachedTools.isEmpty());
+
+ // Create mock tools
+ McpSchema.Tool tool1 =
+ new McpSchema.Tool(
+ "new-tool-1",
+ null,
+ "New tool 1",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ McpSchema.Tool tool2 =
+ new McpSchema.Tool(
+ "new-tool-2",
+ null,
+ "New tool 2",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+
+ // Call updateCachedTools (package-private, use reflection)
+ invokeUpdateCachedTools(List.of(tool1, tool2));
+
+ // Verify cache was updated
+ assertEquals(2, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("new-tool-1"));
+ assertNotNull(wrapper.getCachedTool("new-tool-2"));
+ assertEquals("New tool 1", wrapper.getCachedTool("new-tool-1").description());
+ assertEquals("New tool 2", wrapper.getCachedTool("new-tool-2").description());
+ }
+
+ @Test
+ void testUpdateCachedTools_EmptyListClearsCache() {
+ // First add some tools to cache
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Then simulate server sending empty tool list (all tools removed)
+ invokeUpdateCachedTools(List.of());
+
+ // Verify cache was cleared
+ assertTrue(wrapper.cachedTools.isEmpty());
+ assertNull(wrapper.getCachedTool("tool1"));
+ }
+
+ @Test
+ void testUpdateCachedTools_NullDoesNothing() {
+ // Add some tools to cache first
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ int initialSize = wrapper.cachedTools.size();
+
+ // Call with null should not modify cache
+ invokeUpdateCachedTools(null);
+
+ // Verify cache unchanged
+ assertEquals(initialSize, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("tool1"));
+ }
+
+ @Test
+ void testUpdateCachedTools_ReplacesExistingTools() {
+ // Initialize with existing tools
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Simulate server sending updated tool list
+ McpSchema.Tool updatedTool =
+ new McpSchema.Tool(
+ "tool1",
+ null,
+ "Updated description",
+ new McpSchema.JsonSchema("string", null, null, null, null, null),
+ null,
+ null,
+ null);
+ McpSchema.Tool newTool =
+ new McpSchema.Tool(
+ "new-tool",
+ null,
+ "Brand new tool",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+
+ invokeUpdateCachedTools(List.of(updatedTool, newTool));
+
+ // Verify cache was updated (not appended)
+ assertEquals(2, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("tool1"));
+ assertEquals("Updated description", wrapper.getCachedTool("tool1").description());
+ assertNotNull(wrapper.getCachedTool("new-tool"));
+ assertEquals("Brand new tool", wrapper.getCachedTool("new-tool").description());
+ }
+
+ @Test
+ void testUpdateCachedTools_MultipleTimes() {
+ // First update
+ McpSchema.Tool tool1 =
+ new McpSchema.Tool(
+ "tool-a",
+ null,
+ "Tool A",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ invokeUpdateCachedTools(List.of(tool1));
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Second update - replace
+ McpSchema.Tool tool2 =
+ new McpSchema.Tool(
+ "tool-b",
+ null,
+ "Tool B",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ invokeUpdateCachedTools(List.of(tool2));
+
+ // Verify only latest tools exist
+ assertEquals(1, wrapper.cachedTools.size());
+ assertNull(wrapper.getCachedTool("tool-a"));
+ assertNotNull(wrapper.getCachedTool("tool-b"));
+ }
+
+ // ==================== close() Idempotency Tests ====================
+
+ @Test
+ void testClose_Idempotent() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertTrue(wrapper.isInitialized());
+
+ // Close multiple times
+ wrapper.close();
+ wrapper.close();
+ wrapper.close();
+
+ // Verify closeGracefully was called only once
+ verify(mockClient, times(1)).closeGracefully();
+ assertFalse(wrapper.isInitialized());
+ }
+
+ @Test
+ void testClose_WithoutInitialize() {
+ // Close without ever initializing
+ wrapper.close();
+
+ // Should not throw, client should be nullified
+ assertFalse(wrapper.isInitialized());
+ assertTrue(wrapper.cachedTools.isEmpty());
+ }
+
+ // ==================== Null Client Error Path Tests ====================
+
+ // Removed: client is now final and cannot be null after construction
+
+ // ==================== Helper Methods ====================
+
+ /**
+ * Invokes the package-private updateCachedTools method using reflection.
+ *
+ * @param tools the list of tools to update
+ */
+ private void invokeUpdateCachedTools(List tools) {
+ try {
+ java.lang.reflect.Method method =
+ McpSyncClientWrapper.class.getDeclaredMethod("updateCachedTools", List.class);
+ method.setAccessible(true);
+ method.invoke(wrapper, tools);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke updateCachedTools", e);
+ }
+ }
+
private void setupSuccessfulInitialization() {
McpSchema.Implementation serverInfo =
new McpSchema.Implementation("TestServer", "Test Server", "1.0.9-SNAPSHOT");
diff --git a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
index ad289c628..d27038389 100644
--- a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
+++ b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
@@ -20,6 +20,9 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
@@ -68,6 +71,11 @@ public class HigressMcpClientWrapper extends McpClientWrapper {
*/
private final McpClientWrapper delegateClient;
+ /**
+ * Flag indicating whether this client has been closed.
+ */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
/**
* Whether x_higress_tool_search is enabled.
*/
@@ -115,6 +123,10 @@ public class HigressMcpClientWrapper extends McpClientWrapper {
*/
@Override
public Mono initialize() {
+ if (closed.get()) {
+ return Mono.error(
+ new IllegalStateException("Higress MCP client '" + name + "' is closed"));
+ }
if (isInitialized()) {
logger.debug("Higress MCP client '{}' already initialized", name);
return Mono.empty();
@@ -126,7 +138,9 @@ public Mono initialize() {
.initialize()
.doOnSuccess(
unused -> {
- this.initialized = true;
+ if (!closed.get()) {
+ this.initialized = true;
+ }
logger.info("Higress MCP client '{}' initialized successfully", name);
})
.doOnError(
@@ -154,6 +168,10 @@ public Mono initialize() {
*/
@Override
public Mono> listTools() {
+ if (closed.get()) {
+ return Mono.error(
+ new IllegalStateException("Higress MCP client '" + name + "' is closed"));
+ }
if (enableToolSearch) {
// Call x_higress_tool_search and convert results to McpSchema.Tool
logger.info(
@@ -185,8 +203,13 @@ public Mono> listTools() {
})
.doOnNext(
tools -> {
- // Cache tools locally
- tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache tools locally - build new map then atomically replace
+ Map newTools =
+ tools.stream()
+ .collect(
+ Collectors.toMap(
+ McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
});
} else {
// Return all tools from delegate
@@ -194,8 +217,13 @@ public Mono> listTools() {
.listTools()
.doOnNext(
tools -> {
- // Cache tools locally
- tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache tools locally - build new map then atomically replace
+ Map newTools =
+ tools.stream()
+ .collect(
+ Collectors.toMap(
+ McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
logger.debug(
"Higress MCP client '{}' discovered {} tools",
name,
@@ -259,6 +287,10 @@ private McpSchema.Tool convertToMcpTool(HigressToolSearchResult.ToolInfo toolInf
*/
@Override
public Mono callTool(String toolName, Map arguments) {
+ if (closed.get()) {
+ return Mono.error(
+ new IllegalStateException("Higress MCP client '" + name + "' is closed"));
+ }
logger.debug(
"Calling tool '{}' on Higress MCP client '{}' with arguments: {}",
toolName,
@@ -293,19 +325,23 @@ public Mono callTool(String toolName, Map();
}
-
- this.initialized = false;
- this.cachedTools.clear();
}
/**