Streaming
Stream LLM responses in real time as they're generated, reducing perceived latency for users.
Enable streaming per agent with the `streaming` class option:

```ruby
module LLM
  class StreamingAgent < ApplicationAgent
    model "gpt-4o"
    streaming true # Enable streaming for this agent

    param :prompt, required: true

    def user_prompt
      prompt
    end
  end
end
```

Or enable it by default for all agents:

```ruby
# config/initializers/ruby_llm_agents.rb
RubyLLM::Agents.configure do |config|
  config.default_streaming = true
end
```

Process chunks as they arrive:
```ruby
LLM::StreamingAgent.call(prompt: "Write a story") do |chunk|
  print chunk.content # chunk is a RubyLLM::Chunk object
end
```

Output appears progressively:

```
Once... upon... a... time...
```
For more explicit streaming, use the `.stream` class method, which forces streaming regardless of the class-level setting:
```ruby
result = LLM::MyAgent.stream(prompt: "Write a story") do |chunk|
  print chunk.content
end

# Access result metadata after streaming
puts "Tokens: #{result.total_tokens}"
puts "TTFT: #{result.time_to_first_token_ms}ms"
```

This method:

- Forces streaming even if `streaming false` is set at the class level
- Requires a block (raises `ArgumentError` if none is provided)
- Returns a `Result` object with full metadata
When streaming completes, the returned `Result` contains streaming-specific metadata:
```ruby
result = LLM::StreamingAgent.call(prompt: "test") do |chunk|
  print chunk.content
end

result.streaming?             # => true
result.time_to_first_token_ms # => 245 (ms until the first chunk arrived)
result.duration_ms            # => 2500 (total execution time)
```

To stream responses to the browser over Server-Sent Events, use `ActionController::Live`:

```ruby
class StreamingController < ApplicationController
  include ActionController::Live

  def stream_response
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['X-Accel-Buffering'] = 'no' # Disable nginx buffering

    LLM::StreamingAgent.call(prompt: params[:prompt]) do |chunk|
      payload = { content: chunk.content }.to_json
      response.stream.write "data: #{payload}\n\n"
    end

    response.stream.write "data: [DONE]\n\n"
  rescue ActionController::Live::ClientDisconnected
    # Client disconnected, clean up
  ensure
    response.stream.close
  end
end
```

On the client, consume the stream with an `EventSource`:

```javascript
const eventSource = new EventSource('/stream?prompt=' + encodeURIComponent(prompt));

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const chunk = JSON.parse(event.data);
  document.getElementById('output').textContent += chunk.content;
};

eventSource.onerror = () => {
  eventSource.close();
};
```

To broadcast chunks over Turbo Streams instead, call the agent from a controller action:

```ruby
class ChatController < ApplicationController
  def create
    respond_to do |format|
      format.turbo_stream do
        LLM::StreamingAgent.call(prompt: params[:message]) do |chunk|
          Turbo::StreamsChannel.broadcast_append_to(
            "chat_#{params[:chat_id]}",
            target: "messages",
            partial: "messages/chunk",
            locals: { content: chunk.content }
          )
        end
      end
    end
  end
end
```

In the view, subscribe to the stream:

```erb
<%= turbo_stream_from "chat_#{@chat.id}" %>
<div id="messages"></div>
```

Streaming executions track latency metrics:
```ruby
# After streaming completes
execution = RubyLLM::Agents::Execution.last
execution.streaming?             # => true
execution.time_to_first_token_ms # => 245 (ms until first chunk)
execution.duration_ms            # => 2500 (total time)

# Average TTFT for streaming agents
RubyLLM::Agents::Execution
  .streaming
  .average(:time_to_first_token_ms)
# => 312.5

# TTFT by model
RubyLLM::Agents::Execution
  .streaming
  .group(:model_id)
  .average(:time_to_first_token_ms)
# => { "gpt-4o" => 280, "claude-3-sonnet" => 195 }
```

When using schemas, the full response is still validated:
```ruby
module LLM
  class StructuredStreamingAgent < ApplicationAgent
    model "gpt-4o"
    streaming true

    param :topic, required: true

    def user_prompt
      "Write about #{topic}"
    end

    def schema
      @schema ||= RubyLLM::Schema.create do
        string :title
        string :content
      end
    end
  end
end

# Stream the raw text
LLM::StructuredStreamingAgent.call(topic: "AI") do |chunk|
  print chunk.content # Raw JSON chunks
end
# Result is parsed and validated at the end
```

**Important:** Streaming responses are not cached by design, as caching would defeat the purpose of real-time streaming.
```ruby
module LLM
  class MyAgent < ApplicationAgent
    streaming true
    cache_for 1.hour # Cache is ignored when streaming
  end
end
```

If you need caching with a streaming-like UX, consider:

- Caching the full response
- Simulating streaming on the client side (see the sketch below)
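One way to combine the two is sketched below: call the agent without streaming so `cache_for` can apply, then replay the cached text in small slices so the browser still sees progressive output. This is only an illustration under assumptions, not library behavior: `LLM::CachedAgent`, the `result.content` accessor, the 40-character slice size, and the 30 ms delay are all hypothetical choices for the example.

```ruby
class CachedStreamController < ApplicationController
  include ActionController::Live

  def show
    response.headers['Content-Type'] = 'text/event-stream'

    # Non-streaming call, so the agent's cache_for setting can apply.
    # LLM::CachedAgent and result.content are assumptions for this sketch.
    result = LLM::CachedAgent.call(prompt: params[:prompt])

    # Replay the full (possibly cached) text in small slices to mimic streaming.
    result.content.to_s.scan(/.{1,40}/m).each do |slice|
      payload = { content: slice }.to_json
      response.stream.write "data: #{payload}\n\n"
      sleep 0.03 # pacing purely for effect
    end

    response.stream.write "data: [DONE]\n\n"
  ensure
    response.stream.close
  end
end
```

The `EventSource` consumer shown earlier works unchanged against an endpoint like this, since it emits the same `data:` payloads.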
Handle errors that occur mid-stream:

```ruby
begin
  LLM::StreamingAgent.call(prompt: "test") do |chunk|
    print chunk.content
  end
rescue Timeout::Error
  puts "\n[Stream timed out]"
rescue => e
  puts "\n[Stream error: #{e.message}]"
end
```

For long-running streams, use ActionCable from a background job:
```ruby
class StreamingJob < ApplicationJob
  def perform(prompt, channel_id)
    LLM::StreamingAgent.call(prompt: prompt) do |chunk|
      ActionCable.server.broadcast(
        channel_id,
        { type: 'chunk', content: chunk.content }
      )
    end

    ActionCable.server.broadcast(
      channel_id,
      { type: 'complete' }
    )
  end
end
```
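The job only broadcasts; clients receive the chunks through a channel they subscribe to. A minimal sketch of that subscriber side is shown below. The channel class, parameter, and identifier naming are assumptions for illustration, not part of the library.

```ruby
# Sketch of the subscribing channel; LlmStreamChannel and the
# :channel_id param are hypothetical names, not library classes.
class LlmStreamChannel < ApplicationCable::Channel
  def subscribed
    # Receives the { type: 'chunk' } / { type: 'complete' } broadcasts
    # sent by StreamingJob above.
    stream_from params[:channel_id]
  end
end
```

Enqueue the job with the same identifier the client subscribed with, for example `StreamingJob.perform_later(prompt, "llm_stream_#{user.id}")` (identifier format is likewise an assumption).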
Streaming is most beneficial for:

- Long-form content generation
- Conversational interfaces
- Real-time transcription/translation
Stop writing as soon as the client disconnects:

```ruby
def stream_response
  LLM::StreamingAgent.call(prompt: params[:prompt]) do |chunk|
    break if response.stream.closed?
    response.stream.write "data: #{chunk.to_json}\n\n"
  end
ensure
  response.stream.close
end
```

Give long-form agents a generous timeout:

```ruby
module LLM
  class LongFormAgent < ApplicationAgent
    streaming true
    timeout 180 # 3 minutes for long content
  end
end
```

Track time-to-first-token to ensure good UX:
```ruby
# Alert if TTFT is too high
if execution.time_to_first_token_ms > 1000
  Rails.logger.warn("High TTFT: #{execution.time_to_first_token_ms}ms")
end
```

See also:

- Agent DSL - Configuration options
- Execution Tracking - TTFT analytics
- Dashboard - Monitoring streaming metrics