From 785fe214987847877343b04c4b4cb864ed99ed1c Mon Sep 17 00:00:00 2001 From: xzc Date: Mon, 26 Jan 2026 13:14:45 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E4=BD=BF=E7=94=A8=20GenerateOptions.mergeO?= =?UTF-8?q?ptions=20=E6=96=B9=E6=B3=95=E5=90=88=E5=B9=B6=20options=20?= =?UTF-8?q?=E5=92=8C=20defaultOptions=EF=BC=8C=E8=A7=A3=E5=86=B3=20DashSco?= =?UTF-8?q?peChatModel#defaultOptions=E9=85=8D=E7=BD=AE=E6=97=A0=E6=95=88?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/agentscope/core/model/DashScopeChatModel.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java b/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java index 88e5be749..85828bfd1 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/DashScopeChatModel.java @@ -188,8 +188,8 @@ private Flux streamWithHttpClient( Instant start = Instant.now(); boolean useMultimodal = requiresMultiModalApi(); - // Get effective options - GenerateOptions effectiveOptions = options != null ? options : defaultOptions; + // Merge options with defaultOptions (options takes precedence) + GenerateOptions effectiveOptions = GenerateOptions.mergeOptions(options, defaultOptions); ToolChoice toolChoice = effectiveOptions.getToolChoice(); // Format messages using formatter From 13645979aa072501bc7ddf985e5053ad81eb68e2 Mon Sep 17 00:00:00 2001 From: xzc Date: Mon, 26 Jan 2026 14:06:40 +0800 Subject: [PATCH 2/5] feat(pipeline): add streaming support to FanoutPipeline - Add stream() methods to FanoutPipeline for real-time event streaming - Support both concurrent (merge) and sequential (concat) streaming modes - Add StreamOptions support for event type filtering - Add unit tests for streaming functionality --- .../core/pipeline/FanoutPipeline.java | 127 +++++++++++++++ .../core/pipeline/FanoutPipelineTest.java | 152 ++++++++++++++++++ 2 files changed, 279 insertions(+) diff --git a/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java b/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java index 37b0badf7..ef9fa151a 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java +++ b/agentscope-core/src/main/java/io/agentscope/core/pipeline/FanoutPipeline.java @@ -16,6 +16,8 @@ package io.agentscope.core.pipeline; import io.agentscope.core.agent.AgentBase; +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.StreamOptions; import io.agentscope.core.exception.CompositeAgentException; import io.agentscope.core.message.Msg; import java.util.ArrayList; @@ -227,6 +229,131 @@ public String getDescription() { return description; } + // ==================== Streaming API ==================== + + /** + * Stream execution events from all agents with default options. + * + *

