This document provides a comprehensive reference for the A2A Ruby SDK API.
A2A::Client::HttpClient is the primary client for communicating with A2A agents over HTTP.
client = A2A::Client::HttpClient.new(endpoint_url, **options)
Parameters:
- endpoint_url (String) - The A2A agent endpoint URL
- options (Hash) - Optional configuration
  - :auth (A2A::Client::Auth::Base) - Authentication strategy
  - :config (A2A::Client::Config) - Client configuration
  - :middleware (Array) - Middleware stack
  - :consumers (Array) - Event consumers
Example:
auth = A2A::Client::Auth::OAuth2.new(
  client_id: "your-id",
  client_secret: "your-secret",
  token_url: "https://auth.example.com/token"
)
config = A2A::Client::Config.new
config.timeout = 60
config.streaming = true
client = A2A::Client::HttpClient.new(
  "https://agent.example.com/a2a",
  auth: auth,
  config: config
)

send_message sends a message to the agent and handles responses.
Parameters:
- message (A2A::Types::Message) - The message to send
- context (Hash, optional) - Request context
- &block - Block to handle streaming responses
Returns:
- Single response (if not streaming)
- Enumerator (if streaming without block)
- Yields responses to block (if streaming with block)
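The three return modes listed above follow a common Ruby idiom: return a plain value when not streaming, yield when a block is given, and otherwise hand back an Enumerator. The following is a minimal sketch of that idiom only. `StreamingSketchClient`, its fixed event list, and the `each_event` helper are hypothetical stand-ins, not the SDK's implementation.

```ruby
# Hypothetical stand-in for a streaming client; not part of the A2A SDK.
class StreamingSketchClient
  EVENTS = ["working", "completed"].freeze

  # Returns the final event when streaming is off, yields each event when a
  # block is given, and otherwise returns an Enumerator over the events.
  def send_message(_message, streaming: true, &block)
    return EVENTS.last unless streaming
    return enum_for(:each_event) unless block

    each_event(&block)
    nil
  end

  # Yields every event in order to the given block.
  def each_event
    EVENTS.each { |event| yield event }
  end
end

sketch_client = StreamingSketchClient.new
sketch_client.send_message("hi") { |event| puts "event: #{event}" }
first_event = sketch_client.send_message("hi").first
```

Returning an Enumerator lets callers pull events lazily or chain methods such as `first` and `take`, while the block form suits simple push-style handlers.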
Example:
require "securerandom"

message = A2A::Types::Message.new(
  message_id: SecureRandom.uuid,
  role: "user",
  parts: [A2A::Types::TextPart.new(text: "Hello!")]
)
# Blocking call
response = client.send_message(message, streaming: false)
# Streaming with block
client.send_message(message) do |response|
  case response
  when A2A::Types::Message
    puts "Agent: #{response.parts.first.text}"
  when A2A::Types::TaskStatusUpdateEvent
    puts "Status: #{response.status.state}"
  end
end
# Streaming with enumerator
responses = client.send_message(message)
responses.each { |response| process(response) }

get_task retrieves a task by ID.
Parameters:
- task_id (String) - The task ID
- context (Hash, optional) - Request context
- history_length (Integer, optional) - Limit message history
Returns:
- A2A::Types::Task - The task object
Example:
task = client.get_task("task-123", history_length: 10)
puts "Task state: #{task.status.state}"
puts "Progress: #{task.status.progress}%" if task.status.progress

cancel_task cancels a task.
Parameters:
- task_id (String) - The task ID
- context (Hash, optional) - Request context
Returns:
- A2A::Types::Task - The updated task
Example:
begin
  task = client.cancel_task("task-123")
  puts "Task canceled: #{task.status.state}"
rescue A2A::Errors::TaskNotCancelable => e
  puts "Cannot cancel task: #{e.message}"
end

resubscribe re-establishes the stream of updates for a task.
Parameters:
- task_id (String) - The task ID
- context (Hash, optional) - Request context
- &block - Block to handle events
Example:
client.resubscribe("task-123") do |event|
  case event
  when A2A::Types::TaskStatusUpdateEvent
    puts "Status update: #{event.status.state}"
  when A2A::Types::TaskArtifactUpdateEvent
    puts "New artifact: #{event.artifact.name}"
  end
end

get_card retrieves the agent card.
Parameters:
- context (Hash, optional) - Request context
Returns:
- A2A::Types::AgentCard - The agent card
Example:
card = client.get_card
puts "Agent: #{card.name} v#{card.version}"
puts "Skills: #{card.skills.map(&:name).join(', ')}"

set_task_callback sets up push notifications for a task.
Parameters:
- task_id (String) - The task ID
- push_config (A2A::Types::PushNotificationConfig) - Push notification configuration
- context (Hash, optional) - Request context
Example:
push_config = A2A::Types::PushNotificationConfig.new(
  url: "https://your-app.com/webhooks/a2a",
  authentication: {
    type: "bearer",
    token: "webhook-secret"
  }
)
client.set_task_callback("task-123", push_config)

A2A::Client::Config is the client configuration object.
config = A2A::Client::Config.new
# Streaming configuration
config.streaming = true # Enable streaming responses
config.polling = false # Polling fallback (disabled here)
# Transport configuration
config.supported_transports = ['JSONRPC'] # Supported transport protocols
config.use_client_preference = true # Use client transport preference
# Timeout configuration
config.timeout = 30 # Request timeout in seconds
config.connect_timeout = 10 # Connection timeout in seconds
# Output configuration
config.accepted_output_modes = ['text', 'structured'] # Accepted output modes
# Push notification configuration
config.push_notification_configs = [] # Default push notification configs

A2A::Server::Agent is a mixin for creating A2A agents.
class MyAgent
  include A2A::Server::Agent

  # Agent configuration
  a2a_config(
    name: "My Agent",
    description: "A sample A2A agent",
    version: "1.0.0"
  )

  # Define skills
  a2a_skill "greeting" do |skill|
    skill.description = "Greet users"
    skill.tags = ["greeting", "conversation"]
    skill.examples = ["Hello", "Say hi"]
    skill.input_modes = ["text"]
    skill.output_modes = ["text"]
  end

  # Define methods
  a2a_method "greet" do |params|
    name = params[:name] || "there"
    { message: "Hello, #{name}!" }
  end
end

a2a_config configures the agent metadata.
Parameters:
- name (String) - Agent name
- description (String) - Agent description
- version (String) - Agent version
- url (String, optional) - Agent URL
- provider (Hash, optional) - Provider information
a2a_skill defines an agent skill.
Parameters:
- name (String) - Skill name
- &block - Configuration block
Block methods:
- description (String) - Skill description
- tags (Array) - Skill tags
- examples (Array) - Usage examples
- input_modes (Array) - Supported input modes
- output_modes (Array) - Supported output modes
- security (Hash, optional) - Security requirements

a2a_method defines an A2A method.
Parameters:
- name (String) - Method name
- options (Hash) - Method options
  - :streaming (Boolean) - Whether the method supports streaming
  - :auth_required (Boolean) - Whether authentication is required
- &block - Method implementation
Example:
# Simple method
a2a_method "get_weather" do |params|
  location = params[:location]
  WeatherService.current(location)
end
# Streaming method
a2a_method "weather_forecast", streaming: true do |params|
  Enumerator.new do |yielder|
    # Yield status updates
    yielder << task_status_update("working")
    # Yield data (forecast_data is assumed to be defined elsewhere)
    forecast_data.each do |day|
      message = A2A::Types::Message.new(
        message_id: SecureRandom.uuid,
        role: "agent",
        parts: [A2A::Types::TextPart.new(text: day.to_json)]
      )
      yielder << message
    end
    # Final status
    yielder << task_status_update("completed")
  end
end
# Method with authentication
a2a_method "secure_operation", auth_required: true do |params|
  # Access current user via @current_user (set by auth middleware)
  # next (rather than return) leaves the block early with a value
  next({ error: "Unauthorized" }) unless @current_user
  perform_secure_operation(params, @current_user)
end

Handles an A2A JSON-RPC request.
Parameters:
- request (A2A::Protocol::JsonRpc::Request) - The JSON-RPC request
Returns:
- Hash - JSON-RPC response
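To make the request-to-response mapping concrete, here is a minimal sketch of a JSON-RPC 2.0 dispatcher that looks up a registered handler and returns a response Hash. `SketchDispatcher`, `register`, and `handle` are hypothetical names for illustration; the SDK's own request handling may be structured differently. The codes -32601 and -32603 are the standard JSON-RPC "method not found" and "internal error" codes.

```ruby
require "json"

# Hypothetical dispatcher for illustration; not the SDK's implementation.
class SketchDispatcher
  def initialize
    @handlers = {}
  end

  # Registers a handler block under a JSON-RPC method name.
  def register(name, &handler)
    @handlers[name] = handler
  end

  # Takes a parsed JSON-RPC 2.0 request Hash and returns a response Hash.
  def handle(request)
    handler = @handlers[request["method"]]
    return error_response(request["id"], -32601, "Method not found") unless handler

    params = (request["params"] || {}).transform_keys(&:to_sym)
    { "jsonrpc" => "2.0", "id" => request["id"], "result" => handler.call(params) }
  rescue StandardError => e
    # Any failure inside the handler is reported as an internal error.
    error_response(request["id"], -32603, e.message)
  end

  private

  def error_response(id, code, message)
    { "jsonrpc" => "2.0", "id" => id, "error" => { "code" => code, "message" => message } }
  end
end

dispatcher = SketchDispatcher.new
dispatcher.register("greet") { |params| { message: "Hello, #{params[:name] || 'there'}!" } }
raw = '{"jsonrpc":"2.0","id":1,"method":"greet","params":{"name":"Ada"}}'
puts JSON.generate(dispatcher.handle(JSON.parse(raw)))
```

Echoing the request `id` in every response, including errors, is what lets a client match replies to outstanding calls.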
Generates an agent card from the agent definition.
Parameters:
- overrides (Hash) - Override default values
Returns:
- A2A::Types::AgentCard - The agent card
A2A::Server::TaskManager manages task lifecycle and persistence.
task_manager = A2A::Server::TaskManager.new(storage: storage_backend)

Creates a new task.
Parameters:
- type (String) - Task type
- params (Hash) - Task parameters
- options (Hash) - Additional options
Returns:
- A2A::Types::Task - The created task

Updates task status.
Parameters:
- task_id (String) - Task ID
- status (A2A::Types::TaskStatus) - New status
Returns:
- A2A::Types::Task - Updated task

Retrieves a task.
Parameters:
- task_id (String) - Task ID
Returns:
- A2A::Types::Task - The task
Raises:
- A2A::Errors::TaskNotFound - If the task doesn't exist
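The create, update, and retrieve operations above can be pictured with a small in-memory store. This is a sketch under stated assumptions: the method names `create_task`, `update_task_status`, and `get_task`, the Hash-based task shape, and the local `TaskNotFound` class are all hypothetical and chosen for illustration, since the SDK's actual names and storage interface are not shown here.

```ruby
require "securerandom"

module TaskSketch
  # Hypothetical stand-in for A2A::Errors::TaskNotFound.
  class TaskNotFound < StandardError; end

  # Hypothetical in-memory manager; method names are assumed for illustration.
  class InMemoryTaskManager
    def initialize
      @tasks = {}
    end

    # Creates a task Hash in the "submitted" state and stores it by ID.
    def create_task(type, params = {})
      id = SecureRandom.uuid
      @tasks[id] = { id: id, type: type, params: params, status: { state: "submitted" } }
    end

    # Replaces the status of an existing task and returns the updated task.
    def update_task_status(task_id, status)
      task = get_task(task_id)
      task[:status] = status
      task
    end

    # Looks up a task, raising TaskNotFound when the ID is unknown.
    def get_task(task_id)
      @tasks.fetch(task_id) { raise TaskNotFound, "Task not found: #{task_id}" }
    end
  end
end

manager = TaskSketch::InMemoryTaskManager.new
task = manager.create_task("report", { pages: 3 })
manager.update_task_status(task[:id], { state: "working" })
puts manager.get_task(task[:id])[:status][:state]
```

Routing `update_task_status` through `get_task` means an unknown ID fails the same way for reads and writes.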
A2A::Types::Message represents an A2A message.
message = A2A::Types::Message.new(
  message_id: SecureRandom.uuid,
  role: "user", # "user" or "agent"
  parts: [part1, part2], # Array of Part objects
  context_id: "context-123", # Optional
  task_id: "task-123", # Optional
  metadata: { key: "value" }, # Optional
  extensions: [], # Optional
  reference_task_ids: [] # Optional
)
Fields:
- message_id (String) - Unique message identifier
- role (String) - Message role ("user" or "agent")
- parts (Array<A2A::Types::Part>) - Message parts
- context_id (String, optional) - Context identifier
- task_id (String, optional) - Associated task ID
- kind (String) - Always "message"
- metadata (Hash, optional) - Additional metadata
- extensions (Array, optional) - Protocol extensions
- reference_task_ids (Array, optional) - Referenced task IDs
A2A::Types::Part is the base class for message parts. Use specific subclasses:
text_part = A2A::Types::TextPart.new(
  text: "Hello, world!",
  metadata: { language: "en" } # Optional
)
require "base64"
# File with bytes (base64 encoded; strict_encode64 omits the line breaks encode64 inserts)
file_part = A2A::Types::FilePart.new(
  file: A2A::Types::FileWithBytes.new(
    name: "document.pdf",
    mime_type: "application/pdf",
    bytes: Base64.strict_encode64(file_content)
  )
)
# File with URI reference
file_part = A2A::Types::FilePart.new(
  file: A2A::Types::FileWithUri.new(
    name: "document.pdf",
    mime_type: "application/pdf",
    uri: "https://storage.example.com/document.pdf"
  )
)
data_part = A2A::Types::DataPart.new(
  data: { key: "value", numbers: [1, 2, 3] },
  metadata: { format: "json" }
)

A2A::Types::Task represents a task with lifecycle management.
task = A2A::Types::Task.new(
  id: SecureRandom.uuid,
  context_id: SecureRandom.uuid,
  status: A2A::Types::TaskStatus.new(state: "submitted"),
  artifacts: [], # Optional
  history: [], # Optional
  metadata: {} # Optional
)
Fields:
- id (String) - Unique task identifier
- context_id (String) - Context identifier
- status (A2A::Types::TaskStatus) - Current task status
- kind (String) - Always "task"
- artifacts (Array<A2A::Types::Artifact>, optional) - Task artifacts
- history (Array<A2A::Types::Message>, optional) - Message history
- metadata (Hash, optional) - Additional metadata
A2A::Types::TaskStatus represents task status and progress.
status = A2A::Types::TaskStatus.new(
  state: "working", # Required
  message: "Processing data...", # Optional
  progress: 75, # Optional (0-100)
  result: { processed: 1000 }, # Optional
  error: { message: "Error occurred" }, # Optional
  updated_at: Time.now.utc.iso8601 # Optional (needs require "time"; Time.current exists only with ActiveSupport)
)
Task states:
- submitted - Task created, waiting to start
- working - Task in progress
- input-required - Task needs user input
- completed - Task finished successfully
- canceled - Task was canceled
- failed - Task failed with error
- rejected - Task rejected (invalid params, etc.)
- auth-required - Task needs authentication
- unknown - Unknown state
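Client code that polls or streams usually needs to know when to stop waiting. The sketch below groups the states listed above into two sets. The grouping is an assumption made for illustration (completed, canceled, failed, and rejected treated as final; input-required and auth-required treated as paused until the client acts), and the helper names are hypothetical rather than SDK methods.

```ruby
# Assumption: these four states are final and the task will not change again.
TERMINAL_TASK_STATES = %w[completed canceled failed rejected].freeze
# Assumption: these states pause the task until the client supplies something.
INTERRUPTED_TASK_STATES = %w[input-required auth-required].freeze

# True when no further updates are expected for a task in this state.
def terminal_task_state?(state)
  TERMINAL_TASK_STATES.include?(state)
end

# True when the task is waiting on input or credentials from the client.
def task_needs_client_action?(state)
  INTERRUPTED_TASK_STATES.include?(state)
end

%w[submitted working input-required completed].each do |state|
  label = if terminal_task_state?(state)
            "done"
          elsif task_needs_client_action?(state)
            "waiting on client"
          else
            "in flight"
          end
  puts "#{state}: #{label}"
end
```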
A2A::Types::AgentCard represents agent capabilities and metadata.
card = A2A::Types::AgentCard.new(
  name: "Weather Agent",
  description: "Provides weather information",
  version: "1.0.0",
  url: "https://agent.example.com/a2a",
  preferred_transport: "JSONRPC",
  skills: [skill1, skill2],
  capabilities: capabilities,
  default_input_modes: ["text"],
  default_output_modes: ["text", "structured"],
  additional_interfaces: [interface1], # Optional
  security: security_config, # Optional
  security_schemes: [scheme1], # Optional
  provider: provider_info, # Optional
  protocol_version: "0.3.0", # Optional
  supports_authenticated_extended_card: true, # Optional
  signatures: [signature1], # Optional
  documentation_url: "https://docs.example.com", # Optional
  icon_url: "https://example.com/icon.png" # Optional
)

A2A.configure is the global configuration method.
A2A.configure do |config|
  # Protocol configuration
  config.protocol_version = "0.3.0"
  config.default_transport = "JSONRPC"
  # Feature flags
  config.streaming_enabled = true
  config.push_notifications_enabled = true
  # Timeouts
  config.default_timeout = 30
  config.connect_timeout = 10
  # Logging
  config.log_level = :info
  config.log_requests = false
  config.log_responses = false
  # Storage
  config.storage_backend = :database # :memory, :redis, :database
  config.database_url = ENV['DATABASE_URL']
  config.redis_url = ENV['REDIS_URL']
  # Security
  config.force_ssl = true
  config.ssl_verify = true
  # Performance
  config.enable_metrics = true
  config.metrics_backend = :prometheus # :prometheus, :statsd
  # Rate limiting
  config.rate_limit_enabled = true
  config.rate_limit_requests = 100
  config.rate_limit_window = 60
end

A2A::Transport::Http is the HTTP transport implementation, built on Faraday.
transport = A2A::Transport::Http.new(
  endpoint_url: "https://agent.example.com/a2a",
  timeout: 30,
  ssl_verify: true
)
response = transport.send_request(json_rpc_request)

A2A::Transport::SSE is the Server-Sent Events transport for streaming.
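As background on what a Server-Sent Events transport consumes, the wire format is plain text: `event:` and `data:` fields, one per line, with a blank line ending each event and `message` as the default event type. The following is a minimal sketch of parsing that format. `parse_sse_events` is a hypothetical helper, not an SDK method, and it ignores the `id:` and `retry:` fields for brevity.

```ruby
# Hypothetical helper: parses raw SSE text into an array of event Hashes.
def parse_sse_events(text)
  text.split(/\r?\n\r?\n/).filter_map do |chunk|
    type = "message" # default event type when no event: field is present
    data = []
    chunk.each_line(chomp: true) do |line|
      next if line.start_with?(":") # lines starting with a colon are comments

      field, value = line.split(":", 2)
      value = value.to_s.delete_prefix(" ")
      case field
      when "event" then type = value
      when "data" then data << value
      end
    end
    next if data.empty? # events without data are not dispatched

    { type: type, data: data.join("\n") }
  end
end

stream = "event: error\ndata: {\"code\":-32001}\n\n: keep-alive\n\ndata: hello\ndata: world\n\n"
parse_sse_events(stream).each { |event| puts "#{event[:type]}: #{event[:data].inspect}" }
```

Multiple `data:` lines within one event are joined with newlines, which is why a handler should parse the joined string rather than each line separately.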
sse = A2A::Transport::SSE.new(endpoint_url)
sse.stream(request) do |event|
  case event.type
  when 'message'
    handle_message(JSON.parse(event.data))
  when 'error'
    handle_error(JSON.parse(event.data))
  end
end

A2A::Client::Auth::OAuth2 implements the OAuth 2.0 client credentials flow.
auth = A2A::Client::Auth::OAuth2.new(
  client_id: "your-client-id",
  client_secret: "your-client-secret",
  token_url: "https://auth.example.com/oauth/token",
  scope: "a2a:read a2a:write" # Optional
)
# Manual token acquisition
token, expires_at = auth.get_token
# Automatic application to requests
client = A2A::Client::HttpClient.new(url, auth: auth)

A2A::Client::Auth::JWT provides JWT bearer token authentication.
auth = A2A::Client::Auth::JWT.new(
  token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  header: "Authorization" # Optional, defaults to "Authorization"
)

A2A::Client::Auth::ApiKey provides API key authentication.
# Header-based API key
auth = A2A::Client::Auth::ApiKey.new(
  key: "your-api-key",
  header: "X-API-Key"
)
# Query parameter API key
auth = A2A::Client::Auth::ApiKey.new(
  key: "your-api-key",
  parameter: "api_key"
)

Custom authentication strategies subclass A2A::Client::Auth::Base:
class CustomAuth < A2A::Client::Auth::Base
  def initialize(credentials)
    @credentials = credentials
  end

  def apply_auth(request)
    # Add custom authentication to request
    request.headers['X-Custom-Auth'] = generate_signature(@credentials)
  end

  private

  def generate_signature(credentials)
    # Custom signature logic
  end
end

Error hierarchy:
A2A::Errors::A2AError
├── A2A::Errors::ParseError (-32700)
├── A2A::Errors::InvalidRequest (-32600)
├── A2A::Errors::MethodNotFound (-32601)
├── A2A::Errors::InvalidParams (-32602)
├── A2A::Errors::InternalError (-32603)
├── A2A::Errors::TaskNotFound (-32001)
├── A2A::Errors::TaskNotCancelable (-32002)
├── A2A::Errors::InvalidTaskState (-32003)
├── A2A::Errors::AuthenticationRequired (-32004)
├── A2A::Errors::InsufficientPermissions (-32005)
├── A2A::Errors::RateLimitExceeded (-32006)
├── A2A::Errors::InvalidAgentCard (-32007)
├── A2A::Errors::TransportNotSupported (-32008)
├── A2A::Errors::InvalidMessageFormat (-32009)
├── A2A::Errors::ServiceUnavailable (-32010)
├── A2A::Errors::ClientError
│ ├── A2A::Errors::HTTPError
│ ├── A2A::Errors::TimeoutError
│ └── A2A::Errors::AuthenticationError
└── A2A::Errors::ServerError
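The numeric codes in the hierarchy above travel in the `code` field of a JSON-RPC error object, so a client can map them back to exception classes. The sketch below shows one way to do that with a lookup table. The `ErrorSketch` module and its classes are local, hypothetical stand-ins that mirror only three entries of the hierarchy; they are not the SDK's classes, and the fallback to the base class for unknown codes is an assumption.

```ruby
module ErrorSketch
  # Hypothetical base error carrying the JSON-RPC code and optional data.
  class A2AError < StandardError
    attr_reader :code, :data

    def initialize(message = nil, code: -32603, data: nil)
      super(message)
      @code = code
      @data = data
    end
  end

  class MethodNotFound < A2AError; end
  class TaskNotFound < A2AError; end
  class TaskNotCancelable < A2AError; end

  # Codes taken from the hierarchy listed above.
  BY_CODE = {
    -32601 => MethodNotFound,
    -32001 => TaskNotFound,
    -32002 => TaskNotCancelable
  }.freeze

  # Builds an exception from a JSON-RPC error Hash; unknown codes fall back
  # to the base class so callers can still rescue A2AError.
  def self.from_json_rpc(error)
    klass = BY_CODE.fetch(error["code"], A2AError)
    klass.new(error["message"], code: error["code"], data: error["data"])
  end
end

begin
  raise ErrorSketch.from_json_rpc({ "code" => -32001, "message" => "Task not found" })
rescue ErrorSketch::A2AError => e
  puts "#{e.class.name} (#{e.code}): #{e.message}"
end
```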
attempts = 0
begin
  attempts += 1
  response = client.send_message(message)
rescue A2A::Errors::AuthenticationError => e
  # Handle auth errors; re-raise after three attempts to avoid an endless retry loop
  raise if attempts >= 3
  refresh_credentials
  retry
rescue A2A::Errors::TaskNotFound => e
  # Handle missing task
  logger.warn "Task not found: #{e.message}"
rescue A2A::Errors::RateLimitExceeded => e
  # Handle rate limiting; the same attempt cap applies
  raise if attempts >= 3
  sleep(e.retry_after || 60)
  retry
rescue A2A::Errors::A2AError => e
  # Handle all A2A protocol errors
  logger.error "A2A Error #{e.code}: #{e.message}"
  handle_protocol_error(e)
rescue StandardError => e
  # Handle unexpected errors
  logger.error "Unexpected error: #{e.message}"
  raise
end

A2A errors include structured information:
begin
  client.send_message(message)
rescue A2A::Errors::A2AError => e
  puts "Error Code: #{e.code}"
  puts "Message: #{e.message}"
  puts "Data: #{e.data}" if e.data
  # Convert to JSON-RPC error format
  json_rpc_error = e.to_json_rpc_error
  # => { code: -32001, message: "Task not found", data: {...} }
end

For complete API documentation with examples, see the YARD documentation.