Streaming::Context¶
Manages streaming event publishing with automatic sequencing, timestamping, and ID generation.
Class: RobotLab::Streaming::Context¶
context = RobotLab::Streaming::Context.new(
run_id: "run_123",
message_id: "msg_456",
scope: "network",
publish: ->(event) { broadcast(event) }
)
context.publish_event(event: "text.delta", data: { delta: "Hello" })
Constructor¶
Parameters:
| Name | Type | Default | Description |
|---|---|---|---|
run_id |
String |
required | Unique identifier for this run |
message_id |
String |
required | Current message identifier |
scope |
String, Symbol |
required | Context scope (e.g., "network", "robot") |
publish |
Proc |
required | Callback invoked with each event hash |
parent_run_id |
String, nil |
nil |
Parent run identifier for nested contexts |
sequence_counter |
SequenceCounter, nil |
nil |
Shared sequence counter (creates new one if nil) |
Attributes¶
run_id¶
The unique run identifier for this context.
parent_run_id¶
The parent run identifier. Set when this is a child context created by create_child_context.
message_id¶
The current message identifier.
scope¶
The context scope (converted to string). Typically "network" or "robot".
Methods¶
publish_event¶
Publish a streaming event. The event is wrapped in a chunk with automatic sequencing, timestamping, and context metadata injection.
Parameters:
| Name | Type | Description |
|---|---|---|
event |
String |
Event type (e.g., "text.delta", "run.started") |
data |
Hash |
Event payload (default: {}) |
Returns: The constructed event chunk hash.
The chunk structure:
{
event: "text.delta",
data: {
delta: "Hello", # from data parameter
run_id: "run_123", # injected from context
message_id: "msg_456", # injected from context
scope: "robot" # injected from context
},
timestamp: 1707900000000, # millisecond Unix timestamp
sequence_number: 1, # monotonically increasing
id: "publish-1:text.delta" # unique event ID
}
If the publish callback raises an error, it is caught and logged as a warning via RobotLab.config.logger. The chunk is still returned.
create_child_context¶
Create a child context for a nested robot execution. The child shares the same publish callback and sequence counter, ensuring events are ordered globally across the parent and child.
Parameters:
| Name | Type | Description |
|---|---|---|
robot_run_id |
String |
Run ID for the child context |
Returns: A new Context with:
- run_id set to robot_run_id
- parent_run_id set to the current context's run_id
- scope set to "robot"
- A new message_id (generated UUID)
- Shared sequence_counter and publish callback
create_context_with_shared_sequence¶
sibling = context.create_context_with_shared_sequence(
run_id: "run_789",
message_id: "msg_789",
scope: "robot"
)
Create a new context that shares the same sequence counter as this context, but with different identifiers.
Parameters:
| Name | Type | Description |
|---|---|---|
run_id |
String |
Run ID for the new context |
message_id |
String |
Message ID for the new context |
scope |
String |
Scope for the new context |
Returns: A new Context sharing the sequence counter and publish callback.
generate_part_id¶
Generate an OpenAI-compatible part ID (max 40 characters). Combines a truncated message ID, a timestamp suffix, and random hex.
generate_step_id¶
Generate a step ID for durable execution compatibility. Uses the current sequence number.
Parameters:
| Name | Type | Description |
|---|---|---|
base_name |
String |
Base name for the step |
generate_message_id¶
Generate a new UUID for use as a message identifier.
Examples¶
Basic Event Publishing¶
publish = ->(event) {
puts "[#{event[:event]}] #{event[:data]}"
}
context = RobotLab::Streaming::Context.new(
run_id: SecureRandom.uuid,
message_id: SecureRandom.uuid,
scope: "robot",
publish: publish
)
context.publish_event(event: "run.started", data: { robot_name: "assistant" })
context.publish_event(event: "text.delta", data: { delta: "Hello " })
context.publish_event(event: "text.delta", data: { delta: "world!" })
context.publish_event(event: "run.completed", data: {})
Network with Child Contexts¶
# Network-level context
network_ctx = RobotLab::Streaming::Context.new(
run_id: "net_run_1",
message_id: "net_msg_1",
scope: "network",
publish: ->(e) { stream_to_client(e) }
)
network_ctx.publish_event(event: "run.started", data: {})
# Robot 1 executes
robot1_ctx = network_ctx.create_child_context("robot1_run_1")
robot1_ctx.publish_event(event: "step.started", data: { robot: "classifier" })
robot1_ctx.publish_event(event: "text.delta", data: { delta: "Category: billing" })
robot1_ctx.publish_event(event: "step.completed", data: { robot: "classifier" })
# Robot 2 executes (sequence numbers continue from robot 1)
robot2_ctx = network_ctx.create_child_context("robot2_run_1")
robot2_ctx.publish_event(event: "step.started", data: { robot: "responder" })
robot2_ctx.publish_event(event: "text.delta", data: { delta: "I can help with that." })
robot2_ctx.publish_event(event: "step.completed", data: { robot: "responder" })
network_ctx.publish_event(event: "run.completed", data: {})
Error-Safe Publishing¶
# Errors in the publish callback are caught and logged,
# so streaming failures do not interrupt execution.
context = RobotLab::Streaming::Context.new(
run_id: "run_1",
message_id: "msg_1",
scope: "robot",
publish: ->(e) { raise "connection lost" }
)
# This does not raise -- the error is logged via RobotLab.config.logger
chunk = context.publish_event(event: "text.delta", data: { delta: "test" })
# chunk is still returned with the event data