Events from multiple agents are merged (concurrent mode) or concatenated + * (sequential mode) based on the pipeline configuration. + * + * @param input Input message to distribute to all agents + * @return Flux of events emitted during execution from all agents + */ + public Flux stream(Msg input) { + return stream(input, StreamOptions.defaults()); + } + + /** + * Stream execution events from all agents with specified options. + * + *

Events from multiple agents are merged (concurrent mode) or concatenated + * (sequential mode) based on the pipeline configuration. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @return Flux of events emitted during execution from all agents + */ + public Flux stream(Msg input, StreamOptions options) { + return stream(input, options, null); + } + + /** + * Stream execution events from all agents with structured output support. + * + *

Events from multiple agents are merged (concurrent mode) or concatenated + * (sequential mode) based on the pipeline configuration. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @param structuredOutputClass The class type for structured output (optional) + * @return Flux of events emitted during execution from all agents + */ + public Flux stream(Msg input, StreamOptions options, Class structuredOutputClass) { + if (agents.isEmpty()) { + return Flux.empty(); + } + + StreamOptions effectiveOptions = options != null ? options : StreamOptions.defaults(); + + return enableConcurrent + ? streamConcurrent(input, effectiveOptions, structuredOutputClass) + : streamSequential(input, effectiveOptions, structuredOutputClass); + } + + /** + * Stream events from all agents concurrently. + * + *

All agents execute in parallel and their events are merged into a single stream. + * Events may arrive interleaved from different agents. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @param structuredOutputClass The class type for structured output (optional) + * @return Flux of merged events from all agents + */ + private Flux streamConcurrent( + Msg input, StreamOptions options, Class structuredOutputClass) { + List errors = + Collections.synchronizedList(new ArrayList<>()); + + List> agentFluxes = + agents.stream() + .map( + agent -> { + Flux flux = + structuredOutputClass != null + ? agent.stream( + input, options, structuredOutputClass) + : agent.stream(input, options); + + return flux.subscribeOn(scheduler) + .doOnError( + throwable -> + errors.add( + new CompositeAgentException + .AgentExceptionInfo( + agent.getAgentId(), + agent.getName(), + throwable))) + .onErrorResume(e -> Flux.empty()); + }) + .toList(); + + return Flux.merge(agentFluxes) + .doOnComplete( + () -> { + if (!errors.isEmpty()) { + throw new CompositeAgentException( + "Multiple agent streaming failures occurred", errors); + } + }); + } + + /** + * Stream events from all agents sequentially. + * + *

Agents execute one after another. Events from each agent are emitted + * in order before the next agent starts. + * + * @param input Input message to distribute to all agents + * @param options Stream configuration options + * @param structuredOutputClass The class type for structured output (optional) + * @return Flux of concatenated events from all agents + */ + private Flux streamSequential( + Msg input, StreamOptions options, Class structuredOutputClass) { + List> chain = new ArrayList<>(); + for (AgentBase agent : agents) { + Flux flux = + structuredOutputClass != null + ? agent.stream(input, options, structuredOutputClass) + : agent.stream(input, options); + chain.add(flux); + } + return Flux.concat(chain); + } + @Override public String toString() { return String.format( diff --git a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java index 58ad38a9a..32d917908 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java @@ -23,6 +23,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.agentscope.core.ReActAgent; +import io.agentscope.core.agent.Event; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agent.StreamOptions; import io.agentscope.core.agent.test.MockModel; import io.agentscope.core.agent.test.TestUtils; import io.agentscope.core.exception.CompositeAgentException; @@ -32,6 +35,7 @@ import java.time.Duration; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -213,4 +217,152 @@ private ReActAgent createAgent(String name, MockModel model) { // safety .build(); } + + // ==================== Streaming Tests ==================== + + @Test + @DisplayName("Should stream events from all agents when running concurrently") + void shouldStreamEventsConcurrently() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2)); + + Msg input = TestUtils.createUserMessage("User", "stream fanout"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Streaming pipeline should produce events"); + assertFalse(events.isEmpty(), "Events should not be empty"); + + // Verify we got AGENT_RESULT events from both agents + long agentResultCount = + events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).count(); + assertEquals(2, agentResultCount, "Expected AGENT_RESULT from each agent"); + + // Verify models were called + assertEquals(1, model1.getCallCount(), "First model should be invoked once"); + assertEquals(1, model2.getCallCount(), "Second model should be invoked once"); + } + + @Test + @DisplayName("Should stream events sequentially when configured") + void shouldStreamEventsSequentially() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), false); + + Msg input = TestUtils.createUserMessage("User", "sequential stream"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Sequential streaming should return events"); + assertFalse(pipeline.isConcurrentEnabled(), "Pipeline should be sequential"); + + // Verify we got AGENT_RESULT events + long agentResultCount = + events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).count(); + assertEquals(2, agentResultCount, "Expected AGENT_RESULT from each agent"); + + // Find agent result events and verify order + List agentResults = + events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).toList(); + + assertEquals( + "Agent1", + agentResults.get(0).getMessage().getName(), + "First agent response should lead"); + assertEquals( + "Agent2", + agentResults.get(1).getMessage().getName(), + "Second agent response should follow"); + } + + @Test + @DisplayName("Should stream with custom StreamOptions") + void shouldStreamWithCustomOptions() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2)); + + // Only stream AGENT_RESULT events + StreamOptions options = + StreamOptions.builder() + .eventTypes(EventType.AGENT_RESULT) + .incremental(true) + .build(); + + Msg input = TestUtils.createUserMessage("User", "options test"); + List events = pipeline.stream(input, options).collectList().block(TIMEOUT); + + assertNotNull(events, "Streaming with options should return events"); + + // All events should be AGENT_RESULT type + boolean allAgentResults = + events.stream().allMatch(e -> e.getType() == EventType.AGENT_RESULT); + assertTrue(allAgentResults, "All events should be AGENT_RESULT"); + } + + @Test + @DisplayName("Should return empty flux for empty pipeline") + void shouldReturnEmptyFluxForEmptyPipeline() { + FanoutPipeline pipeline = new FanoutPipeline(List.of()); + + Msg input = TestUtils.createUserMessage("User", "empty pipeline"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Should return empty list"); + assertTrue(events.isEmpty(), "Empty pipeline should produce no events"); + } + + @Test + @DisplayName("Should stream events through builder-created pipeline") + void shouldStreamEventsViaBuilder() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + ReActAgent agent3 = createAgent("Agent3", model3); + + FanoutPipeline pipeline = + FanoutPipeline.builder() + .addAgent(agent1) + .addAgents(List.of(agent2, agent3)) + .sequential() + .build(); + + Msg input = TestUtils.createUserMessage("User", "builder stream"); + List events = pipeline.stream(input).collectList().block(TIMEOUT); + + assertNotNull(events, "Builder-produced pipeline should stream events"); + + // Verify agent results in order (sequential mode) + List agentNames = + events.stream() + .filter(e -> e.getType() == EventType.AGENT_RESULT) + .map(e -> e.getMessage().getName()) + .toList(); + + assertEquals( + List.of("Agent1", "Agent2", "Agent3"), + agentNames, + "Sequential streaming should maintain insertion order"); + } + + @Test + @DisplayName("Should collect events count correctly in concurrent streaming") + void shouldCollectCorrectEventCountConcurrently() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + ReActAgent agent3 = createAgent("Agent3", model3); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2, agent3)); + + Msg input = TestUtils.createUserMessage("User", "count test"); + + AtomicInteger eventCount = new AtomicInteger(0); + pipeline.stream(input).doOnNext(event -> eventCount.incrementAndGet()).blockLast(TIMEOUT); + + assertTrue( + eventCount.get() >= 3, + "Should have at least 3 AGENT_RESULT events (one per agent)"); + } } From b12414f959cde9d0375e9169e394381960ac2ba7 Mon Sep 17 00:00:00 2001 From: xzc Date: Mon, 26 Jan 2026 14:22:13 +0800 Subject: [PATCH 3/5] Place the knowledge message before the user input message. --- .../agentscope/core/rag/GenericRAGHook.java | 4 +- .../core/pipeline/FanoutPipelineTest.java | 69 +++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java b/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java index 3d0b65b93..ed9e46475 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java +++ b/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java @@ -145,11 +145,11 @@ private Mono handlePreCall(PreCallEvent event) { if (retrievedDocs == null || retrievedDocs.isEmpty()) { return Mono.just(event); } - List enhancedMessages = new ArrayList<>(); + List enhancedMessages = new ArrayList<>(inputMessages.size() + 1); // Build enhanced messages with knowledge context Msg enhancedMessage = createEnhancedMessages(retrievedDocs); - enhancedMessages.addAll(inputMessages); enhancedMessages.add(enhancedMessage); + enhancedMessages.addAll(inputMessages); event.setInputMessages(enhancedMessages); return Mono.just(event); }) diff --git a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java index 32d917908..383bb459c 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java @@ -365,4 +365,73 @@ void shouldCollectCorrectEventCountConcurrently() { eventCount.get() >= 3, "Should have at least 3 AGENT_RESULT events (one per agent)"); } + + @Test + @DisplayName("Should stream with null options using defaults") + void shouldStreamWithNullOptions() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2)); + + Msg input = TestUtils.createUserMessage("User", "null options test"); + // Pass null options explicitly to test the null handling branch + List events = pipeline.stream(input, null, null).collectList().block(TIMEOUT); + + assertNotNull(events, "Should handle null options gracefully"); + assertFalse(events.isEmpty(), "Events should not be empty"); + } + + @Test + @DisplayName("Should stream with structured output class in concurrent mode") + void shouldStreamWithStructuredOutputConcurrent() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), true); + + Msg input = TestUtils.createUserMessage("User", "structured output test"); + StreamOptions options = StreamOptions.defaults(); + + // Test with a structured output class + List events = + pipeline.stream(input, options, String.class).collectList().block(TIMEOUT); + + assertNotNull(events, "Should handle structured output class"); + } + + @Test + @DisplayName("Should stream with structured output class in sequential mode") + void shouldStreamWithStructuredOutputSequential() { + ReActAgent agent1 = createAgent("Agent1", model1); + ReActAgent agent2 = createAgent("Agent2", model2); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), false); + + Msg input = TestUtils.createUserMessage("User", "sequential structured test"); + StreamOptions options = StreamOptions.defaults(); + + // Test sequential mode with structured output class + List events = + pipeline.stream(input, options, String.class).collectList().block(TIMEOUT); + + assertNotNull(events, "Should handle structured output in sequential mode"); + } + + @Test + @DisplayName("Should handle streaming errors and collect them") + void shouldHandleStreamingErrors() { + MockModel errorModel = new MockModel("Error response").withError("Streaming error"); + ReActAgent successAgent = createAgent("SuccessAgent", model1); + ReActAgent failingAgent = createAgent("ErrorAgent", errorModel); + + FanoutPipeline pipeline = new FanoutPipeline(List.of(successAgent, failingAgent)); + Msg input = TestUtils.createUserMessage("User", "streaming error test"); + + // The streaming should complete but may throw on complete if errors occurred + assertThrows( + CompositeAgentException.class, + () -> pipeline.stream(input).collectList().block(TIMEOUT), + "Should throw CompositeAgentException when agent streaming fails"); + } } From f54f647a2ff067d30101862144dfc2ad52656693 Mon Sep 17 00:00:00 2001 From: xzc Date: Mon, 9 Feb 2026 17:08:58 +0800 Subject: [PATCH 4/5] Place the knowledge message before the user input message. --- .../core/pipeline/FanoutPipelineTest.java | 69 ------------------- 1 file changed, 69 deletions(-) diff --git a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java index 383bb459c..32d917908 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/pipeline/FanoutPipelineTest.java @@ -365,73 +365,4 @@ void shouldCollectCorrectEventCountConcurrently() { eventCount.get() >= 3, "Should have at least 3 AGENT_RESULT events (one per agent)"); } - - @Test - @DisplayName("Should stream with null options using defaults") - void shouldStreamWithNullOptions() { - ReActAgent agent1 = createAgent("Agent1", model1); - ReActAgent agent2 = createAgent("Agent2", model2); - - FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2)); - - Msg input = TestUtils.createUserMessage("User", "null options test"); - // Pass null options explicitly to test the null handling branch - List events = pipeline.stream(input, null, null).collectList().block(TIMEOUT); - - assertNotNull(events, "Should handle null options gracefully"); - assertFalse(events.isEmpty(), "Events should not be empty"); - } - - @Test - @DisplayName("Should stream with structured output class in concurrent mode") - void shouldStreamWithStructuredOutputConcurrent() { - ReActAgent agent1 = createAgent("Agent1", model1); - ReActAgent agent2 = createAgent("Agent2", model2); - - FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), true); - - Msg input = TestUtils.createUserMessage("User", "structured output test"); - StreamOptions options = StreamOptions.defaults(); - - // Test with a structured output class - List events = - pipeline.stream(input, options, String.class).collectList().block(TIMEOUT); - - assertNotNull(events, "Should handle structured output class"); - } - - @Test - @DisplayName("Should stream with structured output class in sequential mode") - void shouldStreamWithStructuredOutputSequential() { - ReActAgent agent1 = createAgent("Agent1", model1); - ReActAgent agent2 = createAgent("Agent2", model2); - - FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), false); - - Msg input = TestUtils.createUserMessage("User", "sequential structured test"); - StreamOptions options = StreamOptions.defaults(); - - // Test sequential mode with structured output class - List events = - pipeline.stream(input, options, String.class).collectList().block(TIMEOUT); - - assertNotNull(events, "Should handle structured output in sequential mode"); - } - - @Test - @DisplayName("Should handle streaming errors and collect them") - void shouldHandleStreamingErrors() { - MockModel errorModel = new MockModel("Error response").withError("Streaming error"); - ReActAgent successAgent = createAgent("SuccessAgent", model1); - ReActAgent failingAgent = createAgent("ErrorAgent", errorModel); - - FanoutPipeline pipeline = new FanoutPipeline(List.of(successAgent, failingAgent)); - Msg input = TestUtils.createUserMessage("User", "streaming error test"); - - // The streaming should complete but may throw on complete if errors occurred - assertThrows( - CompositeAgentException.class, - () -> pipeline.stream(input).collectList().block(TIMEOUT), - "Should throw CompositeAgentException when agent streaming fails"); - } } From 159a6002a5f2924dd42215dcdfd6647cf3daf610 Mon Sep 17 00:00:00 2001 From: xzc Date: Mon, 26 Jan 2026 14:22:13 +0800 Subject: [PATCH 5/5] Place the knowledge message before the user input message. --- .../src/main/java/io/agentscope/core/rag/GenericRAGHook.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java b/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java index 3d0b65b93..ed9e46475 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java +++ b/agentscope-core/src/main/java/io/agentscope/core/rag/GenericRAGHook.java @@ -145,11 +145,11 @@ private Mono handlePreCall(PreCallEvent event) { if (retrievedDocs == null || retrievedDocs.isEmpty()) { return Mono.just(event); } - List enhancedMessages = new ArrayList<>(); + List enhancedMessages = new ArrayList<>(inputMessages.size() + 1); // Build enhanced messages with knowledge context Msg enhancedMessage = createEnhancedMessages(retrievedDocs); - enhancedMessages.addAll(inputMessages); enhancedMessages.add(enhancedMessage); + enhancedMessages.addAll(inputMessages); event.setInputMessages(enhancedMessages); return Mono.just(event); })