From 3ef7d5d5d5e110d86264b7e5f187243ecaa5357d Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Thu, 15 Jan 2026 09:45:14 +0800 Subject: [PATCH 1/9] feat(mcp): add notification handlers support for async client --- .../core/tool/mcp/McpAsyncClientWrapper.java | 63 ++++- .../core/tool/mcp/McpClientBuilder.java | 70 ++++- .../core/tool/mcp/McpSyncClientWrapper.java | 43 ++- .../tool/mcp/McpAsyncClientWrapperTest.java | 260 ++++++++++++++++++ 4 files changed, 409 insertions(+), 27 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index f67968a20..36361cade 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -19,17 +19,16 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** - * Wrapper for asynchronous MCP clients using Project Reactor. - * This implementation delegates to {@link McpAsyncClient} and provides - * reactive operations that return Mono types. + * Wrapper for asynchronous MCP clients using Project Reactor. This implementation delegates to + * {@link McpAsyncClient} and provides reactive operations that return Mono types. * - *

Example usage: - *

{@code
+ * 

Example usage:

{@code
  * McpAsyncClient client = ... // created via McpClient.async()
  * McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client);
  * wrapper.initialize()
@@ -41,7 +40,7 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
 
     private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);
 
-    private final McpAsyncClient client;
+    private final AtomicReference clientRef;
 
     /**
      * Constructs a new asynchronous MCP client wrapper.
@@ -51,7 +50,32 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
      */
     public McpAsyncClientWrapper(String name, McpAsyncClient client) {
         super(name);
-        this.client = client;
+        this.clientRef = new AtomicReference<>(client);
+    }
+
+    /**
+     * Sets the underlying MCP async client. This is called by McpClientBuilder after the client
+     * is created with notification handlers.
+     *
+     * @param client the MCP async client
+     */
+    void setClient(McpAsyncClient client) {
+        this.clientRef.set(client);
+    }
+
+    /**
+     * Updates the cached tools map with new tools from the server. This method is called when the
+     * server sends a tools/list_changed notification.
+     *
+     * @param tools the new list of tools from the server (empty list clears cache)
+     */
+    void updateCachedTools(List tools) {
+        if (tools != null) {
+            // Clear and rebuild cache
+            cachedTools.clear();
+            tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+            logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
+        }
     }
 
     /**
@@ -68,6 +92,12 @@ public Mono initialize() {
             return Mono.empty();
         }
 
+        McpAsyncClient client = clientRef.get();
+        if (client == null) {
+            return Mono.error(
+                    new IllegalStateException("McpAsyncClient not set. Call setClient() first."));
+        }
+
         logger.info("Initializing MCP async client: {}", name);
 
         return client.initialize()
@@ -99,7 +129,6 @@ public Mono initialize() {
      * initialized before calling this method.
      *
      * @return a Mono emitting the list of available tools
-     * @throws IllegalStateException if the client is not initialized
      */
     @Override
     public Mono> listTools() {
@@ -108,6 +137,11 @@ public Mono> listTools() {
                     new IllegalStateException("MCP client '" + name + "' not initialized"));
         }
 
+        McpAsyncClient client = clientRef.get();
+        if (client == null) {
+            return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
+        }
+
         return client.listTools().map(McpSchema.ListToolsResult::tools);
     }
 
