# Server API

## Server::Base

The server entry point. Creates and wires all server components, then runs Falcon.
```ruby
server = A2A::Server::Base.new(
  agent_card: card,                  # A2A::Models::AgentCard (required)
  executor: MyExecutor.new,          # A2A::Server::AgentExecutor subclass (required)
  storage: A2A::Storage::Memory.new, # default
  push_sender: nil,                  # A2A::Server::PushSender instance, optional
  host: "localhost",                 # default
  port: 9292                         # default
)
```
```ruby
server.run      # blocks — starts Falcon
server.rack_app # returns the Rack app (useful for embedding in other servers)
```
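As an embedding sketch, `rack_app` can be handed to any Rack-compatible host via a `config.ru`; the `card` and `MyExecutor` names below are placeholders from your own application code:

```ruby
# config.ru — illustrative sketch: mount the A2A Rack app in another Rack server.
require "a2a"

server = A2A::Server::Base.new(
  agent_card: card,        # placeholder: your AgentCard
  executor: MyExecutor.new # placeholder: your executor
)

run server.rack_app # then start with `rackup` or any Rack host
```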
## Server::MultiAgent

Hosts multiple A2A agents in one Falcon process by mounting each agent at its own URL path. Use this when you want independent AgentCards, executors, storage backends, and SSE channels behind a single port.
```ruby
server = A2A.multi_server(
  agents: {
    "/anthropic" => { agent_card: anthropic_card, executor: AnthropicExecutor.new },
    "/openai"    => { agent_card: openai_card,    executor: OpenAIExecutor.new },
    "/evaluator" => { agent_card: evaluator_card, executor: EvaluatorExecutor.new }
  },
  host: "localhost",
  port: 9292
)
server.run
```
Each entry in agents accepts the same core configuration used by Server::Base:
| Key | Required | Description |
|---|---|---|
| `:agent_card` | Yes | AgentCard returned by that path's `/agentCard` endpoint |
| `:executor` | Yes | Executor that handles requests for that path |
| `:storage` | No | Storage backend for that path; defaults to `A2A::Storage::Memory.new` |
| `:event_router` | No | SSE event router for that path; defaults to a new router |
| `:push_sender` | No | Push notification sender for that path |
For a runnable example, see the Multi-Agent LLM Research demo.
## Server::AgentExecutor

Base class for your agent logic. Subclass it and implement `#call`:
```ruby
class MyExecutor < A2A::Server::AgentExecutor
  def call(ctx)
    # ctx is an A2A::Server::Context
    input = ctx.message.text_content
    ctx.task.start!
    # … do work …
    ctx.task.complete!(artifacts: [ … ])
  end

  # Optional: handle task cancellation
  def cancel(ctx)
    # the default implementation calls ctx.task.cancel! and emits a final status event
    super
  end
end
```
`#call` runs synchronously inside the Falcon reactor. Long-running work should use an `Async::Task` internally to stay non-blocking.
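A sketch of that pattern, assuming the `async` gem that Falcon is built on; `fetch_report` is a placeholder for your own work, not part of the library:

```ruby
class SlowExecutor < A2A::Server::AgentExecutor
  def call(ctx)
    ctx.task.start!
    # Spawning a child Async::Task keeps the reactor free to serve other
    # requests while this work is in flight (use async-aware I/O inside).
    Async do
      report = fetch_report(ctx.message.text_content) # placeholder helper
      ctx.emit_status(final: false)                   # interim progress event
      ctx.task.complete!(artifacts: [report])
    end
  end
end
```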
## Server::Context

Passed to `AgentExecutor#call`. Provides access to the request and helper methods.
```ruby
ctx.task         # => A2A::Models::Task
ctx.message      # => A2A::Models::Message (the incoming message)
ctx.storage      # => A2A::Storage::Base
ctx.event_router # => A2A::Server::EventRouter
ctx.config       # => Hash (arbitrary per-request config, default {})

ctx.save_task                 # persists the task to storage
ctx.emit_status(final: false) # publishes a TaskStatusUpdateEvent
ctx.emit_artifact(artifact, append: false, last_chunk: false) # publishes a TaskArtifactUpdateEvent
```
## Server::ResumeContext

A `Context` subclass for resumed tasks (after `input_required` or `auth_required`).
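As an illustrative sketch, an executor can branch on the context class to tell the first request from the resumed one. The `input_required!` transition and `build_artifact` helper below are assumptions, not confirmed API; check your Task model for the real state transitions:

```ruby
class FormExecutor < A2A::Server::AgentExecutor
  def call(ctx)
    if ctx.is_a?(A2A::Server::ResumeContext)
      # Second round trip: the client has supplied the requested input.
      ctx.task.start!
      ctx.task.complete!(artifacts: [build_artifact(ctx.message)]) # placeholder helper
    else
      # First round trip: pause the task until the caller sends more input.
      ctx.task.input_required! # assumed transition name
    end
  end
end
```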
## Server::EventRouter

Manages per-task SSE channels using TypedBus. You rarely interact with this directly — use `ctx.emit_status` and `ctx.emit_artifact` instead.
```ruby
router = A2A::Server::EventRouter.new
router.open(task_id)                    # creates a channel
router.publish(task_id, event)          # sends an event to subscribers
router.subscribe(task_id) { |event| … } # block receives raw event objects
router.close(task_id)                   # removes the channel
router.channel?(task_id)                # => true/false
```
## Server::PushSender
Delivers webhook push notifications.
```ruby
sender = A2A::Server::PushSender.new(
  private_key: OpenSSL::PKey::RSA.generate(2048), # for JWT signing
  key_id: "my-key-id",
  issuer: "my-agent"
)

sender.deliver(push_config, event) # => true (success) or false (failure)
```
Schemes:

- `"bearer"` — signs a JWT with RS256 and sends `Authorization: Bearer <token>`
- `"token"` — sends the static value as `Authorization: Token <value>` (or a custom header)
## Server::App

The Roda-based Rack application. You don't instantiate this directly — `Server::Base` configures and freezes it.
Routes:
| Method | Path | Description |
|---|---|---|
| `GET` | `/agentCard` | Returns the AgentCard as JSON |
| `POST` | `/` | JSON-RPC 2.0 dispatch |
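As an illustration of the `POST /` dispatch, the sketch below builds a JSON-RPC 2.0 envelope with the standard library. The `message/send` method name and message shape follow the A2A specification; verify them against the protocol version your server implements:

```ruby
require "json"

# JSON-RPC 2.0 envelope for the A2A message/send method (illustrative)
payload = JSON.generate(
  {
    jsonrpc: "2.0",
    id: 1,
    method: "message/send",
    params: {
      message: {
        role: "user",
        parts: [{ kind: "text", text: "Hello, agent" }],
        messageId: "msg-1"
      }
    }
  }
)

# To send it against a running server, uncomment:
# require "net/http"
# res = Net::HTTP.post(URI("http://localhost:9292/"), payload,
#                      "Content-Type" => "application/json")
# puts res.body

puts payload
```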