Skip to content

Streaming

Real-time event streaming during robot and network execution.

Overview

The streaming system provides structured event publishing during LLM execution. Events are emitted for run lifecycle, content deltas (token streaming), tool calls, and metadata updates. The system supports nested contexts for network-level orchestration where multiple robots execute within a single run.

publish = ->(event) {
  case event[:event]
  when "text.delta"
    print event[:data][:delta]
  when "run.completed"
    puts "\nDone!"
  end
}

context = RobotLab::Streaming::Context.new(
  run_id: SecureRandom.uuid,
  message_id: SecureRandom.uuid,
  scope: "robot",
  publish: publish
)

context.publish_event(event: "text.delta", data: { delta: "Hello" })

Components

Component Description
Context Manages streaming state, sequencing, and event publishing
Events Event type constants and classification helpers

Also used internally:

Component Description
SequenceCounter Thread-safe monotonic counter for event ordering

Event Categories

Category Events Description
Lifecycle run.started, run.completed, run.failed, run.interrupted Run-level state changes
Steps step.started, step.completed, step.failed Durable execution steps
Parts part.created, part.completed, part.failed Message composition parts
Deltas text.delta, tool_call.arguments.delta, reasoning.delta, data.delta Token-level content streaming
HITL hitl.requested, hitl.resolved Human-in-the-loop events
Metadata usage.updated, metadata.updated Token usage and metadata
Terminal stream.ended End of stream signal

Event Structure

Each published event is a hash with the following shape:

{
  event: "text.delta",           # Event type string
  data: {                        # Event payload (merged with context info)
    delta: "Hello",              #   Custom data
    run_id: "run_123",           #   Injected by context
    message_id: "msg_456",       #   Injected by context
    scope: "robot"               #   Injected by context
  },
  timestamp: 1707900000000,      # Millisecond Unix timestamp
  sequence_number: 1,            # Monotonically increasing
  id: "publish-1:text.delta"     # Unique event ID
}

Quick Start

Publishing Events

context = RobotLab::Streaming::Context.new(
  run_id: "run_123",
  message_id: "msg_456",
  scope: "network",
  publish: ->(event) { broadcast(event) }
)

context.publish_event(event: "run.started", data: { robot_name: "assistant" })
context.publish_event(event: "text.delta", data: { delta: "Hello " })
context.publish_event(event: "text.delta", data: { delta: "world!" })
context.publish_event(event: "run.completed", data: {})

Nested Contexts for Networks

# Parent context for the network run
network_context = RobotLab::Streaming::Context.new(
  run_id: "network_run_1",
  message_id: "msg_1",
  scope: "network",
  publish: ->(event) { stream_to_client(event) }
)

# Child context for each robot (shares the sequence counter)
robot_context = network_context.create_child_context("robot_run_1")
robot_context.publish_event(event: "text.delta", data: { delta: "Response" })

See Also