Streaming Responses
Streaming delivers agent responses in real time via Server-Sent Events (SSE). The client receives each message as the agent produces it instead of waiting for the whole run to finish. This improves the user experience for long-running operations.
Why Streaming?
- Immediate Feedback: Users see responses as they're generated
- Progress Visibility: Track long operations in real time
- Resource Efficiency: Process and send data incrementally instead of holding the full response in memory
- Better UX: The client stays responsive instead of blocking until the run completes
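Basic Streaming
Return an Enumerator from the agent block to enable streaming. Each RunYield pushed onto the yielder is sent to the client as a separate message:
```ruby
server.agent("streamer") do |context|
  Enumerator.new do |yielder|
    5.times do |i|
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent("Message #{i + 1}")
      )
      sleep 0.5 # simulate work between messages
    end
  end
end
```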
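Because the agent returns an Enumerator, the producing block only runs as far as the consumer has pulled. The plain-Ruby sketch below uses no SimpleAcp classes. Strings stand in for RunYield objects. It uses `Enumerator#next` to show that each `yielder <<` suspends the block until the next value is requested. This section does not say whether SimpleAcp's server iterates with `each` or `next` internally. The sketch only illustrates how an Enumerator hands off values one at a time.

```ruby
# Records which iterations have actually executed so far.
produced = []

stream = Enumerator.new do |yielder|
  3.times do |i|
    produced << i                 # side effect runs only when this value is requested
    yielder << "Message #{i + 1}" # suspends the block until the next pull
  end
end

first = stream.next          # runs the block up to the first yielder <<
after_first = produced.dup

second = stream.next         # resumes and runs up to the second yielder <<
after_second = produced.dup

puts first                   # => Message 1
puts after_first.inspect     # => [0]
puts second                  # => Message 2
puts after_second.inspect    # => [0, 1]
```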
RunYield
RunYield wraps either a single message or an array of messages for streaming:
```ruby
# Inside an Enumerator.new do |yielder| ... end block:

# Single message
yielder << SimpleAcp::Server::RunYield.new(
  SimpleAcp::Models::Message.agent("Hello")
)

# Multiple messages
yielder << SimpleAcp::Server::RunYield.new(
  [
    SimpleAcp::Models::Message.agent("First"),
    SimpleAcp::Models::Message.agent("Second")
  ]
)
```
Streaming Patterns
Token-by-Token
Stream small units, here one character at a time, for a typing effect:
```ruby
server.agent("typer") do |context|
  # Fall back to a default string when no text input is provided
  text = context.input.first&.text_content || "Hello World"

  Enumerator.new do |yielder|
    text.each_char do |char|
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent(char)
      )
      sleep 0.05
    end
  end
end
```
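Iterating with `chars` or `each_char` splits a string by code point. A character built from several code points, such as a letter followed by a combining accent, can therefore arrive in pieces and render oddly mid-stream. Ruby 2.5+ provides `String#grapheme_clusters` and `each_grapheme_cluster`, which keep user-perceived characters together. The following is a minimal comparison:

```ruby
# "e" followed by U+0301 COMBINING ACUTE ACCENT renders as a single "é"
text = "Cafe\u0301"

by_char = text.chars                  # separates the accent from its base letter
by_grapheme = text.grapheme_clusters  # keeps "e\u0301" together

puts by_char.length      # => 5
puts by_grapheme.length  # => 4
```

In the typer agent, you can apply the same idea by iterating with `text.each_grapheme_cluster` instead of character by character.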
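Word-by-Word
Split the input on whitespace and stream one word at a time:
```ruby
server.agent("word-stream") do |context|
  text = context.input.first&.text_content || ""

  Enumerator.new do |yielder|
    text.split.each do |word|
      # Re-add a space so the client can concatenate chunks directly
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent("#{word} ")
      )
      sleep 0.1
    end
  end
end
```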
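`String#split` with no arguments collapses all whitespace, so newlines and repeated spaces in the input are lost. The space appended to each word also leaves a trailing space at the end. If the original spacing matters, one option is `String#scan(/\S+\s*/)`. It keeps each word together with the whitespace that follows it, so joining the chunks reproduces the input, apart from any leading whitespace:

```ruby
text = "Hello  big\nworld"

# Each chunk is a word plus any whitespace that follows it
chunks = text.scan(/\S+\s*/)

p chunks              # => ["Hello  ", "big\n", "world"]
p chunks.join == text # => true
```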
Progress Updates
Report progress while working through a batch of items. Here `parse_items` and `process` stand for your own application code:
```ruby
server.agent("processor") do |context|
  items = parse_items(context.input)
  total = items.length

  Enumerator.new do |yielder|
    items.each_with_index do |item, i|
      # Process item
      result = process(item)

      # Report progress
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent(
          "Processing #{i + 1}/#{total}: #{result}"
        )
      )
    end

    yielder << SimpleAcp::Server::RunYield.new(
      SimpleAcp::Models::Message.agent("Complete!")
    )
  end
end
```
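You can exercise the progress pattern without a server. This plain-Ruby sketch replaces the placeholder `process` helper with a block and adds a percentage to each update. `progress_stream` is a hypothetical name, and plain strings stand in for RunYield messages:

```ruby
# Hypothetical helper: yields one progress line per item, then a completion line.
def progress_stream(items, &work)
  total = items.length
  Enumerator.new do |yielder|
    items.each_with_index do |item, i|
      result = work.call(item)
      pct = ((i + 1) * 100.0 / total).round # never reached when items is empty
      yielder << "Processing #{i + 1}/#{total} (#{pct}%): #{result}"
    end
    yielder << "Complete!"
  end
end

updates = progress_stream([1, 2, 3, 4]) { |n| n * n }.to_a
puts updates.first # => Processing 1/4 (25%): 1
puts updates.last  # => Complete!
```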
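LLM Integration
Stream responses from language models. Here `llm` stands for your LLM client, which is expected to pass each generated chunk to a block:
```ruby
server.agent("chat") do |context|
  # Include earlier turns so the model sees the whole conversation
  messages = context.history + context.input

  Enumerator.new do |yielder|
    llm.stream_chat(messages) do |chunk|
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent(chunk)
      )
    end
  end
end
```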
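The `llm.stream_chat` call above is callback-style: it invokes a block once per chunk. Wrapping it in an Enumerator turns that push-style API into a pull-style stream. The sketch below uses a hypothetical `FakeLLM` class, not a real client library. It shows that when the stream is consumed with `next`, the callback pauses after each chunk instead of running to completion first:

```ruby
# Hypothetical stand-in for an LLM client with a block-based streaming method.
class FakeLLM
  attr_reader :emitted

  def initialize(chunks)
    @chunks = chunks
    @emitted = 0
  end

  # Calls the given block once per chunk, like a callback-style streaming API.
  def stream_chat(_messages)
    @chunks.each do |chunk|
      @emitted += 1
      yield chunk
    end
  end
end

llm = FakeLLM.new(["Hel", "lo", "!"])

stream = Enumerator.new do |yielder|
  llm.stream_chat([]) { |chunk| yielder << chunk }
end

first = stream.next
puts first        # => Hel
puts llm.emitted  # => 1 (the remaining chunks have not been produced yet)
```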
Mixed Content
Stream different content types:
```ruby
server.agent("analyzer") do |context|
  data = parse_data(context.input)

  Enumerator.new do |yielder|
    # Text update
    yielder << SimpleAcp::Server::RunYield.new(
      SimpleAcp::Models::Message.agent("Analyzing data...")
    )

    result = analyze(data)

    # JSON result
    yielder << SimpleAcp::Server::RunYield.new(
      SimpleAcp::Models::Message.agent(
        SimpleAcp::Models::MessagePart.json(result)
      )
    )

    # Summary text
    yielder << SimpleAcp::Server::RunYield.new(
      SimpleAcp::Models::Message.agent("Analysis complete!")
    )
  end
end
```
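This section does not show how `MessagePart.json` encodes its argument. The SSE example under HTTP Streaming shows parts with `content_type` and `content` fields, so a JSON part plausibly carries `application/json` with the serialized object as its content. The following plain-Ruby sketch shows that assumed shape. It uses a hypothetical `json_part` helper that builds a Hash, not a SimpleAcp object:

```ruby
require "json"

# Hypothetical helper: builds a part-like Hash, not a SimpleAcp::Models::MessagePart.
def json_part(obj)
  { content_type: "application/json", content: JSON.generate(obj) }
end

part = json_part({ "score" => 0.92, "labels" => ["a", "b"] })
puts part[:content] # => {"score":0.92,"labels":["a","b"]}

# A client can check content_type and recover the original structure
decoded = JSON.parse(part[:content])
```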
Event Types
A streaming run emits these events in order:
```mermaid
sequenceDiagram
    participant Client
    participant Server
    Server-->>Client: RunStartedEvent
    Server-->>Client: MessageCreatedEvent
    Server-->>Client: MessagePartEvent
    Server-->>Client: MessageCompletedEvent
    Note over Server,Client: Repeat for each message
    Server-->>Client: RunCompletedEvent
```
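You can check the order in the diagram mechanically, for instance in tests. This sketch encodes the order as a regular expression over event names, using the SSE names from the HTTP example. `valid_sequence?` is a hypothetical helper. A real stream may include other event types, such as failure events, that this pattern does not allow for:

```ruby
# run_started, then zero or more messages (each: created, parts, completed), then run_completed
RUN_SEQUENCE = /\Arun_started( message_created( message_part)* message_completed)* run_completed\z/

def valid_sequence?(event_names)
  RUN_SEQUENCE.match?(event_names.join(" "))
end

ok  = %w[run_started message_created message_part message_part message_completed run_completed]
bad = %w[run_started message_created message_part run_completed]

puts valid_sequence?(ok)  # => true
puts valid_sequence?(bad) # => false
```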
HTTP Streaming
Over HTTP, the server streams events using SSE. Each event has an `event:` line naming the event type and a `data:` line carrying a JSON payload. A blank line terminates each event.
Request:
```http
POST /runs HTTP/1.1
Accept: text/event-stream
Content-Type: application/json

{"agent_name": "chat", "input": [...]}
```
Response:
```http
HTTP/1.1 200 OK
Content-Type: text/event-stream

event: run_started
data: {"run_id":"abc123"}

event: message_created
data: {"message":{"role":"agent","parts":[]}}

event: message_part
data: {"part":{"content_type":"text/plain","content":"Hello"}}

event: message_completed
data: {"message":{"role":"agent","parts":[...]}}

event: run_completed
data: {"run":{...}}
```
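The framing above is simple enough to produce and parse by hand. The sketch below assumes a complete, already-buffered response body; a real client would parse incrementally as bytes arrive. It follows the SSE rules for `event:` and `data:` fields:

- Multiple `data:` lines are joined with newlines.
- Lines starting with `:` are comments.
- The event type defaults to `message`.

`encode_sse` and `parse_sse` are hypothetical helper names, not SimpleAcp's implementation:

```ruby
require "json"

# Hypothetical helper: frames one event in SSE wire format (a blank line ends it)
def encode_sse(event, payload)
  "event: #{event}\ndata: #{JSON.generate(payload)}\n\n"
end

# Hypothetical helper: parses a complete SSE body into [event_name, data] pairs
def parse_sse(body)
  body.split(/\r?\n\r?\n/).map { |block|
    name = "message" # SSE default when no event: field is present
    data = []
    block.each_line(chomp: true) do |line|
      next if line.empty? || line.start_with?(":") # skip blank lines and comments
      field, value = line.split(":", 2)
      value = value.to_s.delete_prefix(" ")        # strip one optional leading space
      case field
      when "event" then name = value
      when "data"  then data << value
      end
    end
    [name, data.join("\n")] unless data.empty?     # events without data are skipped
  }.compact
end

body = encode_sse("run_started", { "run_id" => "abc123" }) +
       encode_sse("message_part", { "part" => { "content_type" => "text/plain", "content" => "Hello" } })

events = parse_sse(body)
p events.first # => ["run_started", "{\"run_id\":\"abc123\"}"]
```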
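Server-Side Streaming
To execute a streaming run programmatically, without going through HTTP, call `run_stream` with a block. The block receives each event as it is produced:
```ruby
server.run_stream(
  agent_name: "chat",
  input: messages
) do |event|
  case event
  when SimpleAcp::Models::MessagePartEvent
    # Print each chunk as it arrives
    print event.part.content
  when SimpleAcp::Models::RunCompletedEvent
    puts "\nDone!"
  end
end
```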
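The `case`/`when` dispatch works because `Class#===` tests whether an object is an instance of that class. The following plain-Ruby sketch accumulates the streamed text. Hypothetical `PartEvent` and `DoneEvent` structs stand in for SimpleAcp's event classes:

```ruby
# Hypothetical stand-ins for MessagePartEvent and RunCompletedEvent
PartEvent = Struct.new(:content)
DoneEvent = Struct.new(:status)

def collect_text(events)
  text = +""
  events.each do |event|
    case event
    when PartEvent then text << event.content # PartEvent === event, like is_a?
    when DoneEvent then break                 # stop at the end of the run
    end
  end
  text
end

events = [PartEvent.new("Hel"), PartEvent.new("lo"), DoneEvent.new(:ok), PartEvent.new("ignored")]
puts collect_text(events) # => Hello
```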
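Error Handling
Rescue errors inside the Enumerator so the client receives a readable error message as part of the stream. Here `process_stream` and `StreamError` are placeholders for your own streaming source and its error class:
```ruby
server.agent("safe-streamer") do |context|
  Enumerator.new do |yielder|
    begin
      process_stream(context.input) do |chunk|
        yielder << SimpleAcp::Server::RunYield.new(
          SimpleAcp::Models::Message.agent(chunk)
        )
      end
    rescue StreamError => e
      # Report the failure to the client as a final message
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent("Stream error: #{e.message}")
      )
    end
  end
end
```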
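The following runnable plain-Ruby sketch shows the same pattern. It uses a hypothetical `StreamError` class and a list of chunks in place of `process_stream`. Output yielded before the failure is still delivered, and the error message follows it. An `ensure` clause is a natural place for cleanup:

```ruby
# Hypothetical error type for a failing stream source
class StreamError < StandardError; end

def safe_stream(chunks, fail_at:, log:)
  Enumerator.new do |yielder|
    begin
      chunks.each_with_index do |chunk, i|
        raise StreamError, "connection lost" if i == fail_at
        yielder << chunk
      end
    rescue StreamError => e
      yielder << "Stream error: #{e.message}"
    ensure
      log << :closed # e.g. close the upstream connection here
    end
  end
end

log = []
output = safe_stream(%w[a b c], fail_at: 2, log: log).to_a
p output # => ["a", "b", "Stream error: connection lost"]
p log    # => [:closed]
```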
Buffering
When the source produces many small items, buffer them and yield in batches so that each event carries more data and per-event overhead drops. Here `process_items` stands for your own item source:
```ruby
server.agent("buffered") do |context|
  Enumerator.new do |yielder|
    buffer = []

    process_items(context.input) do |item|
      buffer << item

      # Flush every 10 items
      if buffer.length >= 10
        yielder << SimpleAcp::Server::RunYield.new(
          SimpleAcp::Models::Message.agent(buffer.join)
        )
        buffer.clear
      end
    end

    # Flush remaining
    unless buffer.empty?
      yielder << SimpleAcp::Server::RunYield.new(
        SimpleAcp::Models::Message.agent(buffer.join)
      )
    end
  end
end
```
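When the items are already in an Enumerable rather than arriving through a callback, `Enumerable#each_slice` does the same batching, including the final partial batch. The following is a plain-Ruby sketch. `buffered_stream` is a hypothetical name, and strings stand in for RunYield messages:

```ruby
def buffered_stream(items, size: 10)
  Enumerator.new do |yielder|
    # each_slice yields arrays of up to `size` items; the last one may be shorter
    items.each_slice(size) { |batch| yielder << batch.join }
  end
end

batches = buffered_stream(("a".."y").to_a, size: 10).to_a
p batches # => ["abcdefghij", "klmnopqrst", "uvwxy"]
```

If the source only accepts a block, as `process_items` does, `to_enum(:process_items, context.input)` turns it into an Enumerator that `each_slice` can consume.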
Best Practices
- Yield frequently: don't let long gaps build up between yields
- Handle errors: catch errors and report them to the client as messages
- Choose chunk sizes deliberately: frequent yields feel more responsive but add per-event overhead
- Test streaming: verify event order and content
- Watch memory: stream large responses instead of building them up in memory first
Testing Streaming
```ruby
# Minitest/Test::Unit-style test; assumes `server` is set up elsewhere in the test class
def test_streaming_agent
  events = []
  server.run_stream(
    agent_name: "streamer",
    input: [SimpleAcp::Models::Message.user("test")]
  ) do |event|
    events << event
  end

  # Verify event sequence
  assert_kind_of SimpleAcp::Models::RunStartedEvent, events.first
  assert_kind_of SimpleAcp::Models::RunCompletedEvent, events.last

  # Check message events
  message_events = events.select { |e| e.is_a?(SimpleAcp::Models::MessagePartEvent) }
  assert message_events.any?
end
```
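The `sleep` calls in the earlier examples make tests slow. One option is to build the Enumerator in a plain method with an injectable delay. Unit tests can then check the yielded values directly, without a server. The following sketch uses a hypothetical `streamer_enum` method, and strings stand in for RunYield objects:

```ruby
# Builds the same sequence as the "streamer" agent; the delay is configurable for tests
def streamer_enum(count, delay: 0.5)
  Enumerator.new do |yielder|
    count.times do |i|
      yielder << "Message #{i + 1}"
      sleep delay if delay.positive?
    end
  end
end

messages = streamer_enum(3, delay: 0).to_a
p messages # => ["Message 1", "Message 2", "Message 3"]
```

In the agent itself, each string would be wrapped in a RunYield before being yielded.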
Next Steps
- Learn about Multi-Turn Conversations
- See Client Streaming for consuming streams
- Review Events for event details