Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3ef7d5d
feat(mcp): add notification handlers support for async client
JGoP-L Jan 15, 2026
5b9582d
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 15, 2026
ae06223
feat(mcp): add notification handlers support for sync client
JGoP-L Jan 15, 2026
b966d99
feat(mcp): add notification handlers support for sync client
JGoP-L Jan 15, 2026
378b7b1
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 15, 2026
9a23888
add synchronized
JGoP-L Jan 15, 2026
e9df304
Merge remote-tracking branch 'origin/feat/add-notification-handlers-s…
JGoP-L Jan 15, 2026
fa1e155
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 15, 2026
3e95f85
add volatile
JGoP-L Jan 16, 2026
2d086ef
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 16, 2026
7c97c55
fix Spotless
JGoP-L Jan 16, 2026
e99b485
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 16, 2026
e34c3fd
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 20, 2026
5d37bda
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 26, 2026
3839079
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 26, 2026
5733e57
fix
JGoP-L Jan 26, 2026
a8292de
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 26, 2026
e72373d
fix higress
JGoP-L Jan 26, 2026
20724d9
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Jan 27, 2026
38cbc07
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Feb 5, 2026
9e094c1
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Feb 10, 2026
cb18562
Merge branch 'main' into feat/add-notification-handlers-support-for-a…
JGoP-L Feb 10, 2026
a7aecbc
fix windows CI
JGoP-L Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions .github/workflows/maven-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ jobs:
include:
- os: ubuntu-latest
repo-cache-path: '~/.m2/repository'
mvnd-cache-path: '~/.mvnd'
- os: windows-latest
repo-cache-path: '~\.m2\repository'
mvnd-cache-path: '~\.mvnd'

env:
MVND_VERSION: '1.0.3'
Expand All @@ -74,33 +72,21 @@ jobs:
restore-keys: |
maven-${{ runner.os }}-

- name: Cache mvnd [${{ runner.os }}]
id: cache-mvnd
- name: Cache mvnd [Linux]
uses: actions/cache@v4
if: runner.os == 'Linux'
with:
path: ${{ matrix.mvnd-cache-path }}
key: mvnd-${{ env.MVND_VERSION }}-${{ runner.os }}
path: ~/.mvnd
key: mvnd-${{ env.MVND_VERSION }}-linux

- name: Install mvnd [Linux]
if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Linux'
if: runner.os == 'Linux'
shell: bash
run: |
curl -fsSL https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64.tar.gz | tar xz
mkdir -p ~/.mvnd
mv maven-mvnd-${{ env.MVND_VERSION }}-linux-amd64/* ~/.mvnd/

- name: Install mvnd [Windows]
if: steps.cache-mvnd.outputs.cache-hit != 'true' && runner.os == 'Windows'
shell: pwsh
run: |
Invoke-WebRequest `
-Uri "https://github.com/apache/maven-mvnd/releases/download/${{ env.MVND_VERSION }}/maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64.zip" `
-OutFile "mvnd.zip"
mkdir -Path "~\.mvnd" -Force
Expand-Archive -Path "mvnd.zip" -DestinationPath "mvnd" -FORCE
Copy-Item -Path "mvnd\maven-mvnd-${{ env.MVND_VERSION }}-windows-amd64\*" -Destination "~\.mvnd\" -Recurse
Remove-Item -Path "mvnd", "mvnd.zip" -Recurse -Force

- name: Build and Test with Coverage [Linux]
if: runner.os == 'Linux'
run: |
Expand All @@ -109,8 +95,9 @@ jobs:

- name: Build and Test with Coverage [Windows]
if: runner.os == 'Windows'
shell: pwsh
run: |
& "~\.mvnd\bin\mvnd.cmd" -B clean verify
mvn -B clean verify

- name: Upload coverage reports to Codecov
if: runner.os == 'Linux'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
* Wrapper for asynchronous MCP clients using Project Reactor.
* This implementation delegates to {@link McpAsyncClient} and provides
* reactive operations that return Mono types.
* Wrapper for asynchronous MCP clients using Project Reactor. This implementation delegates to
* {@link McpAsyncClient} and provides reactive operations that return Mono types.
*
* <p>Example usage:
* <pre>{@code
* <p>Example usage: <pre>{@code
* McpAsyncClient client = ... // created via McpClient.async()
* McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client);
* wrapper.initialize()
Expand All @@ -42,6 +44,7 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);

private final McpAsyncClient client;
private final AtomicBoolean closed = new AtomicBoolean(false);

/**
* Constructs a new asynchronous MCP client wrapper.
Expand All @@ -51,7 +54,36 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
*/
public McpAsyncClientWrapper(String name, McpAsyncClient client) {
super(name);
this.client = client;
this.client = Objects.requireNonNull(client, "MCP client cannot be null");
}

