Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ private Flux<ChatResponse> streamWithHttpClient(
Instant start = Instant.now();
boolean useMultimodal = requiresMultiModalApi();

// Get effective options
GenerateOptions effectiveOptions = options != null ? options : defaultOptions;
// Merge options with defaultOptions (options takes precedence)
GenerateOptions effectiveOptions = GenerateOptions.mergeOptions(options, defaultOptions);
ToolChoice toolChoice = effectiveOptions.getToolChoice();

// Format messages using formatter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.agentscope.core.pipeline;

import io.agentscope.core.agent.AgentBase;
import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.exception.CompositeAgentException;
import io.agentscope.core.message.Msg;
import java.util.ArrayList;
Expand Down Expand Up @@ -227,6 +229,131 @@ public String getDescription() {
return description;
}

// ==================== Streaming API ====================

/**
* Stream execution events from all agents with default options.
*
* <p>Events from multiple agents are merged (concurrent mode) or concatenated
* (sequential mode) based on the pipeline configuration.
*
* @param input Input message to distribute to all agents
* @return Flux of events emitted during execution from all agents
*/
public Flux<Event> stream(Msg input) {
return stream(input, StreamOptions.defaults());
}

/**
* Stream execution events from all agents with specified options.
*
* <p>Events from multiple agents are merged (concurrent mode) or concatenated
* (sequential mode) based on the pipeline configuration.
*
* @param input Input message to distribute to all agents
* @param options Stream configuration options
* @return Flux of events emitted during execution from all agents
*/
public Flux<Event> stream(Msg input, StreamOptions options) {
return stream(input, options, null);
}

/**
* Stream execution events from all agents with structured output support.
*
* <p>Events from multiple agents are merged (concurrent mode) or concatenated
* (sequential mode) based on the pipeline configuration.
*
* @param input Input message to distribute to all agents
* @param options Stream configuration options
* @param structuredOutputClass The class type for structured output (optional)
* @return Flux of events emitted during execution from all agents
*/
public Flux<Event> stream(Msg input, StreamOptions options, Class<?> structuredOutputClass) {
if (agents.isEmpty()) {
return Flux.empty();
}

StreamOptions effectiveOptions = options != null ? options : StreamOptions.defaults();

return enableConcurrent
? streamConcurrent(input, effectiveOptions, structuredOutputClass)
: streamSequential(input, effectiveOptions, structuredOutputClass);
}

/**
* Stream events from all agents concurrently.
*
* <p>All agents execute in parallel and their events are merged into a single stream.
* Events may arrive interleaved from different agents.
*
* @param input Input message to distribute to all agents
* @param options Stream configuration options
* @param structuredOutputClass The class type for structured output (optional)
* @return Flux of merged events from all agents
*/
private Flux<Event> streamConcurrent(
Msg input, StreamOptions options, Class<?> structuredOutputClass) {
List<CompositeAgentException.AgentExceptionInfo> errors =
Collections.synchronizedList(new ArrayList<>());

List<Flux<Event>> agentFluxes =
agents.stream()
.map(
agent -> {
Flux<Event> flux =
structuredOutputClass != null
? agent.stream(
input, options, structuredOutputClass)
: agent.stream(input, options);

return flux.subscribeOn(scheduler)
.doOnError(
throwable ->
errors.add(
new CompositeAgentException
.AgentExceptionInfo(
agent.getAgentId(),
agent.getName(),
throwable)))
.onErrorResume(e -> Flux.empty());
})
.toList();

return Flux.merge(agentFluxes)
.doOnComplete(
() -> {
if (!errors.isEmpty()) {
throw new CompositeAgentException(
"Multiple agent streaming failures occurred", errors);
}
});
}

/**
* Stream events from all agents sequentially.
*
* <p>Agents execute one after another. Events from each agent are emitted
* in order before the next agent starts.
*
* @param input Input message to distribute to all agents
* @param options Stream configuration options
* @param structuredOutputClass The class type for structured output (optional)
* @return Flux of concatenated events from all agents
*/
private Flux<Event> streamSequential(
Msg input, StreamOptions options, Class<?> structuredOutputClass) {
List<Flux<Event>> chain = new ArrayList<>();
for (AgentBase agent : agents) {
Flux<Event> flux =
structuredOutputClass != null
? agent.stream(input, options, structuredOutputClass)
: agent.stream(input, options);
chain.add(flux);
}
return Flux.concat(chain);
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ private Mono<PreCallEvent> handlePreCall(PreCallEvent event) {
if (retrievedDocs == null || retrievedDocs.isEmpty()) {
return Mono.just(event);
}
List<Msg> enhancedMessages = new ArrayList<>();
List<Msg> enhancedMessages = new ArrayList<>(inputMessages.size() + 1);
// Build enhanced messages with knowledge context
Msg enhancedMessage = createEnhancedMessages(retrievedDocs);
enhancedMessages.addAll(inputMessages);
enhancedMessages.add(enhancedMessage);
enhancedMessages.addAll(inputMessages);
event.setInputMessages(enhancedMessages);
return Mono.just(event);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.EventType;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.agent.test.MockModel;
import io.agentscope.core.agent.test.TestUtils;
import io.agentscope.core.exception.CompositeAgentException;
Expand All @@ -32,6 +35,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -213,4 +217,152 @@ private ReActAgent createAgent(String name, MockModel model) {
// safety
.build();
}

// ==================== Streaming Tests ====================

@Test
@DisplayName("Should stream events from all agents when running concurrently")
void shouldStreamEventsConcurrently() {
ReActAgent agent1 = createAgent("Agent1", model1);
ReActAgent agent2 = createAgent("Agent2", model2);

FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2));

Msg input = TestUtils.createUserMessage("User", "stream fanout");
List<Event> events = pipeline.stream(input).collectList().block(TIMEOUT);

assertNotNull(events, "Streaming pipeline should produce events");
assertFalse(events.isEmpty(), "Events should not be empty");

// Verify we got AGENT_RESULT events from both agents
long agentResultCount =
events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).count();
assertEquals(2, agentResultCount, "Expected AGENT_RESULT from each agent");

// Verify models were called
assertEquals(1, model1.getCallCount(), "First model should be invoked once");
assertEquals(1, model2.getCallCount(), "Second model should be invoked once");
}