@@ -120,7 +154,6 @@ public Mono> listTools() {
      * @param toolName the name of the tool to call
      * @param arguments the arguments to pass to the tool
      * @return a Mono emitting the tool call result (may contain error information)
-     * @throws IllegalStateException if the client is not initialized
      */
     @Override
     public Mono callTool(String toolName, Map arguments) {
@@ -129,6 +162,11 @@ public Mono callTool(String toolName, Map callTool(String toolName, Map logger.debug("MCP client '{}' closed", name))
                         .doOnError(e -> logger.error("Error closing MCP client '{}'", name, e))
                         .block();
             } catch (Exception e) {
                 logger.error("Exception during MCP client close", e);
-                client.close();
+                toClose.close();
             }
         }
         initialized = false;
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
index 67097097b..de03eb95d 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
@@ -36,6 +36,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
 /**
@@ -76,6 +78,7 @@ public class McpClientBuilder {
 
     private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(120);
     private static final Duration DEFAULT_INIT_TIMEOUT = Duration.ofSeconds(30);
+    private static final Logger logger = LoggerFactory.getLogger(McpClientBuilder.class);
 
     private final String name;
     private TransportConfig transportConfig;
@@ -233,6 +236,10 @@ public McpClientBuilder initializationTimeout(Duration timeout) {
     /**
      * Builds an asynchronous MCP client wrapper.
      *
+     * 

This method uses a two-phase build pattern to support notification handlers. + * The wrapper is created first, then the MCP client is built with notification consumers + * that can reference the wrapper. + * * @return Mono emitting the async client wrapper */ public Mono buildAsync() { @@ -253,15 +260,76 @@ public Mono buildAsync() { McpSchema.ClientCapabilities clientCapabilities = McpSchema.ClientCapabilities.builder().build(); + // ========== Phase 1: Create wrapper (client is temporarily null) ========== + McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper(name, null); + + // ========== Phase 2: Build client (can reference wrapper) ========== McpAsyncClient mcpClient = McpClient.async(transport) .requestTimeout(requestTimeout) .initializationTimeout(initializationTimeout) .clientInfo(clientInfo) .capabilities(clientCapabilities) + + // ----- Log notification Consumer ----- + .loggingConsumer( + notification -> { + // Parse notification content + String level = + notification.level() != null + ? notification.level().toString() + : "info"; + String loggerName = + notification.logger() != null + ? notification.logger() + : "mcp"; + String data = + notification.data() != null + ? notification.data() + : ""; + + // Log to SLF4J by level + switch (level.toLowerCase()) { + case "error" -> + logger.error( + "[MCP-{}] [{}] {}", + name, + loggerName, + data); + case "warning" -> + logger.warn( + "[MCP-{}] [{}] {}", + name, + loggerName, + data); + case "debug" -> + logger.debug( + "[MCP-{}] [{}] {}", + name, + loggerName, + data); + default -> + logger.info( + "[MCP-{}] [{}] {}", + name, + loggerName, + data); + } + return Mono.empty(); + }) + + // ----- Tools change notification Consumer ----- + .toolsChangeConsumer( + tools -> { + // Call wrapper method to update cache + wrapper.updateCachedTools(tools); + return Mono.empty(); + }) .build(); - return new McpAsyncClientWrapper(name, mcpClient); + // ========== Phase 3: Link MCP client to wrapper ========== + wrapper.setClient(mcpClient); + return wrapper; }); } diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java index aaf16f589..b9d75cf8b 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java @@ -19,18 +19,18 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** - * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types. - * This implementation delegates to {@link McpSyncClient} and wraps blocking operations - * in Reactor's boundedElastic scheduler to avoid blocking the event loop. + * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types. This + * implementation delegates to {@link McpSyncClient} and wraps blocking operations in Reactor's + * boundedElastic scheduler to avoid blocking the event loop. * - *

Example usage: - *

{@code
+ * 

Example usage:

{@code
  * McpSyncClient client = ... // created via McpClient.sync()
  * McpSyncClientWrapper wrapper = new McpSyncClientWrapper("my-mcp", client);
  * wrapper.initialize()
@@ -42,7 +42,7 @@ public class McpSyncClientWrapper extends McpClientWrapper {
 
     private static final Logger logger = LoggerFactory.getLogger(McpSyncClientWrapper.class);
 
-    private final McpSyncClient client;
+    private final AtomicReference clientRef;
 
     /**
      * Constructs a new synchronous MCP client wrapper.
@@ -52,7 +52,7 @@ public class McpSyncClientWrapper extends McpClientWrapper {
      */
     public McpSyncClientWrapper(String name, McpSyncClient client) {
         super(name);
-        this.client = client;
+        this.clientRef = new AtomicReference<>(client);
     }
 
     /**
@@ -70,10 +70,16 @@ public Mono initialize() {
             return Mono.empty();
         }
 
-        logger.info("Initializing MCP sync client: {}", name);
-
         return Mono.fromCallable(
                         () -> {
+                            McpSyncClient client = clientRef.get();
+                            if (client == null) {
+                                throw new IllegalStateException(
+                                        "McpSyncClient not set. Call setClient() first.");
+                            }
+
+                            logger.info("Initializing MCP sync client: {}", name);
+
                             // Initialize the client (blocking)
                             McpSchema.InitializeResult result = client.initialize();
                             logger.debug(
@@ -105,7 +111,6 @@ public Mono initialize() {
      * must be initialized before calling this method.
      *
      * @return a Mono emitting the list of available tools
-     * @throws IllegalStateException if the client is not initialized
      */
     @Override
     public Mono> listTools() {
@@ -114,6 +119,11 @@ public Mono> listTools() {
                     new IllegalStateException("MCP client '" + name + "' not initialized"));
         }
 
+        McpSyncClient client = clientRef.get();
+        if (client == null) {
+            return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
+        }
+
         return Mono.fromCallable(() -> client.listTools().tools())
                 .subscribeOn(Schedulers.boundedElastic());
     }
@@ -127,7 +137,6 @@ public Mono> listTools() {
      * @param toolName the name of the tool to call
      * @param arguments the arguments to pass to the tool
      * @return a Mono emitting the tool call result (may contain error information)
-     * @throws IllegalStateException if the client is not initialized
      */
     @Override
     public Mono callTool(String toolName, Map arguments) {
@@ -136,6 +145,11 @@ public Mono callTool(String toolName, Map callTool(String toolName, Map tools) {
+        try {
+            java.lang.reflect.Method method =
+                    McpAsyncClientWrapper.class.getDeclaredMethod("updateCachedTools", List.class);
+            method.setAccessible(true);
+            method.invoke(wrapper, tools);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to invoke updateCachedTools", e);
+        }
+    }
+
+    /**
+     * Invokes the package-private setClient method using reflection.
+     *
+     * @param client the MCP async client to set
+     */
+    private void invokeSetClient(McpAsyncClient client) {
+        try {
+            java.lang.reflect.Method method =
+                    McpAsyncClientWrapper.class.getDeclaredMethod(
+                            "setClient", McpAsyncClient.class);
+            method.setAccessible(true);
+            method.invoke(wrapper, client);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to invoke setClient", e);
+        }
+    }
+
     private void setupSuccessfulInitialization() {
         McpSchema.Implementation serverInfo =
                 new McpSchema.Implementation("TestServer", "Test Server", "1.0.8-SNAPSHOT");

From ae062230b1a71eebda34f5182c90aa143bd100a3 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Thu, 15 Jan 2026 10:13:37 +0800
Subject: [PATCH 2/9] feat(mcp): add notification handlers support for sync
 client

---
 .../core/tool/mcp/McpAsyncClientWrapper.java  |  10 +-
 .../core/tool/mcp/McpClientBuilder.java       |  50 +++-
 .../core/tool/mcp/McpSyncClientWrapper.java   |  31 ++-
 .../tool/mcp/McpSyncClientWrapperTest.java    | 254 ++++++++++++++++++
 4 files changed, 339 insertions(+), 6 deletions(-)

diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
index 36361cade..5f15fd155 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
@@ -71,9 +72,11 @@ void setClient(McpAsyncClient client) {
      */
     void updateCachedTools(List tools) {
         if (tools != null) {
-            // Clear and rebuild cache
+            // Build new map first, then atomically replace
+            Map newTools =
+                    tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
             cachedTools.clear();
-            tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+            cachedTools.putAll(newTools);
             logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
         }
     }
@@ -94,8 +97,7 @@ public Mono initialize() {
 
         McpAsyncClient client = clientRef.get();
         if (client == null) {
-            return Mono.error(
-                    new IllegalStateException("McpAsyncClient not set. Call setClient() first."));
+            return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
         }
 
         logger.info("Initializing MCP async client: {}", name);
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
index d89b1821a..93238c0fd 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
@@ -386,6 +386,10 @@ public Mono buildAsync() {
     /**
      * Builds a synchronous MCP client wrapper (blocking operations).
      *
+     * 

This method uses a two-phase build pattern to support notification handlers. The wrapper + * is created first, then the MCP client is built with notification consumers that can + * reference the wrapper. + * * @return synchronous client wrapper */ public McpClientWrapper buildSync() { @@ -402,15 +406,59 @@ public McpClientWrapper buildSync() { McpSchema.ClientCapabilities clientCapabilities = McpSchema.ClientCapabilities.builder().build(); + // ========== Phase 1: Create wrapper (client is temporarily null) ========== + McpSyncClientWrapper wrapper = new McpSyncClientWrapper(name, null); + + // ========== Phase 2: Build client (can reference wrapper) ========== McpSyncClient mcpClient = McpClient.sync(transport) .requestTimeout(requestTimeout) .initializationTimeout(initializationTimeout) .clientInfo(clientInfo) .capabilities(clientCapabilities) + // ----- Log notification Consumer ----- + .loggingConsumer( + notification -> { + // Parse notification content + String level = + notification.level() != null + ? notification.level().toString() + : "info"; + String loggerName = + notification.logger() != null + ? notification.logger() + : "mcp"; + String data = + notification.data() != null ? notification.data() : ""; + + // Log to SLF4J by level + switch (level.toLowerCase()) { + case "error" -> + logger.error( + "[MCP-{}] [{}] {}", name, loggerName, data); + case "warning" -> + logger.warn( + "[MCP-{}] [{}] {}", name, loggerName, data); + case "debug" -> + logger.debug( + "[MCP-{}] [{}] {}", name, loggerName, data); + default -> + logger.info( + "[MCP-{}] [{}] {}", name, loggerName, data); + } + }) + + // ----- Tools change notification Consumer ----- + .toolsChangeConsumer( + tools -> { + // Call wrapper method to update cache + wrapper.updateCachedTools(tools); + }) .build(); - return new McpSyncClientWrapper(name, mcpClient); + // ========== Phase 3: Link MCP client to wrapper ========== + wrapper.setClient(mcpClient); + return wrapper; } // ==================== Internal Transport Configuration Classes ==================== diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java index b9d75cf8b..fc9161877 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -55,6 +56,34 @@ public McpSyncClientWrapper(String name, McpSyncClient client) { this.clientRef = new AtomicReference<>(client); } + /** + * Sets the underlying MCP sync client. This is called by McpClientBuilder after the client is + * created with notification handlers. + * + * @param client the MCP sync client + */ + void setClient(McpSyncClient client) { + this.clientRef.set(client); + } + + /** + * Updates the cached tools map with new tools from the server. This method is called when the + * server sends a tools/list_changed notification. This method is thread-safe and can be + * called concurrently from notification handlers. + * + * @param tools the new list of tools from the server (empty list clears cache) + */ + void updateCachedTools(List tools) { + if (tools != null) { + // Build new map first, then atomically replace + Map newTools = + tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + cachedTools.clear(); + cachedTools.putAll(newTools); + logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); + } + } + /** * Initializes the sync MCP client connection and caches available tools. * @@ -75,7 +104,7 @@ public Mono initialize() { McpSyncClient client = clientRef.get(); if (client == null) { throw new IllegalStateException( - "McpSyncClient not set. Call setClient() first."); + "MCP client '" + name + "' not available"); } logger.info("Initializing MCP sync client: {}", name); diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java index b6cbcbcd5..973c4d526 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java @@ -18,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -323,6 +324,259 @@ void testCallTool_WithNullArguments() { assertFalse(Boolean.TRUE.equals(result.isError())); } + // ==================== updateCachedTools Tests ==================== + + @Test + void testUpdateCachedTools_AddsNewTools() { + // Initially cache is empty + assertTrue(wrapper.cachedTools.isEmpty()); + + // Create mock tools + McpSchema.Tool tool1 = + new McpSchema.Tool( + "new-tool-1", + null, + "New tool 1", + new McpSchema.JsonSchema("object", null, null, null, null, null), + null, + null, + null); + McpSchema.Tool tool2 = + new McpSchema.Tool( + "new-tool-2", + null, + "New tool 2", + new McpSchema.JsonSchema("object", null, null, null, null, null), + null, + null, + null); + + // Call updateCachedTools (package-private, use reflection) + invokeUpdateCachedTools(List.of(tool1, tool2)); + + // Verify cache was updated + assertEquals(2, wrapper.cachedTools.size()); + assertNotNull(wrapper.getCachedTool("new-tool-1")); + assertNotNull(wrapper.getCachedTool("new-tool-2")); + assertEquals("New tool 1", wrapper.getCachedTool("new-tool-1").description()); + assertEquals("New tool 2", wrapper.getCachedTool("new-tool-2").description()); + } + + @Test + void testUpdateCachedTools_EmptyListClearsCache() { + // First add some tools to cache + setupSuccessfulInitialization(); + wrapper.initialize().block(); + assertEquals(1, wrapper.cachedTools.size()); + + // Then simulate server sending empty tool list (all tools removed) + invokeUpdateCachedTools(List.of()); + + // Verify cache was cleared + assertTrue(wrapper.cachedTools.isEmpty()); + assertNull(wrapper.getCachedTool("tool1")); + } + + @Test + void testUpdateCachedTools_NullDoesNothing() { + // Add some tools to cache first + setupSuccessfulInitialization(); + wrapper.initialize().block(); + int initialSize = wrapper.cachedTools.size(); + + // Call with null should not modify cache + invokeUpdateCachedTools(null); + + // Verify cache unchanged + assertEquals(initialSize, wrapper.cachedTools.size()); + assertNotNull(wrapper.getCachedTool("tool1")); + } + + @Test + void testUpdateCachedTools_ReplacesExistingTools() { + // Initialize with existing tools + setupSuccessfulInitialization(); + wrapper.initialize().block(); + assertEquals(1, wrapper.cachedTools.size()); + + // Simulate server sending updated tool list + McpSchema.Tool updatedTool = + new McpSchema.Tool( + "tool1", + null, + "Updated description", + new McpSchema.JsonSchema("string", null, null, null, null, null), + null, + null, + null); + McpSchema.Tool newTool = + new McpSchema.Tool( + "new-tool", + null, + "Brand new tool", + new McpSchema.JsonSchema("object", null, null, null, null, null), + null, + null, + null); + + invokeUpdateCachedTools(List.of(updatedTool, newTool)); + + // Verify cache was updated (not appended) + assertEquals(2, wrapper.cachedTools.size()); + assertNotNull(wrapper.getCachedTool("tool1")); + assertEquals("Updated description", wrapper.getCachedTool("tool1").description()); + assertNotNull(wrapper.getCachedTool("new-tool")); + assertEquals("Brand new tool", wrapper.getCachedTool("new-tool").description()); + } + + @Test + void testUpdateCachedTools_MultipleTimes() { + // First update + McpSchema.Tool tool1 = + new McpSchema.Tool( + "tool-a", + null, + "Tool A", + new McpSchema.JsonSchema("object", null, null, null, null, null), + null, + null, + null); + invokeUpdateCachedTools(List.of(tool1)); + assertEquals(1, wrapper.cachedTools.size()); + + // Second update - replace + McpSchema.Tool tool2 = + new McpSchema.Tool( + "tool-b", + null, + "Tool B", + new McpSchema.JsonSchema("object", null, null, null, null, null), + null, + null, + null); + invokeUpdateCachedTools(List.of(tool2)); + + // Verify only latest tools exist + assertEquals(1, wrapper.cachedTools.size()); + assertNull(wrapper.getCachedTool("tool-a")); + assertNotNull(wrapper.getCachedTool("tool-b")); + } + + // ==================== setClient Tests ==================== + + @Test + void testSetClient_ReplacesClient() { + // Wrapper created with original client + assertNotNull(wrapper); + assertEquals("test-sync-client", wrapper.getName()); + + // Create a new client + McpSyncClient newClient = mock(McpSyncClient.class); + assertNotSame(mockClient, newClient); + + // Replace client using reflection + invokeSetClient(newClient); + + // Verify by testing initialization with new client + McpSchema.Implementation serverInfo = + new McpSchema.Implementation("NewServer", "New Server", "1.0"); + McpSchema.InitializeResult initResult = + new McpSchema.InitializeResult( + "1.0", + McpSchema.ServerCapabilities.builder().build(), + serverInfo, + null, + null); + + when(newClient.initialize()).thenReturn(initResult); + when(newClient.listTools()).thenReturn(new McpSchema.ListToolsResult(List.of(), null)); + + wrapper.initialize().block(); + + // Verify new client was used + verify(newClient, times(1)).initialize(); + verify(mockClient, times(0)).initialize(); // Original client never called + } + + // ==================== close() Idempotency Tests ==================== + + @Test + void testClose_Idempotent() { + setupSuccessfulInitialization(); + wrapper.initialize().block(); + assertTrue(wrapper.isInitialized()); + + // Close multiple times + wrapper.close(); + wrapper.close(); + wrapper.close(); + + // Verify closeGracefully was called only once + verify(mockClient, times(1)).closeGracefully(); + assertFalse(wrapper.isInitialized()); + } + + @Test + void testClose_WithoutInitialize() { + // Close without ever initializing + wrapper.close(); + + // Should not throw, client should be nullified + assertFalse(wrapper.isInitialized()); + assertTrue(wrapper.cachedTools.isEmpty()); + } + + @Test + void testClose_AfterSetClientToNull() { + // First initialize normally + setupSuccessfulInitialization(); + wrapper.initialize().block(); + assertTrue(wrapper.isInitialized()); + + // Manually set client to null via reflection + invokeSetClient(null); + + // Close should handle gracefully + wrapper.close(); + + assertFalse(wrapper.isInitialized()); + assertTrue(wrapper.cachedTools.isEmpty()); + } + + // ==================== Helper Methods ==================== + + /** + * Invokes the package-private updateCachedTools method using reflection. + * + * @param tools the list of tools to update + */ + private void invokeUpdateCachedTools(List tools) { + try { + java.lang.reflect.Method method = + McpSyncClientWrapper.class.getDeclaredMethod("updateCachedTools", List.class); + method.setAccessible(true); + method.invoke(wrapper, tools); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke updateCachedTools", e); + } + } + + /** + * Invokes the package-private setClient method using reflection. + * + * @param client the MCP sync client to set + */ + private void invokeSetClient(McpSyncClient client) { + try { + java.lang.reflect.Method method = + McpSyncClientWrapper.class.getDeclaredMethod("setClient", McpSyncClient.class); + method.setAccessible(true); + method.invoke(wrapper, client); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke setClient", e); + } + } + private void setupSuccessfulInitialization() { McpSchema.Implementation serverInfo = new McpSchema.Implementation("TestServer", "Test Server", "1.0.8-SNAPSHOT"); From b966d99e65f9445ebefc5e2b4d5f3ec9d82b1da4 Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Thu, 15 Jan 2026 10:25:30 +0800 Subject: [PATCH 3/9] feat(mcp): add notification handlers support for sync client --- .../tool/mcp/McpAsyncClientWrapperTest.java | 47 +++++++++++++++++++ .../tool/mcp/McpSyncClientWrapperTest.java | 46 ++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java index 6ad759a62..950e43c39 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -428,6 +429,52 @@ void testClose_AfterSetClientToNull() { assertTrue(wrapper.cachedTools.isEmpty()); } + // ==================== Null Client Error Path Tests ==================== + + @Test + void testInitialize_WhenClientIsNull() { + // Create wrapper with null client + McpAsyncClientWrapper nullWrapper = new McpAsyncClientWrapper("null-client", null); + + // Attempt to initialize should fail with "not available" error + Exception exception = + assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block()); + + assertTrue(exception.getMessage().contains("not available")); + } + + @Test + void testListTools_WhenClientIsNullAfterSet() { + setupSuccessfulInitialization(); + wrapper.initialize().block(); + + // Set client to null via reflection + invokeSetClient(null); + + // Attempt to list tools should fail with "not available" error + Exception exception = + assertThrows(IllegalStateException.class, () -> wrapper.listTools().block()); + + assertTrue(exception.getMessage().contains("not available")); + } + + @Test + void testCallTool_WhenClientIsNullAfterSet() { + setupSuccessfulInitialization(); + wrapper.initialize().block(); + + // Set client to null via reflection + invokeSetClient(null); + + // Attempt to call tool should fail with "not available" error + Exception exception = + assertThrows( + IllegalStateException.class, + () -> wrapper.callTool("test-tool", Map.of()).block()); + + assertTrue(exception.getMessage().contains("not available")); + } + // ==================== Helper Methods ==================== /** diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java index 973c4d526..ed3dd53ef 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java @@ -543,6 +543,52 @@ void testClose_AfterSetClientToNull() { assertTrue(wrapper.cachedTools.isEmpty()); } + // ==================== Null Client Error Path Tests ==================== + + @Test + void testInitialize_WhenClientIsNull() { + // Create wrapper with null client + McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null); + + // Attempt to initialize should fail with "not available" error + Exception exception = + assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block()); + + assertTrue(exception.getMessage().contains("not available")); + } + + @Test + void testListTools_WhenClientIsNullAfterSet() { + setupSuccessfulInitialization(); + wrapper.initialize().block(); + + // Set client to null via reflection + invokeSetClient(null); + + // Attempt to list tools should fail with "not available" error + Exception exception = + assertThrows(IllegalStateException.class, () -> wrapper.listTools().block()); + + assertTrue(exception.getMessage().contains("not available")); + } + + @Test + void testCallTool_WhenClientIsNullAfterSet() { + setupSuccessfulInitialization(); + wrapper.initialize().block(); + + // Set client to null via reflection + invokeSetClient(null); + + // Attempt to call tool should fail with "not available" error + Exception exception = + assertThrows( + IllegalStateException.class, + () -> wrapper.callTool("test-tool", Map.of()).block()); + + assertTrue(exception.getMessage().contains("not available")); + } + // ==================== Helper Methods ==================== /** From 9a23888656ca398c05ad15288f765ccad5e4b0d6 Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Thu, 15 Jan 2026 18:44:26 +0800 Subject: [PATCH 4/9] add synchronized --- .../io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java | 5 +++-- .../io/agentscope/core/tool/mcp/McpSyncClientWrapper.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index 5f15fd155..a7fd455f9 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -66,11 +66,12 @@ void setClient(McpAsyncClient client) { /** * Updates the cached tools map with new tools from the server. This method is called when the - * server sends a tools/list_changed notification. + * server sends a tools/list_changed notification. This method is thread-safe and can be + * called concurrently from notification handlers. * * @param tools the new list of tools from the server (empty list clears cache) */ - void updateCachedTools(List tools) { + synchronized void updateCachedTools(List tools) { if (tools != null) { // Build new map first, then atomically replace Map newTools = diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java index fc9161877..40e1c3f7b 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java @@ -73,7 +73,7 @@ void setClient(McpSyncClient client) { * * @param tools the new list of tools from the server (empty list clears cache) */ - void updateCachedTools(List tools) { + synchronized void updateCachedTools(List tools) { if (tools != null) { // Build new map first, then atomically replace Map newTools = From 3e95f85009100df4b80d6f2766b243bbf6c0b75c Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Fri, 16 Jan 2026 13:44:27 +0800 Subject: [PATCH 5/9] add volatile --- .../core/tool/mcp/McpAsyncClientWrapper.java | 23 ++++++++++++------- .../core/tool/mcp/McpClientWrapper.java | 2 +- .../core/tool/mcp/McpSyncClientWrapper.java | 22 ++++++++++++------ .../higress/HigressMcpClientWrapper.java | 22 ++++++++++++++---- 4 files changed, 48 insertions(+), 21 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index a7fd455f9..f6565ed9a 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -19,6 +19,7 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -71,13 +72,13 @@ void setClient(McpAsyncClient client) { * * @param tools the new list of tools from the server (empty list clears cache) */ - synchronized void updateCachedTools(List tools) { + void updateCachedTools(List tools) { if (tools != null) { - // Build new map first, then atomically replace + // Build new map first, then atomically replace via volatile assignment Map newTools = - tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); - cachedTools.clear(); - cachedTools.putAll(newTools); + tools.stream() + .collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); } } @@ -117,8 +118,14 @@ public Mono initialize() { "MCP client '{}' discovered {} tools", name, result.tools().size()); - // Cache all tools - result.tools().forEach(tool -> cachedTools.put(tool.name(), tool)); + // Cache all tools - build new map then atomically replace + Map newTools = + result.tools() + .stream() + .collect( + Collectors.toMap( + McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); }) .doOnSuccess(v -> initialized = true) .doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e)) @@ -216,6 +223,6 @@ public void close() { } } initialized = false; - cachedTools.clear(); + cachedTools = new ConcurrentHashMap<>(); } } diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java index 473af292f..f65286adc 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java @@ -43,7 +43,7 @@ public abstract class McpClientWrapper implements AutoCloseable { protected final String name; /** Cache of tools available from this MCP server */ - protected final Map cachedTools; + protected volatile Map cachedTools; /** Flag indicating whether the client has been initialized */ protected volatile boolean initialized = false; diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java index 40e1c3f7b..a0a9d7323 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java @@ -19,6 +19,7 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -73,13 +74,13 @@ void setClient(McpSyncClient client) { * * @param tools the new list of tools from the server (empty list clears cache) */ - synchronized void updateCachedTools(List tools) { + void updateCachedTools(List tools) { if (tools != null) { - // Build new map first, then atomically replace + // Build new map first, then atomically replace via volatile assignment Map newTools = - tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); - cachedTools.clear(); - cachedTools.putAll(newTools); + tools.stream() + .collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); } } @@ -123,7 +124,14 @@ public Mono initialize() { name, toolsResult.tools().size()); - toolsResult.tools().forEach(tool -> cachedTools.put(tool.name(), tool)); + // Cache all tools - build new map then atomically replace + Map newTools = + toolsResult.tools() + .stream() + .collect( + Collectors.toMap( + McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); initialized = true; return null; @@ -227,6 +235,6 @@ public void close() { } } initialized = false; - cachedTools.clear(); + cachedTools = new ConcurrentHashMap<>(); } } diff --git a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java index ad289c628..3c7a7d778 100644 --- a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java +++ b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java @@ -20,6 +20,8 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -185,8 +187,13 @@ public Mono> listTools() { }) .doOnNext( tools -> { - // Cache tools locally - tools.forEach(tool -> cachedTools.put(tool.name(), tool)); + // Cache tools locally - build new map then atomically replace + Map newTools = + tools.stream() + .collect( + Collectors.toMap( + McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); }); } else { // Return all tools from delegate @@ -194,8 +201,13 @@ public Mono> listTools() { .listTools() .doOnNext( tools -> { - // Cache tools locally - tools.forEach(tool -> cachedTools.put(tool.name(), tool)); + // Cache tools locally - build new map then atomically replace + Map newTools = + tools.stream() + .collect( + Collectors.toMap( + McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); logger.debug( "Higress MCP client '{}' discovered {} tools", name, @@ -305,7 +317,7 @@ public void close() { } this.initialized = false; - this.cachedTools.clear(); + this.cachedTools = new ConcurrentHashMap<>(); } /** From 7c97c5503354d38ba0e6ac8d09f17ce105f74757 Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Fri, 16 Jan 2026 13:55:31 +0800 Subject: [PATCH 6/9] fix Spotless --- .../agentscope/core/tool/mcp/McpAsyncClientWrapper.java | 9 +++------ .../agentscope/core/tool/mcp/McpSyncClientWrapper.java | 9 +++------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index f6565ed9a..3bb566072 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -76,8 +76,7 @@ void updateCachedTools(List tools) { if (tools != null) { // Build new map first, then atomically replace via volatile assignment Map newTools = - tools.stream() - .collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); cachedTools = new ConcurrentHashMap<>(newTools); logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); } @@ -120,11 +119,9 @@ public Mono initialize() { result.tools().size()); // Cache all tools - build new map then atomically replace Map newTools = - result.tools() - .stream() + result.tools().stream() .collect( - Collectors.toMap( - McpSchema.Tool::name, t -> t)); + Collectors.toMap(McpSchema.Tool::name, t -> t)); cachedTools = new ConcurrentHashMap<>(newTools); }) .doOnSuccess(v -> initialized = true) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java index a0a9d7323..2bd89d7c7 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java @@ -78,8 +78,7 @@ void updateCachedTools(List tools) { if (tools != null) { // Build new map first, then atomically replace via volatile assignment Map newTools = - tools.stream() - .collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); cachedTools = new ConcurrentHashMap<>(newTools); logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); } @@ -126,11 +125,9 @@ public Mono initialize() { // Cache all tools - build new map then atomically replace Map newTools = - toolsResult.tools() - .stream() + toolsResult.tools().stream() .collect( - Collectors.toMap( - McpSchema.Tool::name, t -> t)); + Collectors.toMap(McpSchema.Tool::name, t -> t)); cachedTools = new ConcurrentHashMap<>(newTools); initialized = true; From 5733e57fd73c7d65cef4cb1b94b24baba79dde23 Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Mon, 26 Jan 2026 14:21:26 +0800 Subject: [PATCH 7/9] fix --- .../core/tool/mcp/McpAsyncClientWrapper.java | 75 +++--- .../core/tool/mcp/McpClientBuilder.java | 249 ++++++++---------- .../core/tool/mcp/McpSyncClientWrapper.java | 73 +++-- .../tool/mcp/McpAsyncClientWrapperTest.java | 117 +------- .../tool/mcp/McpSyncClientWrapperTest.java | 123 +-------- 5 files changed, 184 insertions(+), 453 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index 3bb566072..5c5a94087 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -19,8 +19,9 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,8 @@ public class McpAsyncClientWrapper extends McpClientWrapper { private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class); - private final AtomicReference clientRef; + private final McpAsyncClient client; + private final AtomicBoolean closed = new AtomicBoolean(false); /** * Constructs a new asynchronous MCP client wrapper. @@ -52,17 +54,18 @@ public class McpAsyncClientWrapper extends McpClientWrapper { */ public McpAsyncClientWrapper(String name, McpAsyncClient client) { super(name); - this.clientRef = new AtomicReference<>(client); + this.client = Objects.requireNonNull(client, "MCP client cannot be null"); } /** - * Sets the underlying MCP async client. This is called by McpClientBuilder after the client - * is created with notification handlers. + * Package-private constructor for use by McpClientBuilder. * - * @param client the MCP async client + * @param name unique identifier for this client + * @param builder the builder to construct the client with notification handlers */ - void setClient(McpAsyncClient client) { - this.clientRef.set(client); + McpAsyncClientWrapper(String name, McpClientBuilder builder) { + super(name); + this.client = builder.buildClientForAsync(this); } /** @@ -73,13 +76,14 @@ void setClient(McpAsyncClient client) { * @param tools the new list of tools from the server (empty list clears cache) */ void updateCachedTools(List tools) { - if (tools != null) { - // Build new map first, then atomically replace via volatile assignment - Map newTools = - tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); - cachedTools = new ConcurrentHashMap<>(newTools); - logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); + if (closed.get() || tools == null) { + return; } + // Build new map first, then atomically replace via volatile assignment + Map newTools = + tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); + logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); } /** @@ -92,15 +96,13 @@ void updateCachedTools(List tools) { */ @Override public Mono initialize() { + if (closed.get()) { + return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed")); + } if (initialized) { return Mono.empty(); } - McpAsyncClient client = clientRef.get(); - if (client == null) { - return Mono.error(new IllegalStateException("MCP client '" + name + "' not available")); - } - logger.info("Initializing MCP async client: {}", name); return client.initialize() @@ -124,7 +126,12 @@ public Mono initialize() { Collectors.toMap(McpSchema.Tool::name, t -> t)); cachedTools = new ConcurrentHashMap<>(newTools); }) - .doOnSuccess(v -> initialized = true) + .doOnSuccess( + v -> { + if (!closed.get()) { + initialized = true; + } + }) .doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e)) .then(); } @@ -139,16 +146,14 @@ public Mono initialize() { */ @Override public Mono> listTools() { + if (closed.get()) { + return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed")); + } if (!initialized) { return Mono.error( new IllegalStateException("MCP client '" + name + "' not initialized")); } - McpAsyncClient client = clientRef.get(); - if (client == null) { - return Mono.error(new IllegalStateException("MCP client '" + name + "' not available")); - } - return client.listTools().map(McpSchema.ListToolsResult::tools); } @@ -164,16 +169,14 @@ public Mono> listTools() { */ @Override public Mono callTool(String toolName, Map arguments) { + if (closed.get()) { + return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed")); + } if (!initialized) { return Mono.error( new IllegalStateException("MCP client '" + name + "' not initialized")); } - McpAsyncClient client = clientRef.get(); - if (client == null) { - return Mono.error(new IllegalStateException("MCP client '" + name + "' not available")); - } - logger.debug("Calling MCP tool '{}' on client '{}'", toolName, name); McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(toolName, arguments); @@ -206,20 +209,20 @@ public Mono callTool(String toolName, Map logger.debug("MCP client '{}' closed", name)) .doOnError(e -> logger.error("Error closing MCP client '{}'", name, e)) .block(); } catch (Exception e) { logger.error("Exception during MCP client close", e); - toClose.close(); + client.close(); + } finally { + initialized = false; + cachedTools = new ConcurrentHashMap<>(); } } - initialized = false; - cachedTools = new ConcurrentHashMap<>(); } } diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java index 394abd40c..2b50b7b1d 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java @@ -286,9 +286,8 @@ public McpClientBuilder initializationTimeout(Duration timeout) { /** * Builds an asynchronous MCP client wrapper. * - *

This method uses a two-phase build pattern to support notification handlers. - * The wrapper is created first, then the MCP client is built with notification consumers - * that can reference the wrapper. + *

This method uses a constructor that passes the builder to the wrapper, allowing the + * wrapper to construct its own client with notification handlers. * * @return Mono emitting the async client wrapper */ @@ -297,96 +296,14 @@ public Mono buildAsync() { return Mono.error(new IllegalStateException("Transport must be configured")); } - return Mono.fromCallable( - () -> { - McpClientTransport transport = transportConfig.createTransport(); - - McpSchema.Implementation clientInfo = - new McpSchema.Implementation( - "agentscope-java", "AgentScope Java Framework", "1.0.9-SNAPSHOT"); - - McpSchema.ClientCapabilities clientCapabilities = - McpSchema.ClientCapabilities.builder().build(); - - // ========== Phase 1: Create wrapper (client is temporarily null) ========== - McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper(name, null); - - // ========== Phase 2: Build client (can reference wrapper) ========== - McpAsyncClient mcpClient = - McpClient.async(transport) - .requestTimeout(requestTimeout) - .initializationTimeout(initializationTimeout) - .clientInfo(clientInfo) - .capabilities(clientCapabilities) - - // ----- Log notification Consumer ----- - .loggingConsumer( - notification -> { - // Parse notification content - String level = - notification.level() != null - ? notification.level().toString() - : "info"; - String loggerName = - notification.logger() != null - ? notification.logger() - : "mcp"; - String data = - notification.data() != null - ? notification.data() - : ""; - - // Log to SLF4J by level - switch (level.toLowerCase()) { - case "error" -> - logger.error( - "[MCP-{}] [{}] {}", - name, - loggerName, - data); - case "warning" -> - logger.warn( - "[MCP-{}] [{}] {}", - name, - loggerName, - data); - case "debug" -> - logger.debug( - "[MCP-{}] [{}] {}", - name, - loggerName, - data); - default -> - logger.info( - "[MCP-{}] [{}] {}", - name, - loggerName, - data); - } - return Mono.empty(); - }) - - // ----- Tools change notification Consumer ----- - .toolsChangeConsumer( - tools -> { - // Call wrapper method to update cache - wrapper.updateCachedTools(tools); - return Mono.empty(); - }) - .build(); - - // ========== Phase 3: Link MCP client to wrapper ========== - wrapper.setClient(mcpClient); - return wrapper; - }); + return Mono.fromCallable(() -> new McpAsyncClientWrapper(name, this)); } /** * Builds a synchronous MCP client wrapper (blocking operations). * - *

This method uses a two-phase build pattern to support notification handlers. The wrapper - * is created first, then the MCP client is built with notification consumers that can - * reference the wrapper. + *

This method uses a constructor that passes the builder to the wrapper, allowing the + * wrapper to construct its own client with notification handlers. * * @return synchronous client wrapper */ @@ -395,68 +312,116 @@ public McpClientWrapper buildSync() { throw new IllegalStateException("Transport must be configured"); } + return new McpSyncClientWrapper(name, this); + } + + // ==================== Internal Client Building Methods ==================== + + /** + * Builds an MCP async client with notification handlers for the specified wrapper. + * + * @param wrapper the wrapper that will receive notification callbacks + * @return the built MCP async client + */ + McpAsyncClient buildClientForAsync(McpAsyncClientWrapper wrapper) { + if (transportConfig == null) { + throw new IllegalStateException("Transport must be configured"); + } + McpClientTransport transport = transportConfig.createTransport(); McpSchema.Implementation clientInfo = new McpSchema.Implementation( - "agentscope-java", "AgentScope Java Framework", "1.0.9-SNAPSHOT"); + "agentscope-java", "AgentScope Java Framework", "1.0.8-SNAPSHOT"); McpSchema.ClientCapabilities clientCapabilities = McpSchema.ClientCapabilities.builder().build(); - // ========== Phase 1: Create wrapper (client is temporarily null) ========== - McpSyncClientWrapper wrapper = new McpSyncClientWrapper(name, null); - - // ========== Phase 2: Build client (can reference wrapper) ========== - McpSyncClient mcpClient = - McpClient.sync(transport) - .requestTimeout(requestTimeout) - .initializationTimeout(initializationTimeout) - .clientInfo(clientInfo) - .capabilities(clientCapabilities) - // ----- Log notification Consumer ----- - .loggingConsumer( - notification -> { - // Parse notification content - String level = - notification.level() != null - ? notification.level().toString() - : "info"; - String loggerName = - notification.logger() != null - ? notification.logger() - : "mcp"; - String data = - notification.data() != null ? notification.data() : ""; - - // Log to SLF4J by level - switch (level.toLowerCase()) { - case "error" -> - logger.error( - "[MCP-{}] [{}] {}", name, loggerName, data); - case "warning" -> - logger.warn( - "[MCP-{}] [{}] {}", name, loggerName, data); - case "debug" -> - logger.debug( - "[MCP-{}] [{}] {}", name, loggerName, data); - default -> - logger.info( - "[MCP-{}] [{}] {}", name, loggerName, data); - } - }) - - // ----- Tools change notification Consumer ----- - .toolsChangeConsumer( - tools -> { - // Call wrapper method to update cache - wrapper.updateCachedTools(tools); - }) - .build(); - - // ========== Phase 3: Link MCP client to wrapper ========== - wrapper.setClient(mcpClient); - return wrapper; + return McpClient.async(transport) + .requestTimeout(requestTimeout) + .initializationTimeout(initializationTimeout) + .clientInfo(clientInfo) + .capabilities(clientCapabilities) + .loggingConsumer(buildAsyncLoggingConsumer()) + .toolsChangeConsumer( + tools -> { + wrapper.updateCachedTools(tools); + return Mono.empty(); + }) + .build(); + } + + /** + * Builds an MCP sync client with notification handlers for the specified wrapper. + * + * @param wrapper the wrapper that will receive notification callbacks + * @return the built MCP sync client + */ + McpSyncClient buildClientForSync(McpSyncClientWrapper wrapper) { + if (transportConfig == null) { + throw new IllegalStateException("Transport must be configured"); + } + + McpClientTransport transport = transportConfig.createTransport(); + + McpSchema.Implementation clientInfo = + new McpSchema.Implementation( + "agentscope-java", "AgentScope Java Framework", "1.0.8-SNAPSHOT"); + + McpSchema.ClientCapabilities clientCapabilities = + McpSchema.ClientCapabilities.builder().build(); + + return McpClient.sync(transport) + .requestTimeout(requestTimeout) + .initializationTimeout(initializationTimeout) + .clientInfo(clientInfo) + .capabilities(clientCapabilities) + .loggingConsumer(buildSyncLoggingConsumer()) + .toolsChangeConsumer(tools -> wrapper.updateCachedTools(tools)) + .build(); + } + + /** + * Creates a logging function for async clients that maps MCP log notifications to SLF4J. + * + * @return the logging function + */ + private java.util.function.Function< + io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification, Mono> + buildAsyncLoggingConsumer() { + return notification -> { + logNotification(notification); + return Mono.empty(); + }; + } + + /** + * Creates a logging consumer for sync clients that maps MCP log notifications to SLF4J. + * + * @return the logging consumer + */ + private Consumer + buildSyncLoggingConsumer() { + return notification -> logNotification(notification); + } + + /** + * Logs an MCP notification to SLF4J at the appropriate level. + * + * @param notification the notification to log + */ + private void logNotification( + io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification notification) { + String level = notification.level() != null ? notification.level().toString() : "info"; + String loggerName = notification.logger() != null ? notification.logger() : "mcp"; + String data = notification.data() != null ? notification.data() : ""; + + switch (level.toLowerCase()) { + case "error" -> logger.error("[MCP-{}] [{}] {}", name, loggerName, data); + case "warning" -> logger.warn("[MCP-{}] [{}] {}", name, loggerName, data); + case "debug" -> logger.debug("[MCP-{}] [{}] {}", name, loggerName, data); + default -> logger.info("[MCP-{}] [{}] {}", name, loggerName, data); + } } // ==================== Internal Transport Configuration Classes ==================== diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java index 2bd89d7c7..952da75ca 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java @@ -19,8 +19,9 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,8 @@ public class McpSyncClientWrapper extends McpClientWrapper { private static final Logger logger = LoggerFactory.getLogger(McpSyncClientWrapper.class); - private final AtomicReference clientRef; + private final McpSyncClient client; + private final AtomicBoolean closed = new AtomicBoolean(false); /** * Constructs a new synchronous MCP client wrapper. @@ -54,17 +56,18 @@ public class McpSyncClientWrapper extends McpClientWrapper { */ public McpSyncClientWrapper(String name, McpSyncClient client) { super(name); - this.clientRef = new AtomicReference<>(client); + this.client = Objects.requireNonNull(client, "MCP client cannot be null"); } /** - * Sets the underlying MCP sync client. This is called by McpClientBuilder after the client is - * created with notification handlers. + * Package-private constructor for use by McpClientBuilder. * - * @param client the MCP sync client + * @param name unique identifier for this client + * @param builder the builder to construct the client with notification handlers */ - void setClient(McpSyncClient client) { - this.clientRef.set(client); + McpSyncClientWrapper(String name, McpClientBuilder builder) { + super(name); + this.client = builder.buildClientForSync(this); } /** @@ -75,13 +78,14 @@ void setClient(McpSyncClient client) { * @param tools the new list of tools from the server (empty list clears cache) */ void updateCachedTools(List tools) { - if (tools != null) { - // Build new map first, then atomically replace via volatile assignment - Map newTools = - tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); - cachedTools = new ConcurrentHashMap<>(newTools); - logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); + if (closed.get() || tools == null) { + return; } + // Build new map first, then atomically replace via volatile assignment + Map newTools = + tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t)); + cachedTools = new ConcurrentHashMap<>(newTools); + logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); } /** @@ -95,18 +99,15 @@ void updateCachedTools(List tools) { */ @Override public Mono initialize() { + if (closed.get()) { + return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed")); + } if (initialized) { return Mono.empty(); } return Mono.fromCallable( () -> { - McpSyncClient client = clientRef.get(); - if (client == null) { - throw new IllegalStateException( - "MCP client '" + name + "' not available"); - } - logger.info("Initializing MCP sync client: {}", name); // Initialize the client (blocking) @@ -130,7 +131,9 @@ public Mono initialize() { Collectors.toMap(McpSchema.Tool::name, t -> t)); cachedTools = new ConcurrentHashMap<>(newTools); - initialized = true; + if (!closed.get()) { + initialized = true; + } return null; }) .subscribeOn(Schedulers.boundedElastic()) @@ -148,16 +151,14 @@ public Mono initialize() { */ @Override public Mono> listTools() { + if (closed.get()) { + return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed")); + } if (!initialized) { return Mono.error( new IllegalStateException("MCP client '" + name + "' not initialized")); } - McpSyncClient client = clientRef.get(); - if (client == null) { - return Mono.error(new IllegalStateException("MCP client '" + name + "' not available")); - } - return Mono.fromCallable(() -> client.listTools().tools()) .subscribeOn(Schedulers.boundedElastic()); } @@ -174,16 +175,14 @@ public Mono> listTools() { */ @Override public Mono callTool(String toolName, Map arguments) { + if (closed.get()) { + return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed")); + } if (!initialized) { return Mono.error( new IllegalStateException("MCP client '" + name + "' not initialized")); } - McpSyncClient client = clientRef.get(); - if (client == null) { - return Mono.error(new IllegalStateException("MCP client '" + name + "' not available")); - } - logger.debug("Calling MCP tool '{}' on client '{}'", toolName, name); return Mono.fromCallable( @@ -220,18 +219,18 @@ public Mono callTool(String toolName, Map(); } } - initialized = false; - cachedTools = new ConcurrentHashMap<>(); } } diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java index a52536f42..0a86b3c1d 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java @@ -18,9 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -343,43 +341,6 @@ void testUpdateCachedTools_MultipleTimes() { assertNotNull(wrapper.getCachedTool("tool-b")); } - // ==================== setClient Tests ==================== - - @Test - void testSetClient_ReplacesClient() { - // Wrapper created with original client - assertNotNull(wrapper); - assertEquals("test-async-client", wrapper.getName()); - - // Create a new client - McpAsyncClient newClient = mock(McpAsyncClient.class); - assertNotSame(mockClient, newClient); - - // Replace client using reflection - invokeSetClient(newClient); - - // Verify by testing initialization with new client - McpSchema.Implementation serverInfo = - new McpSchema.Implementation("NewServer", "New Server", "1.0"); - McpSchema.InitializeResult initResult = - new McpSchema.InitializeResult( - "1.0", - McpSchema.ServerCapabilities.builder().build(), - serverInfo, - null, - null); - - when(newClient.initialize()).thenReturn(Mono.just(initResult)); - when(newClient.listTools()) - .thenReturn(Mono.just(new McpSchema.ListToolsResult(List.of(), null))); - - wrapper.initialize().block(); - - // Verify new client was used - verify(newClient, times(1)).initialize(); - verify(mockClient, times(0)).initialize(); // Original client never called - } - // ==================== close() Idempotency Tests ==================== @Test @@ -412,68 +373,9 @@ void testClose_WithoutInitialize() { assertTrue(wrapper.cachedTools.isEmpty()); } - @Test - void testClose_AfterSetClientToNull() { - // First initialize normally - setupSuccessfulInitialization(); - wrapper.initialize().block(); - assertTrue(wrapper.isInitialized()); - - // Manually set client to null via reflection - invokeSetClient(null); - - // Close should handle gracefully - wrapper.close(); - - assertFalse(wrapper.isInitialized()); - assertTrue(wrapper.cachedTools.isEmpty()); - } - // ==================== Null Client Error Path Tests ==================== - @Test - void testInitialize_WhenClientIsNull() { - // Create wrapper with null client - McpAsyncClientWrapper nullWrapper = new McpAsyncClientWrapper("null-client", null); - - // Attempt to initialize should fail with "not available" error - Exception exception = - assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block()); - - assertTrue(exception.getMessage().contains("not available")); - } - - @Test - void testListTools_WhenClientIsNullAfterSet() { - setupSuccessfulInitialization(); - wrapper.initialize().block(); - - // Set client to null via reflection - invokeSetClient(null); - - // Attempt to list tools should fail with "not available" error - Exception exception = - assertThrows(IllegalStateException.class, () -> wrapper.listTools().block()); - - assertTrue(exception.getMessage().contains("not available")); - } - - @Test - void testCallTool_WhenClientIsNullAfterSet() { - setupSuccessfulInitialization(); - wrapper.initialize().block(); - - // Set client to null via reflection - invokeSetClient(null); - - // Attempt to call tool should fail with "not available" error - Exception exception = - assertThrows( - IllegalStateException.class, - () -> wrapper.callTool("test-tool", Map.of()).block()); - - assertTrue(exception.getMessage().contains("not available")); - } + // Removed: client is now final and cannot be null after construction // ==================== Helper Methods ==================== @@ -493,23 +395,6 @@ private void invokeUpdateCachedTools(List tools) { } } - /** - * Invokes the package-private setClient method using reflection. - * - * @param client the MCP async client to set - */ - private void invokeSetClient(McpAsyncClient client) { - try { - java.lang.reflect.Method method = - McpAsyncClientWrapper.class.getDeclaredMethod( - "setClient", McpAsyncClient.class); - method.setAccessible(true); - method.invoke(wrapper, client); - } catch (Exception e) { - throw new RuntimeException("Failed to invoke setClient", e); - } - } - private void setupSuccessfulInitialization() { McpSchema.Implementation serverInfo = new McpSchema.Implementation("TestServer", "Test Server", "1.0.9-SNAPSHOT"); diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java index a873129a9..117c3cdb9 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -283,15 +282,6 @@ void testClose_GracefulCloseFails() { verify(mockClient, times(1)).close(); } - @Test - void testClose_NullClient() { - McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null); - - nullWrapper.close(); - - assertFalse(nullWrapper.isInitialized()); - } - @Test void testClose_MultipleCallsSafe() { setupSuccessfulInitialization(); @@ -462,42 +452,6 @@ void testUpdateCachedTools_MultipleTimes() { assertNotNull(wrapper.getCachedTool("tool-b")); } - // ==================== setClient Tests ==================== - - @Test - void testSetClient_ReplacesClient() { - // Wrapper created with original client - assertNotNull(wrapper); - assertEquals("test-sync-client", wrapper.getName()); - - // Create a new client - McpSyncClient newClient = mock(McpSyncClient.class); - assertNotSame(mockClient, newClient); - - // Replace client using reflection - invokeSetClient(newClient); - - // Verify by testing initialization with new client - McpSchema.Implementation serverInfo = - new McpSchema.Implementation("NewServer", "New Server", "1.0"); - McpSchema.InitializeResult initResult = - new McpSchema.InitializeResult( - "1.0", - McpSchema.ServerCapabilities.builder().build(), - serverInfo, - null, - null); - - when(newClient.initialize()).thenReturn(initResult); - when(newClient.listTools()).thenReturn(new McpSchema.ListToolsResult(List.of(), null)); - - wrapper.initialize().block(); - - // Verify new client was used - verify(newClient, times(1)).initialize(); - verify(mockClient, times(0)).initialize(); // Original client never called - } - // ==================== close() Idempotency Tests ==================== @Test @@ -526,68 +480,9 @@ void testClose_WithoutInitialize() { assertTrue(wrapper.cachedTools.isEmpty()); } - @Test - void testClose_AfterSetClientToNull() { - // First initialize normally - setupSuccessfulInitialization(); - wrapper.initialize().block(); - assertTrue(wrapper.isInitialized()); - - // Manually set client to null via reflection - invokeSetClient(null); - - // Close should handle gracefully - wrapper.close(); - - assertFalse(wrapper.isInitialized()); - assertTrue(wrapper.cachedTools.isEmpty()); - } - // ==================== Null Client Error Path Tests ==================== - @Test - void testInitialize_WhenClientIsNull() { - // Create wrapper with null client - McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null); - - // Attempt to initialize should fail with "not available" error - Exception exception = - assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block()); - - assertTrue(exception.getMessage().contains("not available")); - } - - @Test - void testListTools_WhenClientIsNullAfterSet() { - setupSuccessfulInitialization(); - wrapper.initialize().block(); - - // Set client to null via reflection - invokeSetClient(null); - - // Attempt to list tools should fail with "not available" error - Exception exception = - assertThrows(IllegalStateException.class, () -> wrapper.listTools().block()); - - assertTrue(exception.getMessage().contains("not available")); - } - - @Test - void testCallTool_WhenClientIsNullAfterSet() { - setupSuccessfulInitialization(); - wrapper.initialize().block(); - - // Set client to null via reflection - invokeSetClient(null); - - // Attempt to call tool should fail with "not available" error - Exception exception = - assertThrows( - IllegalStateException.class, - () -> wrapper.callTool("test-tool", Map.of()).block()); - - assertTrue(exception.getMessage().contains("not available")); - } + // Removed: client is now final and cannot be null after construction // ==================== Helper Methods ==================== @@ -607,22 +502,6 @@ private void invokeUpdateCachedTools(List tools) { } } - /** - * Invokes the package-private setClient method using reflection. - * - * @param client the MCP sync client to set - */ - private void invokeSetClient(McpSyncClient client) { - try { - java.lang.reflect.Method method = - McpSyncClientWrapper.class.getDeclaredMethod("setClient", McpSyncClient.class); - method.setAccessible(true); - method.invoke(wrapper, client); - } catch (Exception e) { - throw new RuntimeException("Failed to invoke setClient", e); - } - } - private void setupSuccessfulInitialization() { McpSchema.Implementation serverInfo = new McpSchema.Implementation("TestServer", "Test Server", "1.0.9-SNAPSHOT"); From e72373d5b2b7fce9d67af7c8bae47ff1ad25c856 Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Mon, 26 Jan 2026 14:57:21 +0800 Subject: [PATCH 8/9] fix higress --- .../higress/HigressMcpClientWrapper.java | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java index 3c7a7d778..d27038389 100644 --- a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java +++ b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,11 @@ public class HigressMcpClientWrapper extends McpClientWrapper { */ private final McpClientWrapper delegateClient; + /** + * Flag indicating whether this client has been closed. + */ + private final AtomicBoolean closed = new AtomicBoolean(false); + /** * Whether x_higress_tool_search is enabled. */ @@ -117,6 +123,10 @@ public class HigressMcpClientWrapper extends McpClientWrapper { */ @Override public Mono initialize() { + if (closed.get()) { + return Mono.error( + new IllegalStateException("Higress MCP client '" + name + "' is closed")); + } if (isInitialized()) { logger.debug("Higress MCP client '{}' already initialized", name); return Mono.empty(); @@ -128,7 +138,9 @@ public Mono initialize() { .initialize() .doOnSuccess( unused -> { - this.initialized = true; + if (!closed.get()) { + this.initialized = true; + } logger.info("Higress MCP client '{}' initialized successfully", name); }) .doOnError( @@ -156,6 +168,10 @@ public Mono initialize() { */ @Override public Mono> listTools() { + if (closed.get()) { + return Mono.error( + new IllegalStateException("Higress MCP client '" + name + "' is closed")); + } if (enableToolSearch) { // Call x_higress_tool_search and convert results to McpSchema.Tool logger.info( @@ -271,6 +287,10 @@ private McpSchema.Tool convertToMcpTool(HigressToolSearchResult.ToolInfo toolInf */ @Override public Mono callTool(String toolName, Map arguments) { + if (closed.get()) { + return Mono.error( + new IllegalStateException("Higress MCP client '" + name + "' is closed")); + } logger.debug( "Calling tool '{}' on Higress MCP client '{}' with arguments: {}", toolName, @@ -305,19 +325,23 @@ public Mono callTool(String toolName, Map(); } - - this.initialized = false; - this.cachedTools = new ConcurrentHashMap<>(); } /** From a7aecbc4a6b6b9a77e6425b6308e0fdfb7c8b2c0 Mon Sep 17 00:00:00 2001 From: JGoP-L <741047428@qq.com> Date: Tue, 10 Feb 2026 12:22:31 +0800 Subject: [PATCH 9/9] fix windows CI --- .github/workflows/maven-ci.yml | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml index 9654c1cb6..24e2f041f 100644 --- a/.github/workflows/maven-ci.yml +++ b/.github/workflows/maven-ci.yml @@ -48,10 +48,8 @@ jobs: include: - os: ubuntu-latest repo-cache-path: '~/.m2/repository' - mvnd-cache-path: '~/.mvnd' - os: windows-latest repo-cache-path: '~\.m2\repository' - mvnd-cache-path: '~\.mvnd' env: MVND_VERSION: '1.0.3' @@ -74,33 +72,21 @@ jobs: restore-keys: | maven-${{ runner.os }}- - - name: Cache mvnd [${{ runner.os }}] - id: cache-mvnd + - name: Cache mvnd [Linux] uses: actions/cache@v4 + if: runner.os == 'Linux' with: - path: ${{ matrix.mvnd-cache-path }} - key: mvnd-${{ env.MVND_VERSION }}-${{ runner.os }} + path: ~/.mvnd + key: mvnd-${{ env.MVND_VERSION }}-linux - name: Install mvnd [Linux] - if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Linux' + if: runner.os == 'Linux' shell: bash run: | curl -fsSL https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64.tar.gz | tar xz mkdir -p ~/.mvnd mv maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64/* ~/.mvnd/ - - name: Install mvnd [Windows] - if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Windows' - shell: pwsh - run: | - Invoke-WebRequest ` - -Uri "https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64.zip" ` - -OutFile "mvnd.zip" - mkdir -Path "~\.mvnd" -Force - Expand-Archive -Path "mvnd.zip" -DestinationPath "mvnd" -FORCE - Copy-Item -Path "mvnd\maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64\*" -Destination "~\.mvnd\" -Recurse - Remove-Item -Path "mvnd", "mvnd.zip" -Recurse -Force - - name: Build and Test with Coverage [Linux] if: runner.os == 'Linux' run: | @@ -109,8 +95,9 @@ jobs: - name: Build and Test with Coverage [Windows] if: runner.os == 'Windows' + shell: pwsh run: | - & "~\.mvnd\bin\mvnd.cmd" -B clean verify + mvn -B clean verify - name: Upload coverage reports to Codecov if: runner.os == 'Linux'