From 3ef7d5d5d5e110d86264b7e5f187243ecaa5357d Mon Sep 17 00:00:00 2001 From: shaojie <741047428@qq.com> Date: Thu, 15 Jan 2026 09:45:14 +0800 Subject: [PATCH 1/9] feat(mcp): add notification handlers support for async client --- .../core/tool/mcp/McpAsyncClientWrapper.java | 63 ++++- .../core/tool/mcp/McpClientBuilder.java | 70 ++++- .../core/tool/mcp/McpSyncClientWrapper.java | 43 ++- .../tool/mcp/McpAsyncClientWrapperTest.java | 260 ++++++++++++++++++ 4 files changed, 409 insertions(+), 27 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index f67968a20..36361cade 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -19,17 +19,16 @@ import io.modelcontextprotocol.spec.McpSchema; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** - * Wrapper for asynchronous MCP clients using Project Reactor. - * This implementation delegates to {@link McpAsyncClient} and provides - * reactive operations that return Mono types. + * Wrapper for asynchronous MCP clients using Project Reactor. This implementation delegates to + * {@link McpAsyncClient} and provides reactive operations that return Mono types. * - *
Example usage: - *
{@code
+ * Example usage:
{@code
* McpAsyncClient client = ... // created via McpClient.async()
* McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client);
* wrapper.initialize()
@@ -41,7 +40,7 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);
- private final McpAsyncClient client;
+ private final AtomicReference clientRef;
/**
* Constructs a new asynchronous MCP client wrapper.
@@ -51,7 +50,32 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
*/
public McpAsyncClientWrapper(String name, McpAsyncClient client) {
super(name);
- this.client = client;
+ this.clientRef = new AtomicReference<>(client);
+ }
+
+ /**
+ * Sets the underlying MCP async client. This is called by McpClientBuilder after the client
+ * is created with notification handlers.
+ *
+ * @param client the MCP async client
+ */
+ void setClient(McpAsyncClient client) {
+ this.clientRef.set(client);
+ }
+
+ /**
+ * Updates the cached tools map with new tools from the server. This method is called when the
+ * server sends a tools/list_changed notification.
+ *
+ * @param tools the new list of tools from the server (empty list clears cache)
+ */
+ void updateCachedTools(List tools) {
+ if (tools != null) {
+ // Clear and rebuild cache
+ cachedTools.clear();
+ tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+ logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
+ }
}
/**
@@ -68,6 +92,12 @@ public Mono initialize() {
return Mono.empty();
}
+ McpAsyncClient client = clientRef.get();
+ if (client == null) {
+ return Mono.error(
+ new IllegalStateException("McpAsyncClient not set. Call setClient() first."));
+ }
+
logger.info("Initializing MCP async client: {}", name);
return client.initialize()
@@ -99,7 +129,6 @@ public Mono initialize() {
* initialized before calling this method.
*
* @return a Mono emitting the list of available tools
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono> listTools() {
@@ -108,6 +137,11 @@ public Mono> listTools() {
new IllegalStateException("MCP client '" + name + "' not initialized"));
}
+ McpAsyncClient client = clientRef.get();
+ if (client == null) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
+ }
+
return client.listTools().map(McpSchema.ListToolsResult::tools);
}
@@ -120,7 +154,6 @@ public Mono> listTools() {
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono callTool(String toolName, Map arguments) {
@@ -129,6 +162,11 @@ public Mono callTool(String toolName, Map callTool(String toolName, Map logger.debug("MCP client '{}' closed", name))
.doOnError(e -> logger.error("Error closing MCP client '{}'", name, e))
.block();
} catch (Exception e) {
logger.error("Exception during MCP client close", e);
- client.close();
+ toClose.close();
}
}
initialized = false;
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
index 67097097b..de03eb95d 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
@@ -36,6 +36,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
/**
@@ -76,6 +78,7 @@ public class McpClientBuilder {
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(120);
private static final Duration DEFAULT_INIT_TIMEOUT = Duration.ofSeconds(30);
+ private static final Logger logger = LoggerFactory.getLogger(McpClientBuilder.class);
private final String name;
private TransportConfig transportConfig;
@@ -233,6 +236,10 @@ public McpClientBuilder initializationTimeout(Duration timeout) {
/**
* Builds an asynchronous MCP client wrapper.
*
+ * This method uses a two-phase build pattern to support notification handlers.
+ * The wrapper is created first, then the MCP client is built with notification consumers
+ * that can reference the wrapper.
+ *
* @return Mono emitting the async client wrapper
*/
public Mono buildAsync() {
@@ -253,15 +260,76 @@ public Mono buildAsync() {
McpSchema.ClientCapabilities clientCapabilities =
McpSchema.ClientCapabilities.builder().build();
+ // ========== Phase 1: Create wrapper (client is temporarily null) ==========
+ McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper(name, null);
+
+ // ========== Phase 2: Build client (can reference wrapper) ==========
McpAsyncClient mcpClient =
McpClient.async(transport)
.requestTimeout(requestTimeout)
.initializationTimeout(initializationTimeout)
.clientInfo(clientInfo)
.capabilities(clientCapabilities)
+
+ // ----- Log notification Consumer -----
+ .loggingConsumer(
+ notification -> {
+ // Parse notification content
+ String level =
+ notification.level() != null
+ ? notification.level().toString()
+ : "info";
+ String loggerName =
+ notification.logger() != null
+ ? notification.logger()
+ : "mcp";
+ String data =
+ notification.data() != null
+ ? notification.data()
+ : "";
+
+ // Log to SLF4J by level
+ switch (level.toLowerCase()) {
+ case "error" ->
+ logger.error(
+ "[MCP-{}] [{}] {}",
+ name,
+ loggerName,
+ data);
+ case "warning" ->
+ logger.warn(
+ "[MCP-{}] [{}] {}",
+ name,
+ loggerName,
+ data);
+ case "debug" ->
+ logger.debug(
+ "[MCP-{}] [{}] {}",
+ name,
+ loggerName,
+ data);
+ default ->
+ logger.info(
+ "[MCP-{}] [{}] {}",
+ name,
+ loggerName,
+ data);
+ }
+ return Mono.empty();
+ })
+
+ // ----- Tools change notification Consumer -----
+ .toolsChangeConsumer(
+ tools -> {
+ // Call wrapper method to update cache
+ wrapper.updateCachedTools(tools);
+ return Mono.empty();
+ })
.build();
- return new McpAsyncClientWrapper(name, mcpClient);
+ // ========== Phase 3: Link MCP client to wrapper ==========
+ wrapper.setClient(mcpClient);
+ return wrapper;
});
}
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index aaf16f589..b9d75cf8b 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -19,18 +19,18 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
- * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types.
- * This implementation delegates to {@link McpSyncClient} and wraps blocking operations
- * in Reactor's boundedElastic scheduler to avoid blocking the event loop.
+ * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types. This
+ * implementation delegates to {@link McpSyncClient} and wraps blocking operations in Reactor's
+ * boundedElastic scheduler to avoid blocking the event loop.
*
- * Example usage:
- *
{@code
+ * Example usage:
{@code
* McpSyncClient client = ... // created via McpClient.sync()
* McpSyncClientWrapper wrapper = new McpSyncClientWrapper("my-mcp", client);
* wrapper.initialize()
@@ -42,7 +42,7 @@ public class McpSyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpSyncClientWrapper.class);
- private final McpSyncClient client;
+ private final AtomicReference clientRef;
/**
* Constructs a new synchronous MCP client wrapper.
@@ -52,7 +52,7 @@ public class McpSyncClientWrapper extends McpClientWrapper {
*/
public McpSyncClientWrapper(String name, McpSyncClient client) {
super(name);
- this.client = client;
+ this.clientRef = new AtomicReference<>(client);
}
/**
@@ -70,10 +70,16 @@ public Mono initialize() {
return Mono.empty();
}
- logger.info("Initializing MCP sync client: {}", name);
-
return Mono.fromCallable(
() -> {
+ McpSyncClient client = clientRef.get();
+ if (client == null) {
+ throw new IllegalStateException(
+ "McpSyncClient not set. Call setClient() first.");
+ }
+
+ logger.info("Initializing MCP sync client: {}", name);
+
// Initialize the client (blocking)
McpSchema.InitializeResult result = client.initialize();
logger.debug(
@@ -105,7 +111,6 @@ public Mono initialize() {
* must be initialized before calling this method.
*
* @return a Mono emitting the list of available tools
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono> listTools() {
@@ -114,6 +119,11 @@ public Mono> listTools() {
new IllegalStateException("MCP client '" + name + "' not initialized"));
}
+ McpSyncClient client = clientRef.get();
+ if (client == null) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
+ }
+
return Mono.fromCallable(() -> client.listTools().tools())
.subscribeOn(Schedulers.boundedElastic());
}
@@ -127,7 +137,6 @@ public Mono> listTools() {
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
- * @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono callTool(String toolName, Map arguments) {
@@ -136,6 +145,11 @@ public Mono callTool(String toolName, Map callTool(String toolName, Map tools) {
+ try {
+ java.lang.reflect.Method method =
+ McpAsyncClientWrapper.class.getDeclaredMethod("updateCachedTools", List.class);
+ method.setAccessible(true);
+ method.invoke(wrapper, tools);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke updateCachedTools", e);
+ }
+ }
+
+ /**
+ * Invokes the package-private setClient method using reflection.
+ *
+ * @param client the MCP async client to set
+ */
+ private void invokeSetClient(McpAsyncClient client) {
+ try {
+ java.lang.reflect.Method method =
+ McpAsyncClientWrapper.class.getDeclaredMethod(
+ "setClient", McpAsyncClient.class);
+ method.setAccessible(true);
+ method.invoke(wrapper, client);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke setClient", e);
+ }
+ }
+
private void setupSuccessfulInitialization() {
McpSchema.Implementation serverInfo =
new McpSchema.Implementation("TestServer", "Test Server", "1.0.8-SNAPSHOT");
From ae062230b1a71eebda34f5182c90aa143bd100a3 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Thu, 15 Jan 2026 10:13:37 +0800
Subject: [PATCH 2/9] feat(mcp): add notification handlers support for sync
client
---
.../core/tool/mcp/McpAsyncClientWrapper.java | 10 +-
.../core/tool/mcp/McpClientBuilder.java | 50 +++-
.../core/tool/mcp/McpSyncClientWrapper.java | 31 ++-
.../tool/mcp/McpSyncClientWrapperTest.java | 254 ++++++++++++++++++
4 files changed, 339 insertions(+), 6 deletions(-)
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
index 36361cade..5f15fd155 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
@@ -71,9 +72,11 @@ void setClient(McpAsyncClient client) {
*/
void updateCachedTools(List tools) {
if (tools != null) {
- // Clear and rebuild cache
+ // Build new map first, then atomically replace
+ Map newTools =
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools.clear();
- tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+ cachedTools.putAll(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
}
@@ -94,8 +97,7 @@ public Mono initialize() {
McpAsyncClient client = clientRef.get();
if (client == null) {
- return Mono.error(
- new IllegalStateException("McpAsyncClient not set. Call setClient() first."));
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
}
logger.info("Initializing MCP async client: {}", name);
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
index d89b1821a..93238c0fd 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
@@ -386,6 +386,10 @@ public Mono buildAsync() {
/**
* Builds a synchronous MCP client wrapper (blocking operations).
*
+ * This method uses a two-phase build pattern to support notification handlers. The wrapper
+ * is created first, then the MCP client is built with notification consumers that can
+ * reference the wrapper.
+ *
* @return synchronous client wrapper
*/
public McpClientWrapper buildSync() {
@@ -402,15 +406,59 @@ public McpClientWrapper buildSync() {
McpSchema.ClientCapabilities clientCapabilities =
McpSchema.ClientCapabilities.builder().build();
+ // ========== Phase 1: Create wrapper (client is temporarily null) ==========
+ McpSyncClientWrapper wrapper = new McpSyncClientWrapper(name, null);
+
+ // ========== Phase 2: Build client (can reference wrapper) ==========
McpSyncClient mcpClient =
McpClient.sync(transport)
.requestTimeout(requestTimeout)
.initializationTimeout(initializationTimeout)
.clientInfo(clientInfo)
.capabilities(clientCapabilities)
+ // ----- Log notification Consumer -----
+ .loggingConsumer(
+ notification -> {
+ // Parse notification content
+ String level =
+ notification.level() != null
+ ? notification.level().toString()
+ : "info";
+ String loggerName =
+ notification.logger() != null
+ ? notification.logger()
+ : "mcp";
+ String data =
+ notification.data() != null ? notification.data() : "";
+
+ // Log to SLF4J by level
+ switch (level.toLowerCase()) {
+ case "error" ->
+ logger.error(
+ "[MCP-{}] [{}] {}", name, loggerName, data);
+ case "warning" ->
+ logger.warn(
+ "[MCP-{}] [{}] {}", name, loggerName, data);
+ case "debug" ->
+ logger.debug(
+ "[MCP-{}] [{}] {}", name, loggerName, data);
+ default ->
+ logger.info(
+ "[MCP-{}] [{}] {}", name, loggerName, data);
+ }
+ })
+
+ // ----- Tools change notification Consumer -----
+ .toolsChangeConsumer(
+ tools -> {
+ // Call wrapper method to update cache
+ wrapper.updateCachedTools(tools);
+ })
.build();
- return new McpSyncClientWrapper(name, mcpClient);
+ // ========== Phase 3: Link MCP client to wrapper ==========
+ wrapper.setClient(mcpClient);
+ return wrapper;
}
// ==================== Internal Transport Configuration Classes ====================
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index b9d75cf8b..fc9161877 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
@@ -55,6 +56,34 @@ public McpSyncClientWrapper(String name, McpSyncClient client) {
this.clientRef = new AtomicReference<>(client);
}
+ /**
+ * Sets the underlying MCP sync client. This is called by McpClientBuilder after the client is
+ * created with notification handlers.
+ *
+ * @param client the MCP sync client
+ */
+ void setClient(McpSyncClient client) {
+ this.clientRef.set(client);
+ }
+
+ /**
+ * Updates the cached tools map with new tools from the server. This method is called when the
+ * server sends a tools/list_changed notification. This method is thread-safe and can be
+ * called concurrently from notification handlers.
+ *
+ * @param tools the new list of tools from the server (empty list clears cache)
+ */
+ void updateCachedTools(List tools) {
+ if (tools != null) {
+ // Build new map first, then atomically replace
+ Map newTools =
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools.clear();
+ cachedTools.putAll(newTools);
+ logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
+ }
+ }
+
/**
* Initializes the sync MCP client connection and caches available tools.
*
@@ -75,7 +104,7 @@ public Mono initialize() {
McpSyncClient client = clientRef.get();
if (client == null) {
throw new IllegalStateException(
- "McpSyncClient not set. Call setClient() first.");
+ "MCP client '" + name + "' not available");
}
logger.info("Initializing MCP sync client: {}", name);
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
index b6cbcbcd5..973c4d526 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
@@ -18,6 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -323,6 +324,259 @@ void testCallTool_WithNullArguments() {
assertFalse(Boolean.TRUE.equals(result.isError()));
}
+ // ==================== updateCachedTools Tests ====================
+
+ @Test
+ void testUpdateCachedTools_AddsNewTools() {
+ // Initially cache is empty
+ assertTrue(wrapper.cachedTools.isEmpty());
+
+ // Create mock tools
+ McpSchema.Tool tool1 =
+ new McpSchema.Tool(
+ "new-tool-1",
+ null,
+ "New tool 1",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ McpSchema.Tool tool2 =
+ new McpSchema.Tool(
+ "new-tool-2",
+ null,
+ "New tool 2",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+
+ // Call updateCachedTools (package-private, use reflection)
+ invokeUpdateCachedTools(List.of(tool1, tool2));
+
+ // Verify cache was updated
+ assertEquals(2, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("new-tool-1"));
+ assertNotNull(wrapper.getCachedTool("new-tool-2"));
+ assertEquals("New tool 1", wrapper.getCachedTool("new-tool-1").description());
+ assertEquals("New tool 2", wrapper.getCachedTool("new-tool-2").description());
+ }
+
+ @Test
+ void testUpdateCachedTools_EmptyListClearsCache() {
+ // First add some tools to cache
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Then simulate server sending empty tool list (all tools removed)
+ invokeUpdateCachedTools(List.of());
+
+ // Verify cache was cleared
+ assertTrue(wrapper.cachedTools.isEmpty());
+ assertNull(wrapper.getCachedTool("tool1"));
+ }
+
+ @Test
+ void testUpdateCachedTools_NullDoesNothing() {
+ // Add some tools to cache first
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ int initialSize = wrapper.cachedTools.size();
+
+ // Call with null should not modify cache
+ invokeUpdateCachedTools(null);
+
+ // Verify cache unchanged
+ assertEquals(initialSize, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("tool1"));
+ }
+
+ @Test
+ void testUpdateCachedTools_ReplacesExistingTools() {
+ // Initialize with existing tools
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Simulate server sending updated tool list
+ McpSchema.Tool updatedTool =
+ new McpSchema.Tool(
+ "tool1",
+ null,
+ "Updated description",
+ new McpSchema.JsonSchema("string", null, null, null, null, null),
+ null,
+ null,
+ null);
+ McpSchema.Tool newTool =
+ new McpSchema.Tool(
+ "new-tool",
+ null,
+ "Brand new tool",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+
+ invokeUpdateCachedTools(List.of(updatedTool, newTool));
+
+ // Verify cache was updated (not appended)
+ assertEquals(2, wrapper.cachedTools.size());
+ assertNotNull(wrapper.getCachedTool("tool1"));
+ assertEquals("Updated description", wrapper.getCachedTool("tool1").description());
+ assertNotNull(wrapper.getCachedTool("new-tool"));
+ assertEquals("Brand new tool", wrapper.getCachedTool("new-tool").description());
+ }
+
+ @Test
+ void testUpdateCachedTools_MultipleTimes() {
+ // First update
+ McpSchema.Tool tool1 =
+ new McpSchema.Tool(
+ "tool-a",
+ null,
+ "Tool A",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ invokeUpdateCachedTools(List.of(tool1));
+ assertEquals(1, wrapper.cachedTools.size());
+
+ // Second update - replace
+ McpSchema.Tool tool2 =
+ new McpSchema.Tool(
+ "tool-b",
+ null,
+ "Tool B",
+ new McpSchema.JsonSchema("object", null, null, null, null, null),
+ null,
+ null,
+ null);
+ invokeUpdateCachedTools(List.of(tool2));
+
+ // Verify only latest tools exist
+ assertEquals(1, wrapper.cachedTools.size());
+ assertNull(wrapper.getCachedTool("tool-a"));
+ assertNotNull(wrapper.getCachedTool("tool-b"));
+ }
+
+ // ==================== setClient Tests ====================
+
+ @Test
+ void testSetClient_ReplacesClient() {
+ // Wrapper created with original client
+ assertNotNull(wrapper);
+ assertEquals("test-sync-client", wrapper.getName());
+
+ // Create a new client
+ McpSyncClient newClient = mock(McpSyncClient.class);
+ assertNotSame(mockClient, newClient);
+
+ // Replace client using reflection
+ invokeSetClient(newClient);
+
+ // Verify by testing initialization with new client
+ McpSchema.Implementation serverInfo =
+ new McpSchema.Implementation("NewServer", "New Server", "1.0");
+ McpSchema.InitializeResult initResult =
+ new McpSchema.InitializeResult(
+ "1.0",
+ McpSchema.ServerCapabilities.builder().build(),
+ serverInfo,
+ null,
+ null);
+
+ when(newClient.initialize()).thenReturn(initResult);
+ when(newClient.listTools()).thenReturn(new McpSchema.ListToolsResult(List.of(), null));
+
+ wrapper.initialize().block();
+
+ // Verify new client was used
+ verify(newClient, times(1)).initialize();
+ verify(mockClient, times(0)).initialize(); // Original client never called
+ }
+
+ // ==================== close() Idempotency Tests ====================
+
+ @Test
+ void testClose_Idempotent() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertTrue(wrapper.isInitialized());
+
+ // Close multiple times
+ wrapper.close();
+ wrapper.close();
+ wrapper.close();
+
+ // Verify closeGracefully was called only once
+ verify(mockClient, times(1)).closeGracefully();
+ assertFalse(wrapper.isInitialized());
+ }
+
+ @Test
+ void testClose_WithoutInitialize() {
+ // Close without ever initializing
+ wrapper.close();
+
+ // Should not throw, client should be nullified
+ assertFalse(wrapper.isInitialized());
+ assertTrue(wrapper.cachedTools.isEmpty());
+ }
+
+ @Test
+ void testClose_AfterSetClientToNull() {
+ // First initialize normally
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+ assertTrue(wrapper.isInitialized());
+
+ // Manually set client to null via reflection
+ invokeSetClient(null);
+
+ // Close should handle gracefully
+ wrapper.close();
+
+ assertFalse(wrapper.isInitialized());
+ assertTrue(wrapper.cachedTools.isEmpty());
+ }
+
+ // ==================== Helper Methods ====================
+
+ /**
+ * Invokes the package-private updateCachedTools method using reflection.
+ *
+ * @param tools the list of tools to update
+ */
+ private void invokeUpdateCachedTools(List tools) {
+ try {
+ java.lang.reflect.Method method =
+ McpSyncClientWrapper.class.getDeclaredMethod("updateCachedTools", List.class);
+ method.setAccessible(true);
+ method.invoke(wrapper, tools);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke updateCachedTools", e);
+ }
+ }
+
+ /**
+ * Invokes the package-private setClient method using reflection.
+ *
+ * @param client the MCP sync client to set
+ */
+ private void invokeSetClient(McpSyncClient client) {
+ try {
+ java.lang.reflect.Method method =
+ McpSyncClientWrapper.class.getDeclaredMethod("setClient", McpSyncClient.class);
+ method.setAccessible(true);
+ method.invoke(wrapper, client);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke setClient", e);
+ }
+ }
+
private void setupSuccessfulInitialization() {
McpSchema.Implementation serverInfo =
new McpSchema.Implementation("TestServer", "Test Server", "1.0.8-SNAPSHOT");
From b966d99e65f9445ebefc5e2b4d5f3ec9d82b1da4 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Thu, 15 Jan 2026 10:25:30 +0800
Subject: [PATCH 3/9] feat(mcp): add notification handlers support for sync
client
---
.../tool/mcp/McpAsyncClientWrapperTest.java | 47 +++++++++++++++++++
.../tool/mcp/McpSyncClientWrapperTest.java | 46 ++++++++++++++++++
2 files changed, 93 insertions(+)
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
index 6ad759a62..950e43c39 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
@@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -428,6 +429,52 @@ void testClose_AfterSetClientToNull() {
assertTrue(wrapper.cachedTools.isEmpty());
}
+ // ==================== Null Client Error Path Tests ====================
+
+ @Test
+ void testInitialize_WhenClientIsNull() {
+ // Create wrapper with null client
+ McpAsyncClientWrapper nullWrapper = new McpAsyncClientWrapper("null-client", null);
+
+ // Attempt to initialize should fail with "not available" error
+ Exception exception =
+ assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block());
+
+ assertTrue(exception.getMessage().contains("not available"));
+ }
+
+ @Test
+ void testListTools_WhenClientIsNullAfterSet() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+
+ // Set client to null via reflection
+ invokeSetClient(null);
+
+ // Attempt to list tools should fail with "not available" error
+ Exception exception =
+ assertThrows(IllegalStateException.class, () -> wrapper.listTools().block());
+
+ assertTrue(exception.getMessage().contains("not available"));
+ }
+
+ @Test
+ void testCallTool_WhenClientIsNullAfterSet() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+
+ // Set client to null via reflection
+ invokeSetClient(null);
+
+ // Attempt to call tool should fail with "not available" error
+ Exception exception =
+ assertThrows(
+ IllegalStateException.class,
+ () -> wrapper.callTool("test-tool", Map.of()).block());
+
+ assertTrue(exception.getMessage().contains("not available"));
+ }
+
// ==================== Helper Methods ====================
/**
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
index 973c4d526..ed3dd53ef 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
@@ -543,6 +543,52 @@ void testClose_AfterSetClientToNull() {
assertTrue(wrapper.cachedTools.isEmpty());
}
+ // ==================== Null Client Error Path Tests ====================
+
+ @Test
+ void testInitialize_WhenClientIsNull() {
+ // Create wrapper with null client
+ McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null);
+
+ // Attempt to initialize should fail with "not available" error
+ Exception exception =
+ assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block());
+
+ assertTrue(exception.getMessage().contains("not available"));
+ }
+
+ @Test
+ void testListTools_WhenClientIsNullAfterSet() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+
+ // Set client to null via reflection
+ invokeSetClient(null);
+
+ // Attempt to list tools should fail with "not available" error
+ Exception exception =
+ assertThrows(IllegalStateException.class, () -> wrapper.listTools().block());
+
+ assertTrue(exception.getMessage().contains("not available"));
+ }
+
+ @Test
+ void testCallTool_WhenClientIsNullAfterSet() {
+ setupSuccessfulInitialization();
+ wrapper.initialize().block();
+
+ // Set client to null via reflection
+ invokeSetClient(null);
+
+ // Attempt to call tool should fail with "not available" error
+ Exception exception =
+ assertThrows(
+ IllegalStateException.class,
+ () -> wrapper.callTool("test-tool", Map.of()).block());
+
+ assertTrue(exception.getMessage().contains("not available"));
+ }
+
// ==================== Helper Methods ====================
/**
From 9a23888656ca398c05ad15288f765ccad5e4b0d6 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Thu, 15 Jan 2026 18:44:26 +0800
Subject: [PATCH 4/9] add synchronized
---
.../io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java | 5 +++--
.../io/agentscope/core/tool/mcp/McpSyncClientWrapper.java | 2 +-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
index 5f15fd155..a7fd455f9 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
@@ -66,11 +66,12 @@ void setClient(McpAsyncClient client) {
/**
* Updates the cached tools map with new tools from the server. This method is called when the
- * server sends a tools/list_changed notification.
+ * server sends a tools/list_changed notification. This method is thread-safe and can be
+ * called concurrently from notification handlers.
*
* @param tools the new list of tools from the server (empty list clears cache)
*/
- void updateCachedTools(List tools) {
+ synchronized void updateCachedTools(List tools) {
if (tools != null) {
// Build new map first, then atomically replace
Map newTools =
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index fc9161877..40e1c3f7b 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -73,7 +73,7 @@ void setClient(McpSyncClient client) {
*
* @param tools the new list of tools from the server (empty list clears cache)
*/
- void updateCachedTools(List tools) {
+ synchronized void updateCachedTools(List tools) {
if (tools != null) {
// Build new map first, then atomically replace
Map newTools =
From 3e95f85009100df4b80d6f2766b243bbf6c0b75c Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Fri, 16 Jan 2026 13:44:27 +0800
Subject: [PATCH 5/9] add volatile
---
.../core/tool/mcp/McpAsyncClientWrapper.java | 23 ++++++++++++-------
.../core/tool/mcp/McpClientWrapper.java | 2 +-
.../core/tool/mcp/McpSyncClientWrapper.java | 22 ++++++++++++------
.../higress/HigressMcpClientWrapper.java | 22 ++++++++++++++----
4 files changed, 48 insertions(+), 21 deletions(-)
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
index a7fd455f9..f6565ed9a 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
@@ -19,6 +19,7 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -71,13 +72,13 @@ void setClient(McpAsyncClient client) {
*
* @param tools the new list of tools from the server (empty list clears cache)
*/
- synchronized void updateCachedTools(List tools) {
+ void updateCachedTools(List tools) {
if (tools != null) {
- // Build new map first, then atomically replace
+ // Build new map first, then atomically replace via volatile assignment
Map newTools =
- tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
- cachedTools.clear();
- cachedTools.putAll(newTools);
+ tools.stream()
+ .collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
}
@@ -117,8 +118,14 @@ public Mono initialize() {
"MCP client '{}' discovered {} tools",
name,
result.tools().size());
- // Cache all tools
- result.tools().forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache all tools - build new map then atomically replace
+ Map newTools =
+ result.tools()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
})
.doOnSuccess(v -> initialized = true)
.doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e))
@@ -216,6 +223,6 @@ public void close() {
}
}
initialized = false;
- cachedTools.clear();
+ cachedTools = new ConcurrentHashMap<>();
}
}
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java
index 473af292f..f65286adc 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientWrapper.java
@@ -43,7 +43,7 @@ public abstract class McpClientWrapper implements AutoCloseable {
protected final String name;
/** Cache of tools available from this MCP server */
- protected final Map cachedTools;
+ protected volatile Map cachedTools;
/** Flag indicating whether the client has been initialized */
protected volatile boolean initialized = false;
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index 40e1c3f7b..a0a9d7323 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -19,6 +19,7 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -73,13 +74,13 @@ void setClient(McpSyncClient client) {
*
* @param tools the new list of tools from the server (empty list clears cache)
*/
- synchronized void updateCachedTools(List tools) {
+ void updateCachedTools(List tools) {
if (tools != null) {
- // Build new map first, then atomically replace
+ // Build new map first, then atomically replace via volatile assignment
Map newTools =
- tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
- cachedTools.clear();
- cachedTools.putAll(newTools);
+ tools.stream()
+ .collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
}
@@ -123,7 +124,14 @@ public Mono initialize() {
name,
toolsResult.tools().size());
- toolsResult.tools().forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache all tools - build new map then atomically replace
+ Map newTools =
+ toolsResult.tools()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
initialized = true;
return null;
@@ -227,6 +235,6 @@ public void close() {
}
}
initialized = false;
- cachedTools.clear();
+ cachedTools = new ConcurrentHashMap<>();
}
}
diff --git a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
index ad289c628..3c7a7d778 100644
--- a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
+++ b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
@@ -20,6 +20,8 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
@@ -185,8 +187,13 @@ public Mono> listTools() {
})
.doOnNext(
tools -> {
- // Cache tools locally
- tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache tools locally - build new map then atomically replace
+ Map newTools =
+ tools.stream()
+ .collect(
+ Collectors.toMap(
+ McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
});
} else {
// Return all tools from delegate
@@ -194,8 +201,13 @@ public Mono> listTools() {
.listTools()
.doOnNext(
tools -> {
- // Cache tools locally
- tools.forEach(tool -> cachedTools.put(tool.name(), tool));
+ // Cache tools locally - build new map then atomically replace
+ Map newTools =
+ tools.stream()
+ .collect(
+ Collectors.toMap(
+ McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
logger.debug(
"Higress MCP client '{}' discovered {} tools",
name,
@@ -305,7 +317,7 @@ public void close() {
}
this.initialized = false;
- this.cachedTools.clear();
+ this.cachedTools = new ConcurrentHashMap<>();
}
/**
From 7c97c5503354d38ba0e6ac8d09f17ce105f74757 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Fri, 16 Jan 2026 13:55:31 +0800
Subject: [PATCH 6/9] fix Spotless
---
.../agentscope/core/tool/mcp/McpAsyncClientWrapper.java | 9 +++------
.../agentscope/core/tool/mcp/McpSyncClientWrapper.java | 9 +++------
2 files changed, 6 insertions(+), 12 deletions(-)
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
index f6565ed9a..3bb566072 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
@@ -76,8 +76,7 @@ void updateCachedTools(List tools) {
if (tools != null) {
// Build new map first, then atomically replace via volatile assignment
Map newTools =
- tools.stream()
- .collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
@@ -120,11 +119,9 @@ public Mono initialize() {
result.tools().size());
// Cache all tools - build new map then atomically replace
Map newTools =
- result.tools()
- .stream()
+ result.tools().stream()
.collect(
- Collectors.toMap(
- McpSchema.Tool::name, t -> t));
+ Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
})
.doOnSuccess(v -> initialized = true)
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index a0a9d7323..2bd89d7c7 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -78,8 +78,7 @@ void updateCachedTools(List tools) {
if (tools != null) {
// Build new map first, then atomically replace via volatile assignment
Map newTools =
- tools.stream()
- .collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
@@ -126,11 +125,9 @@ public Mono initialize() {
// Cache all tools - build new map then atomically replace
Map newTools =
- toolsResult.tools()
- .stream()
+ toolsResult.tools().stream()
.collect(
- Collectors.toMap(
- McpSchema.Tool::name, t -> t));
+ Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
initialized = true;
From 5733e57fd73c7d65cef4cb1b94b24baba79dde23 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Mon, 26 Jan 2026 14:21:26 +0800
Subject: [PATCH 7/9] fix
---
.../core/tool/mcp/McpAsyncClientWrapper.java | 75 +++---
.../core/tool/mcp/McpClientBuilder.java | 249 ++++++++----------
.../core/tool/mcp/McpSyncClientWrapper.java | 73 +++--
.../tool/mcp/McpAsyncClientWrapperTest.java | 117 +-------
.../tool/mcp/McpSyncClientWrapperTest.java | 123 +--------
5 files changed, 184 insertions(+), 453 deletions(-)
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
index 3bb566072..5c5a94087 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java
@@ -19,8 +19,9 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,8 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);
- private final AtomicReference clientRef;
+ private final McpAsyncClient client;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Constructs a new asynchronous MCP client wrapper.
@@ -52,17 +54,18 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
*/
public McpAsyncClientWrapper(String name, McpAsyncClient client) {
super(name);
- this.clientRef = new AtomicReference<>(client);
+ this.client = Objects.requireNonNull(client, "MCP client cannot be null");
}
/**
- * Sets the underlying MCP async client. This is called by McpClientBuilder after the client
- * is created with notification handlers.
+ * Package-private constructor for use by McpClientBuilder.
*
- * @param client the MCP async client
+ * @param name unique identifier for this client
+ * @param builder the builder to construct the client with notification handlers
*/
- void setClient(McpAsyncClient client) {
- this.clientRef.set(client);
+ McpAsyncClientWrapper(String name, McpClientBuilder builder) {
+ super(name);
+ this.client = builder.buildClientForAsync(this);
}
/**
@@ -73,13 +76,14 @@ void setClient(McpAsyncClient client) {
* @param tools the new list of tools from the server (empty list clears cache)
*/
void updateCachedTools(List tools) {
- if (tools != null) {
- // Build new map first, then atomically replace via volatile assignment
- Map newTools =
- tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
- cachedTools = new ConcurrentHashMap<>(newTools);
- logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
+ if (closed.get() || tools == null) {
+ return;
}
+ // Build new map first, then atomically replace via volatile assignment
+ Map newTools =
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
+ logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
/**
@@ -92,15 +96,13 @@ void updateCachedTools(List tools) {
*/
@Override
public Mono initialize() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (initialized) {
return Mono.empty();
}
- McpAsyncClient client = clientRef.get();
- if (client == null) {
- return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
- }
-
logger.info("Initializing MCP async client: {}", name);
return client.initialize()
@@ -124,7 +126,12 @@ public Mono initialize() {
Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
})
- .doOnSuccess(v -> initialized = true)
+ .doOnSuccess(
+ v -> {
+ if (!closed.get()) {
+ initialized = true;
+ }
+ })
.doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e))
.then();
}
@@ -139,16 +146,14 @@ public Mono initialize() {
*/
@Override
public Mono> listTools() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
}
- McpAsyncClient client = clientRef.get();
- if (client == null) {
- return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
- }
-
return client.listTools().map(McpSchema.ListToolsResult::tools);
}
@@ -164,16 +169,14 @@ public Mono> listTools() {
*/
@Override
public Mono callTool(String toolName, Map arguments) {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
}
- McpAsyncClient client = clientRef.get();
- if (client == null) {
- return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
- }
-
logger.debug("Calling MCP tool '{}' on client '{}'", toolName, name);
McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(toolName, arguments);
@@ -206,20 +209,20 @@ public Mono callTool(String toolName, Map logger.debug("MCP client '{}' closed", name))
.doOnError(e -> logger.error("Error closing MCP client '{}'", name, e))
.block();
} catch (Exception e) {
logger.error("Exception during MCP client close", e);
- toClose.close();
+ client.close();
+ } finally {
+ initialized = false;
+ cachedTools = new ConcurrentHashMap<>();
}
}
- initialized = false;
- cachedTools = new ConcurrentHashMap<>();
}
}
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
index 394abd40c..2b50b7b1d 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpClientBuilder.java
@@ -286,9 +286,8 @@ public McpClientBuilder initializationTimeout(Duration timeout) {
/**
* Builds an asynchronous MCP client wrapper.
*
- * This method uses a two-phase build pattern to support notification handlers.
- * The wrapper is created first, then the MCP client is built with notification consumers
- * that can reference the wrapper.
+ *
This method uses a constructor that passes the builder to the wrapper, allowing the
+ * wrapper to construct its own client with notification handlers.
*
* @return Mono emitting the async client wrapper
*/
@@ -297,96 +296,14 @@ public Mono buildAsync() {
return Mono.error(new IllegalStateException("Transport must be configured"));
}
- return Mono.fromCallable(
- () -> {
- McpClientTransport transport = transportConfig.createTransport();
-
- McpSchema.Implementation clientInfo =
- new McpSchema.Implementation(
- "agentscope-java", "AgentScope Java Framework", "1.0.9-SNAPSHOT");
-
- McpSchema.ClientCapabilities clientCapabilities =
- McpSchema.ClientCapabilities.builder().build();
-
- // ========== Phase 1: Create wrapper (client is temporarily null) ==========
- McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper(name, null);
-
- // ========== Phase 2: Build client (can reference wrapper) ==========
- McpAsyncClient mcpClient =
- McpClient.async(transport)
- .requestTimeout(requestTimeout)
- .initializationTimeout(initializationTimeout)
- .clientInfo(clientInfo)
- .capabilities(clientCapabilities)
-
- // ----- Log notification Consumer -----
- .loggingConsumer(
- notification -> {
- // Parse notification content
- String level =
- notification.level() != null
- ? notification.level().toString()
- : "info";
- String loggerName =
- notification.logger() != null
- ? notification.logger()
- : "mcp";
- String data =
- notification.data() != null
- ? notification.data()
- : "";
-
- // Log to SLF4J by level
- switch (level.toLowerCase()) {
- case "error" ->
- logger.error(
- "[MCP-{}] [{}] {}",
- name,
- loggerName,
- data);
- case "warning" ->
- logger.warn(
- "[MCP-{}] [{}] {}",
- name,
- loggerName,
- data);
- case "debug" ->
- logger.debug(
- "[MCP-{}] [{}] {}",
- name,
- loggerName,
- data);
- default ->
- logger.info(
- "[MCP-{}] [{}] {}",
- name,
- loggerName,
- data);
- }
- return Mono.empty();
- })
-
- // ----- Tools change notification Consumer -----
- .toolsChangeConsumer(
- tools -> {
- // Call wrapper method to update cache
- wrapper.updateCachedTools(tools);
- return Mono.empty();
- })
- .build();
-
- // ========== Phase 3: Link MCP client to wrapper ==========
- wrapper.setClient(mcpClient);
- return wrapper;
- });
+ return Mono.fromCallable(() -> new McpAsyncClientWrapper(name, this));
}
/**
* Builds a synchronous MCP client wrapper (blocking operations).
*
- * This method uses a two-phase build pattern to support notification handlers. The wrapper
- * is created first, then the MCP client is built with notification consumers that can
- * reference the wrapper.
+ *
This method uses a constructor that passes the builder to the wrapper, allowing the
+ * wrapper to construct its own client with notification handlers.
*
* @return synchronous client wrapper
*/
@@ -395,68 +312,116 @@ public McpClientWrapper buildSync() {
throw new IllegalStateException("Transport must be configured");
}
+ return new McpSyncClientWrapper(name, this);
+ }
+
+ // ==================== Internal Client Building Methods ====================
+
+ /**
+ * Builds an MCP async client with notification handlers for the specified wrapper.
+ *
+ * @param wrapper the wrapper that will receive notification callbacks
+ * @return the built MCP async client
+ */
+ McpAsyncClient buildClientForAsync(McpAsyncClientWrapper wrapper) {
+ if (transportConfig == null) {
+ throw new IllegalStateException("Transport must be configured");
+ }
+
McpClientTransport transport = transportConfig.createTransport();
McpSchema.Implementation clientInfo =
new McpSchema.Implementation(
- "agentscope-java", "AgentScope Java Framework", "1.0.9-SNAPSHOT");
+ "agentscope-java", "AgentScope Java Framework", "1.0.8-SNAPSHOT");
McpSchema.ClientCapabilities clientCapabilities =
McpSchema.ClientCapabilities.builder().build();
- // ========== Phase 1: Create wrapper (client is temporarily null) ==========
- McpSyncClientWrapper wrapper = new McpSyncClientWrapper(name, null);
-
- // ========== Phase 2: Build client (can reference wrapper) ==========
- McpSyncClient mcpClient =
- McpClient.sync(transport)
- .requestTimeout(requestTimeout)
- .initializationTimeout(initializationTimeout)
- .clientInfo(clientInfo)
- .capabilities(clientCapabilities)
- // ----- Log notification Consumer -----
- .loggingConsumer(
- notification -> {
- // Parse notification content
- String level =
- notification.level() != null
- ? notification.level().toString()
- : "info";
- String loggerName =
- notification.logger() != null
- ? notification.logger()
- : "mcp";
- String data =
- notification.data() != null ? notification.data() : "";
-
- // Log to SLF4J by level
- switch (level.toLowerCase()) {
- case "error" ->
- logger.error(
- "[MCP-{}] [{}] {}", name, loggerName, data);
- case "warning" ->
- logger.warn(
- "[MCP-{}] [{}] {}", name, loggerName, data);
- case "debug" ->
- logger.debug(
- "[MCP-{}] [{}] {}", name, loggerName, data);
- default ->
- logger.info(
- "[MCP-{}] [{}] {}", name, loggerName, data);
- }
- })
-
- // ----- Tools change notification Consumer -----
- .toolsChangeConsumer(
- tools -> {
- // Call wrapper method to update cache
- wrapper.updateCachedTools(tools);
- })
- .build();
-
- // ========== Phase 3: Link MCP client to wrapper ==========
- wrapper.setClient(mcpClient);
- return wrapper;
+ return McpClient.async(transport)
+ .requestTimeout(requestTimeout)
+ .initializationTimeout(initializationTimeout)
+ .clientInfo(clientInfo)
+ .capabilities(clientCapabilities)
+ .loggingConsumer(buildAsyncLoggingConsumer())
+ .toolsChangeConsumer(
+ tools -> {
+ wrapper.updateCachedTools(tools);
+ return Mono.empty();
+ })
+ .build();
+ }
+
+ /**
+ * Builds an MCP sync client with notification handlers for the specified wrapper.
+ *
+ * @param wrapper the wrapper that will receive notification callbacks
+ * @return the built MCP sync client
+ */
+ McpSyncClient buildClientForSync(McpSyncClientWrapper wrapper) {
+ if (transportConfig == null) {
+ throw new IllegalStateException("Transport must be configured");
+ }
+
+ McpClientTransport transport = transportConfig.createTransport();
+
+ McpSchema.Implementation clientInfo =
+ new McpSchema.Implementation(
+ "agentscope-java", "AgentScope Java Framework", "1.0.8-SNAPSHOT");
+
+ McpSchema.ClientCapabilities clientCapabilities =
+ McpSchema.ClientCapabilities.builder().build();
+
+ return McpClient.sync(transport)
+ .requestTimeout(requestTimeout)
+ .initializationTimeout(initializationTimeout)
+ .clientInfo(clientInfo)
+ .capabilities(clientCapabilities)
+ .loggingConsumer(buildSyncLoggingConsumer())
+ .toolsChangeConsumer(tools -> wrapper.updateCachedTools(tools))
+ .build();
+ }
+
+ /**
+ * Creates a logging function for async clients that maps MCP log notifications to SLF4J.
+ *
+ * @return the logging function
+ */
+ private java.util.function.Function<
+ io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification, Mono>
+ buildAsyncLoggingConsumer() {
+ return notification -> {
+ logNotification(notification);
+ return Mono.empty();
+ };
+ }
+
+ /**
+ * Creates a logging consumer for sync clients that maps MCP log notifications to SLF4J.
+ *
+ * @return the logging consumer
+ */
+ private Consumer
+ buildSyncLoggingConsumer() {
+ return notification -> logNotification(notification);
+ }
+
+ /**
+ * Logs an MCP notification to SLF4J at the appropriate level.
+ *
+ * @param notification the notification to log
+ */
+ private void logNotification(
+ io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification notification) {
+ String level = notification.level() != null ? notification.level().toString() : "info";
+ String loggerName = notification.logger() != null ? notification.logger() : "mcp";
+ String data = notification.data() != null ? notification.data() : "";
+
+ switch (level.toLowerCase()) {
+ case "error" -> logger.error("[MCP-{}] [{}] {}", name, loggerName, data);
+ case "warning" -> logger.warn("[MCP-{}] [{}] {}", name, loggerName, data);
+ case "debug" -> logger.debug("[MCP-{}] [{}] {}", name, loggerName, data);
+ default -> logger.info("[MCP-{}] [{}] {}", name, loggerName, data);
+ }
}
// ==================== Internal Transport Configuration Classes ====================
diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
index 2bd89d7c7..952da75ca 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpSyncClientWrapper.java
@@ -19,8 +19,9 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,8 @@ public class McpSyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpSyncClientWrapper.class);
- private final AtomicReference clientRef;
+ private final McpSyncClient client;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Constructs a new synchronous MCP client wrapper.
@@ -54,17 +56,18 @@ public class McpSyncClientWrapper extends McpClientWrapper {
*/
public McpSyncClientWrapper(String name, McpSyncClient client) {
super(name);
- this.clientRef = new AtomicReference<>(client);
+ this.client = Objects.requireNonNull(client, "MCP client cannot be null");
}
/**
- * Sets the underlying MCP sync client. This is called by McpClientBuilder after the client is
- * created with notification handlers.
+ * Package-private constructor for use by McpClientBuilder.
*
- * @param client the MCP sync client
+ * @param name unique identifier for this client
+ * @param builder the builder to construct the client with notification handlers
*/
- void setClient(McpSyncClient client) {
- this.clientRef.set(client);
+ McpSyncClientWrapper(String name, McpClientBuilder builder) {
+ super(name);
+ this.client = builder.buildClientForSync(this);
}
/**
@@ -75,13 +78,14 @@ void setClient(McpSyncClient client) {
* @param tools the new list of tools from the server (empty list clears cache)
*/
void updateCachedTools(List tools) {
- if (tools != null) {
- // Build new map first, then atomically replace via volatile assignment
- Map newTools =
- tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
- cachedTools = new ConcurrentHashMap<>(newTools);
- logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
+ if (closed.get() || tools == null) {
+ return;
}
+ // Build new map first, then atomically replace via volatile assignment
+ Map newTools =
+ tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
+ cachedTools = new ConcurrentHashMap<>(newTools);
+ logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
/**
@@ -95,18 +99,15 @@ void updateCachedTools(List tools) {
*/
@Override
public Mono initialize() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (initialized) {
return Mono.empty();
}
return Mono.fromCallable(
() -> {
- McpSyncClient client = clientRef.get();
- if (client == null) {
- throw new IllegalStateException(
- "MCP client '" + name + "' not available");
- }
-
logger.info("Initializing MCP sync client: {}", name);
// Initialize the client (blocking)
@@ -130,7 +131,9 @@ public Mono initialize() {
Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
- initialized = true;
+ if (!closed.get()) {
+ initialized = true;
+ }
return null;
})
.subscribeOn(Schedulers.boundedElastic())
@@ -148,16 +151,14 @@ public Mono initialize() {
*/
@Override
public Mono> listTools() {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
}
- McpSyncClient client = clientRef.get();
- if (client == null) {
- return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
- }
-
return Mono.fromCallable(() -> client.listTools().tools())
.subscribeOn(Schedulers.boundedElastic());
}
@@ -174,16 +175,14 @@ public Mono> listTools() {
*/
@Override
public Mono callTool(String toolName, Map arguments) {
+ if (closed.get()) {
+ return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
+ }
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
}
- McpSyncClient client = clientRef.get();
- if (client == null) {
- return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
- }
-
logger.debug("Calling MCP tool '{}' on client '{}'", toolName, name);
return Mono.fromCallable(
@@ -220,18 +219,18 @@ public Mono callTool(String toolName, Map();
}
}
- initialized = false;
- cachedTools = new ConcurrentHashMap<>();
}
}
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
index a52536f42..0a86b3c1d 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapperTest.java
@@ -18,9 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -343,43 +341,6 @@ void testUpdateCachedTools_MultipleTimes() {
assertNotNull(wrapper.getCachedTool("tool-b"));
}
- // ==================== setClient Tests ====================
-
- @Test
- void testSetClient_ReplacesClient() {
- // Wrapper created with original client
- assertNotNull(wrapper);
- assertEquals("test-async-client", wrapper.getName());
-
- // Create a new client
- McpAsyncClient newClient = mock(McpAsyncClient.class);
- assertNotSame(mockClient, newClient);
-
- // Replace client using reflection
- invokeSetClient(newClient);
-
- // Verify by testing initialization with new client
- McpSchema.Implementation serverInfo =
- new McpSchema.Implementation("NewServer", "New Server", "1.0");
- McpSchema.InitializeResult initResult =
- new McpSchema.InitializeResult(
- "1.0",
- McpSchema.ServerCapabilities.builder().build(),
- serverInfo,
- null,
- null);
-
- when(newClient.initialize()).thenReturn(Mono.just(initResult));
- when(newClient.listTools())
- .thenReturn(Mono.just(new McpSchema.ListToolsResult(List.of(), null)));
-
- wrapper.initialize().block();
-
- // Verify new client was used
- verify(newClient, times(1)).initialize();
- verify(mockClient, times(0)).initialize(); // Original client never called
- }
-
// ==================== close() Idempotency Tests ====================
@Test
@@ -412,68 +373,9 @@ void testClose_WithoutInitialize() {
assertTrue(wrapper.cachedTools.isEmpty());
}
- @Test
- void testClose_AfterSetClientToNull() {
- // First initialize normally
- setupSuccessfulInitialization();
- wrapper.initialize().block();
- assertTrue(wrapper.isInitialized());
-
- // Manually set client to null via reflection
- invokeSetClient(null);
-
- // Close should handle gracefully
- wrapper.close();
-
- assertFalse(wrapper.isInitialized());
- assertTrue(wrapper.cachedTools.isEmpty());
- }
-
// ==================== Null Client Error Path Tests ====================
- @Test
- void testInitialize_WhenClientIsNull() {
- // Create wrapper with null client
- McpAsyncClientWrapper nullWrapper = new McpAsyncClientWrapper("null-client", null);
-
- // Attempt to initialize should fail with "not available" error
- Exception exception =
- assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block());
-
- assertTrue(exception.getMessage().contains("not available"));
- }
-
- @Test
- void testListTools_WhenClientIsNullAfterSet() {
- setupSuccessfulInitialization();
- wrapper.initialize().block();
-
- // Set client to null via reflection
- invokeSetClient(null);
-
- // Attempt to list tools should fail with "not available" error
- Exception exception =
- assertThrows(IllegalStateException.class, () -> wrapper.listTools().block());
-
- assertTrue(exception.getMessage().contains("not available"));
- }
-
- @Test
- void testCallTool_WhenClientIsNullAfterSet() {
- setupSuccessfulInitialization();
- wrapper.initialize().block();
-
- // Set client to null via reflection
- invokeSetClient(null);
-
- // Attempt to call tool should fail with "not available" error
- Exception exception =
- assertThrows(
- IllegalStateException.class,
- () -> wrapper.callTool("test-tool", Map.of()).block());
-
- assertTrue(exception.getMessage().contains("not available"));
- }
+ // Removed: client is now final and cannot be null after construction
// ==================== Helper Methods ====================
@@ -493,23 +395,6 @@ private void invokeUpdateCachedTools(List tools) {
}
}
- /**
- * Invokes the package-private setClient method using reflection.
- *
- * @param client the MCP async client to set
- */
- private void invokeSetClient(McpAsyncClient client) {
- try {
- java.lang.reflect.Method method =
- McpAsyncClientWrapper.class.getDeclaredMethod(
- "setClient", McpAsyncClient.class);
- method.setAccessible(true);
- method.invoke(wrapper, client);
- } catch (Exception e) {
- throw new RuntimeException("Failed to invoke setClient", e);
- }
- }
-
private void setupSuccessfulInitialization() {
McpSchema.Implementation serverInfo =
new McpSchema.Implementation("TestServer", "Test Server", "1.0.9-SNAPSHOT");
diff --git a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
index a873129a9..117c3cdb9 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/tool/mcp/McpSyncClientWrapperTest.java
@@ -18,7 +18,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -283,15 +282,6 @@ void testClose_GracefulCloseFails() {
verify(mockClient, times(1)).close();
}
- @Test
- void testClose_NullClient() {
- McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null);
-
- nullWrapper.close();
-
- assertFalse(nullWrapper.isInitialized());
- }
-
@Test
void testClose_MultipleCallsSafe() {
setupSuccessfulInitialization();
@@ -462,42 +452,6 @@ void testUpdateCachedTools_MultipleTimes() {
assertNotNull(wrapper.getCachedTool("tool-b"));
}
- // ==================== setClient Tests ====================
-
- @Test
- void testSetClient_ReplacesClient() {
- // Wrapper created with original client
- assertNotNull(wrapper);
- assertEquals("test-sync-client", wrapper.getName());
-
- // Create a new client
- McpSyncClient newClient = mock(McpSyncClient.class);
- assertNotSame(mockClient, newClient);
-
- // Replace client using reflection
- invokeSetClient(newClient);
-
- // Verify by testing initialization with new client
- McpSchema.Implementation serverInfo =
- new McpSchema.Implementation("NewServer", "New Server", "1.0");
- McpSchema.InitializeResult initResult =
- new McpSchema.InitializeResult(
- "1.0",
- McpSchema.ServerCapabilities.builder().build(),
- serverInfo,
- null,
- null);
-
- when(newClient.initialize()).thenReturn(initResult);
- when(newClient.listTools()).thenReturn(new McpSchema.ListToolsResult(List.of(), null));
-
- wrapper.initialize().block();
-
- // Verify new client was used
- verify(newClient, times(1)).initialize();
- verify(mockClient, times(0)).initialize(); // Original client never called
- }
-
// ==================== close() Idempotency Tests ====================
@Test
@@ -526,68 +480,9 @@ void testClose_WithoutInitialize() {
assertTrue(wrapper.cachedTools.isEmpty());
}
- @Test
- void testClose_AfterSetClientToNull() {
- // First initialize normally
- setupSuccessfulInitialization();
- wrapper.initialize().block();
- assertTrue(wrapper.isInitialized());
-
- // Manually set client to null via reflection
- invokeSetClient(null);
-
- // Close should handle gracefully
- wrapper.close();
-
- assertFalse(wrapper.isInitialized());
- assertTrue(wrapper.cachedTools.isEmpty());
- }
-
// ==================== Null Client Error Path Tests ====================
- @Test
- void testInitialize_WhenClientIsNull() {
- // Create wrapper with null client
- McpSyncClientWrapper nullWrapper = new McpSyncClientWrapper("null-client", null);
-
- // Attempt to initialize should fail with "not available" error
- Exception exception =
- assertThrows(IllegalStateException.class, () -> nullWrapper.initialize().block());
-
- assertTrue(exception.getMessage().contains("not available"));
- }
-
- @Test
- void testListTools_WhenClientIsNullAfterSet() {
- setupSuccessfulInitialization();
- wrapper.initialize().block();
-
- // Set client to null via reflection
- invokeSetClient(null);
-
- // Attempt to list tools should fail with "not available" error
- Exception exception =
- assertThrows(IllegalStateException.class, () -> wrapper.listTools().block());
-
- assertTrue(exception.getMessage().contains("not available"));
- }
-
- @Test
- void testCallTool_WhenClientIsNullAfterSet() {
- setupSuccessfulInitialization();
- wrapper.initialize().block();
-
- // Set client to null via reflection
- invokeSetClient(null);
-
- // Attempt to call tool should fail with "not available" error
- Exception exception =
- assertThrows(
- IllegalStateException.class,
- () -> wrapper.callTool("test-tool", Map.of()).block());
-
- assertTrue(exception.getMessage().contains("not available"));
- }
+ // Removed: client is now final and cannot be null after construction
// ==================== Helper Methods ====================
@@ -607,22 +502,6 @@ private void invokeUpdateCachedTools(List tools) {
}
}
- /**
- * Invokes the package-private setClient method using reflection.
- *
- * @param client the MCP sync client to set
- */
- private void invokeSetClient(McpSyncClient client) {
- try {
- java.lang.reflect.Method method =
- McpSyncClientWrapper.class.getDeclaredMethod("setClient", McpSyncClient.class);
- method.setAccessible(true);
- method.invoke(wrapper, client);
- } catch (Exception e) {
- throw new RuntimeException("Failed to invoke setClient", e);
- }
- }
-
private void setupSuccessfulInitialization() {
McpSchema.Implementation serverInfo =
new McpSchema.Implementation("TestServer", "Test Server", "1.0.9-SNAPSHOT");
From e72373d5b2b7fce9d67af7c8bae47ff1ad25c856 Mon Sep 17 00:00:00 2001
From: shaojie <741047428@qq.com>
Date: Mon, 26 Jan 2026 14:57:21 +0800
Subject: [PATCH 8/9] fix higress
---
.../higress/HigressMcpClientWrapper.java | 40 +++++++++++++++----
1 file changed, 32 insertions(+), 8 deletions(-)
diff --git a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
index 3c7a7d778..d27038389 100644
--- a/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
+++ b/agentscope-extensions/agentscope-extensions-higress/src/main/java/io/agentscope/extensions/higress/HigressMcpClientWrapper.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +71,11 @@ public class HigressMcpClientWrapper extends McpClientWrapper {
*/
private final McpClientWrapper delegateClient;
+ /**
+ * Flag indicating whether this client has been closed.
+ */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
/**
* Whether x_higress_tool_search is enabled.
*/
@@ -117,6 +123,10 @@ public class HigressMcpClientWrapper extends McpClientWrapper {
*/
@Override
public Mono initialize() {
+ if (closed.get()) {
+ return Mono.error(
+ new IllegalStateException("Higress MCP client '" + name + "' is closed"));
+ }
if (isInitialized()) {
logger.debug("Higress MCP client '{}' already initialized", name);
return Mono.empty();
@@ -128,7 +138,9 @@ public Mono initialize() {
.initialize()
.doOnSuccess(
unused -> {
- this.initialized = true;
+ if (!closed.get()) {
+ this.initialized = true;
+ }
logger.info("Higress MCP client '{}' initialized successfully", name);
})
.doOnError(
@@ -156,6 +168,10 @@ public Mono initialize() {
*/
@Override
public Mono> listTools() {
+ if (closed.get()) {
+ return Mono.error(
+ new IllegalStateException("Higress MCP client '" + name + "' is closed"));
+ }
if (enableToolSearch) {
// Call x_higress_tool_search and convert results to McpSchema.Tool
logger.info(
@@ -271,6 +287,10 @@ private McpSchema.Tool convertToMcpTool(HigressToolSearchResult.ToolInfo toolInf
*/
@Override
public Mono callTool(String toolName, Map arguments) {
+ if (closed.get()) {
+ return Mono.error(
+ new IllegalStateException("Higress MCP client '" + name + "' is closed"));
+ }
logger.debug(
"Calling tool '{}' on Higress MCP client '{}' with arguments: {}",
toolName,
@@ -305,19 +325,23 @@ public Mono callTool(String toolName, Map();
}
-
- this.initialized = false;
- this.cachedTools = new ConcurrentHashMap<>();
}
/**
From a7aecbc4a6b6b9a77e6425b6308e0fdfb7c8b2c0 Mon Sep 17 00:00:00 2001
From: JGoP-L <741047428@qq.com>
Date: Tue, 10 Feb 2026 12:22:31 +0800
Subject: [PATCH 9/9] fix windows CI
---
.github/workflows/maven-ci.yml | 27 +++++++--------------------
1 file changed, 7 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index 9654c1cb6..24e2f041f 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -48,10 +48,8 @@ jobs:
include:
- os: ubuntu-latest
repo-cache-path: '~/.m2/repository'
- mvnd-cache-path: '~/.mvnd'
- os: windows-latest
repo-cache-path: '~\.m2\repository'
- mvnd-cache-path: '~\.mvnd'
env:
MVND_VERSION: '1.0.3'
@@ -74,33 +72,21 @@ jobs:
restore-keys: |
maven-${{ runner.os }}-
- - name: Cache mvnd [${{ runner.os }}]
- id: cache-mvnd
+ - name: Cache mvnd [Linux]
uses: actions/cache@v4
+ if: runner.os == 'Linux'
with:
- path: ${{ matrix.mvnd-cache-path }}
- key: mvnd-${{ env.MVND_VERSION }}-${{ runner.os }}
+ path: ~/.mvnd
+ key: mvnd-${{ env.MVND_VERSION }}-linux
- name: Install mvnd [Linux]
- if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Linux'
+ if: runner.os == 'Linux'
shell: bash
run: |
curl -fsSL https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64.tar.gz | tar xz
mkdir -p ~/.mvnd
mv maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64/* ~/.mvnd/
- - name: Install mvnd [Windows]
- if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Windows'
- shell: pwsh
- run: |
- Invoke-WebRequest `
- -Uri "https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64.zip" `
- -OutFile "mvnd.zip"
- mkdir -Path "~\.mvnd" -Force
- Expand-Archive -Path "mvnd.zip" -DestinationPath "mvnd" -FORCE
- Copy-Item -Path "mvnd\maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64\*" -Destination "~\.mvnd\" -Recurse
- Remove-Item -Path "mvnd", "mvnd.zip" -Recurse -Force
-
- name: Build and Test with Coverage [Linux]
if: runner.os == 'Linux'
run: |
@@ -109,8 +95,9 @@ jobs:
- name: Build and Test with Coverage [Windows]
if: runner.os == 'Windows'
+ shell: pwsh
run: |
- & "~\.mvnd\bin\mvnd.cmd" -B clean verify
+ mvn -B clean verify
- name: Upload coverage reports to Codecov
if: runner.os == 'Linux'