@Test
@DisplayName("Should stream events sequentially when configured")
void shouldStreamEventsSequentially() {
ReActAgent agent1 = createAgent("Agent1", model1);
ReActAgent agent2 = createAgent("Agent2", model2);

FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2), false);

Msg input = TestUtils.createUserMessage("User", "sequential stream");
List<Event> events = pipeline.stream(input).collectList().block(TIMEOUT);

assertNotNull(events, "Sequential streaming should return events");
assertFalse(pipeline.isConcurrentEnabled(), "Pipeline should be sequential");

// Verify we got AGENT_RESULT events
long agentResultCount =
events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).count();
assertEquals(2, agentResultCount, "Expected AGENT_RESULT from each agent");

// Find agent result events and verify order
List<Event> agentResults =
events.stream().filter(e -> e.getType() == EventType.AGENT_RESULT).toList();

assertEquals(
"Agent1",
agentResults.get(0).getMessage().getName(),
"First agent response should lead");
assertEquals(
"Agent2",
agentResults.get(1).getMessage().getName(),
"Second agent response should follow");
}

@Test
@DisplayName("Should stream with custom StreamOptions")
void shouldStreamWithCustomOptions() {
ReActAgent agent1 = createAgent("Agent1", model1);
ReActAgent agent2 = createAgent("Agent2", model2);

FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2));

// Only stream AGENT_RESULT events
StreamOptions options =
StreamOptions.builder()
.eventTypes(EventType.AGENT_RESULT)
.incremental(true)
.build();

Msg input = TestUtils.createUserMessage("User", "options test");
List<Event> events = pipeline.stream(input, options).collectList().block(TIMEOUT);

assertNotNull(events, "Streaming with options should return events");

// All events should be AGENT_RESULT type
boolean allAgentResults =
events.stream().allMatch(e -> e.getType() == EventType.AGENT_RESULT);
assertTrue(allAgentResults, "All events should be AGENT_RESULT");
}

@Test
@DisplayName("Should return empty flux for empty pipeline")
void shouldReturnEmptyFluxForEmptyPipeline() {
FanoutPipeline pipeline = new FanoutPipeline(List.of());

Msg input = TestUtils.createUserMessage("User", "empty pipeline");
List<Event> events = pipeline.stream(input).collectList().block(TIMEOUT);

assertNotNull(events, "Should return empty list");
assertTrue(events.isEmpty(), "Empty pipeline should produce no events");
}

@Test
@DisplayName("Should stream events through builder-created pipeline")
void shouldStreamEventsViaBuilder() {
ReActAgent agent1 = createAgent("Agent1", model1);
ReActAgent agent2 = createAgent("Agent2", model2);
ReActAgent agent3 = createAgent("Agent3", model3);

FanoutPipeline pipeline =
FanoutPipeline.builder()
.addAgent(agent1)
.addAgents(List.of(agent2, agent3))
.sequential()
.build();

Msg input = TestUtils.createUserMessage("User", "builder stream");
List<Event> events = pipeline.stream(input).collectList().block(TIMEOUT);

assertNotNull(events, "Builder-produced pipeline should stream events");

// Verify agent results in order (sequential mode)
List<String> agentNames =
events.stream()
.filter(e -> e.getType() == EventType.AGENT_RESULT)
.map(e -> e.getMessage().getName())
.toList();

assertEquals(
List.of("Agent1", "Agent2", "Agent3"),
agentNames,
"Sequential streaming should maintain insertion order");
}

@Test
@DisplayName("Should collect events count correctly in concurrent streaming")
void shouldCollectCorrectEventCountConcurrently() {
ReActAgent agent1 = createAgent("Agent1", model1);
ReActAgent agent2 = createAgent("Agent2", model2);
ReActAgent agent3 = createAgent("Agent3", model3);

FanoutPipeline pipeline = new FanoutPipeline(List.of(agent1, agent2, agent3));

Msg input = TestUtils.createUserMessage("User", "count test");

AtomicInteger eventCount = new AtomicInteger(0);
pipeline.stream(input).doOnNext(event -> eventCount.incrementAndGet()).blockLast(TIMEOUT);

assertTrue(
eventCount.get() >= 3,
"Should have at least 3 AGENT_RESULT events (one per agent)");
Comment on lines +361 to +366

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current test implementation counts all events, but the assertion message specifically refers to AGENT_RESULT events. This is a bit misleading, and the assertion is not as precise as it could be.

To make the test stronger and clearer, it would be better to filter for AGENT_RESULT events and assert their count directly. This ensures the test is verifying exactly what the description claims.

        long agentResultCount =
                pipeline.stream(input)
                        .filter(event -> event.getType() == EventType.AGENT_RESULT)
                        .count()
                        .block(TIMEOUT);

        assertEquals(3, agentResultCount, "Should have exactly 3 AGENT_RESULT events (one per agent)");

}
}