/**
* Package-private constructor for use by McpClientBuilder.
*
* @param name unique identifier for this client
* @param builder the builder to construct the client with notification handlers
*/
McpAsyncClientWrapper(String name, McpClientBuilder builder) {
super(name);
this.client = builder.buildClientForAsync(this);
}

/**
* Updates the cached tools map with new tools from the server. This method is called when the
* server sends a tools/list_changed notification. This method is thread-safe and can be
* called concurrently from notification handlers.
*
* @param tools the new list of tools from the server (empty list clears cache)
*/
void updateCachedTools(List<McpSchema.Tool> tools) {
if (closed.get() || tools == null) {
return;
}
// Build new map first, then atomically replace via volatile assignment
Map<String, McpSchema.Tool> newTools =
tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}

/**
Expand All @@ -64,6 +96,9 @@ public McpAsyncClientWrapper(String name, McpAsyncClient client) {
*/
@Override
public Mono<Void> initialize() {
if (closed.get()) {
return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
}
if (initialized) {
return Mono.empty();
}
Expand All @@ -84,10 +119,19 @@ public Mono<Void> initialize() {
"MCP client '{}' discovered {} tools",
name,
result.tools().size());
// Cache all tools
result.tools().forEach(tool -> cachedTools.put(tool.name(), tool));
// Cache all tools - build new map then atomically replace
Map<String, McpSchema.Tool> newTools =
result.tools().stream()
.collect(
Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
})
.doOnSuccess(
v -> {
if (!closed.get()) {
initialized = true;
}
})
.doOnSuccess(v -> initialized = true)
.doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e))
.then();
}
Expand All @@ -99,10 +143,12 @@ public Mono<Void> initialize() {
* initialized before calling this method.
*
* @return a Mono emitting the list of available tools
* @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono<List<McpSchema.Tool>> listTools() {
if (closed.get()) {
return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
}
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
Expand All @@ -120,10 +166,12 @@ public Mono<List<McpSchema.Tool>> listTools() {
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
* @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono<McpSchema.CallToolResult> callTool(String toolName, Map<String, Object> arguments) {
if (closed.get()) {
return Mono.error(new IllegalStateException("MCP client '" + name + "' is closed"));
}
if (!initialized) {
return Mono.error(
new IllegalStateException("MCP client '" + name + "' not initialized"));
Expand Down Expand Up @@ -161,7 +209,7 @@ public Mono<McpSchema.CallToolResult> callTool(String toolName, Map<String, Obje
*/
@Override
public void close() {
if (client != null) {
if (closed.compareAndSet(false, true)) {
logger.info("Closing MCP async client: {}", name);
try {
client.closeGracefully()
Expand All @@ -171,9 +219,10 @@ public void close() {
} catch (Exception e) {
logger.error("Exception during MCP client close", e);
client.close();
} finally {
initialized = false;
cachedTools = new ConcurrentHashMap<>();
}
}
initialized = false;
cachedTools.clear();
}
}
Loading
Loading