Streaming::Context

Manages streaming event publishing with automatic sequencing, timestamping, and ID generation.

Class: RobotLab::Streaming::Context

context = RobotLab::Streaming::Context.new(
  run_id: "run_123",
  message_id: "msg_456",
  scope: "network",
  publish: ->(event) { broadcast(event) }
)

context.publish_event(event: "text.delta", data: { delta: "Hello" })

Constructor

Context.new(
  run_id:,
  message_id:,
  scope:,
  publish:,
  parent_run_id: nil,
  sequence_counter: nil
)

Parameters:

| Name | Type | Default | Description |
|------|------|---------|-------------|
| run_id | String | required | Unique identifier for this run |
| message_id | String | required | Current message identifier |
| scope | String, Symbol | required | Context scope (e.g., "network", "robot") |
| publish | Proc | required | Callback invoked with each event hash |
| parent_run_id | String, nil | nil | Parent run identifier for nested contexts |
| sequence_counter | SequenceCounter, nil | nil | Shared sequence counter (creates new one if nil) |

Attributes

run_id

context.run_id  # => String

The unique run identifier for this context.

parent_run_id

context.parent_run_id  # => String | nil

The parent run identifier. Set when this is a child context created by create_child_context.

message_id

context.message_id  # => String

The current message identifier.

scope

context.scope  # => String

The context scope (converted to string). Typically "network" or "robot".

Methods

publish_event

chunk = context.publish_event(event:, data: {})

Publish a streaming event. The event is wrapped in a chunk with automatic sequencing, timestamping, and context metadata injection.

Parameters:

| Name | Type | Description |
|------|------|-------------|
| event | String | Event type (e.g., "text.delta", "run.started") |
| data | Hash | Event payload (default: {}) |

Returns: The constructed event chunk hash.

The chunk structure:

{
  event: "text.delta",
  data: {
    delta: "Hello",          # from data parameter
    run_id: "run_123",       # injected from context
    message_id: "msg_456",   # injected from context
    scope: "network"         # injected from context
  },
  timestamp: 1707900000000,  # millisecond Unix timestamp
  sequence_number: 1,        # monotonically increasing
  id: "publish-1:text.delta" # unique event ID
}

If the publish callback raises an error, it is caught and logged as a warning via RobotLab.config.logger. The chunk is still returned.
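The behaviour described above can be approximated with a small stand-in class. This is a minimal sketch, not RobotLab's implementation: `SketchContext`, its per-instance counter, the `logger:` keyword, and the choice to let context metadata override same-named keys in `data` are all assumptions made for illustration.

```ruby
require "logger"

# Hypothetical stand-in, NOT RobotLab's code. It only mirrors the chunk shape
# and the rescue behaviour described in this section.
class SketchContext
  def initialize(run_id:, message_id:, scope:, publish:, logger: Logger.new($stderr))
    @run_id = run_id
    @message_id = message_id
    @scope = scope.to_s
    @publish = publish
    @logger = logger
    @sequence = 0
  end

  def publish_event(event:, data: {})
    @sequence += 1
    chunk = {
      event: event,
      # Assumption: context metadata wins over same-named keys in `data`.
      data: data.merge(run_id: @run_id, message_id: @message_id, scope: @scope),
      timestamp: (Time.now.to_f * 1000).to_i, # millisecond Unix timestamp
      sequence_number: @sequence,
      id: "publish-#{@sequence}:#{event}"
    }
    begin
      @publish.call(chunk)
    rescue StandardError => e
      # A failing callback is logged as a warning and never re-raised.
      @logger.warn("publish failed: #{e.message}")
    end
    chunk
  end
end

received = []
ctx = SketchContext.new(run_id: "run_123", message_id: "msg_456", scope: :network,
                        publish: ->(c) { received << c })
chunk = ctx.publish_event(event: "text.delta", data: { delta: "Hello" })
chunk[:id] # => "publish-1:text.delta"
```

Returning the chunk after the `rescue` is what lets callers keep working with the event even when delivery fails.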

create_child_context

child = context.create_child_context(robot_run_id)

Create a child context for a nested robot execution. The child shares the same publish callback and sequence counter, ensuring events are ordered globally across the parent and child.

Parameters:

| Name | Type | Description |
|------|------|-------------|
| robot_run_id | String | Run ID for the child context |

Returns: A new Context with:

- run_id set to robot_run_id
- parent_run_id set to the current context's run_id
- scope set to "robot"
- A new message_id (generated UUID)
- Shared sequence_counter and publish callback
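To see why sharing one counter gives a global order, consider the following sketch. `SketchCounter` and `SketchCtx` are hypothetical stand-ins (the real SequenceCounter API is not shown on this page), and the Mutex is an assumption about thread safety rather than a documented guarantee.

```ruby
require "securerandom"

# Hypothetical counter, not the library's SequenceCounter.
class SketchCounter
  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  # Returns the next sequence number.
  def next!
    @mutex.synchronize { @value += 1 }
  end
end

# Hypothetical context that keeps only the fields needed for this illustration.
SketchCtx = Struct.new(:run_id, :parent_run_id, :message_id, :scope, :counter, :publish,
                       keyword_init: true) do
  def publish_event(event:, data: {})
    chunk = { event: event, data: data.merge(run_id: run_id), sequence_number: counter.next! }
    publish.call(chunk)
    chunk
  end

  # The child reuses the same counter and callback objects as its parent.
  def create_child_context(robot_run_id)
    self.class.new(run_id: robot_run_id, parent_run_id: run_id,
                   message_id: SecureRandom.uuid, scope: "robot",
                   counter: counter, publish: publish)
  end
end

log = []
parent = SketchCtx.new(run_id: "net_run_1", message_id: "net_msg_1", scope: "network",
                       counter: SketchCounter.new, publish: ->(c) { log << c })
parent.publish_event(event: "run.started")
child = parent.create_child_context("robot1_run_1")
child.publish_event(event: "step.started")
parent.publish_event(event: "run.completed")

pairs = log.map { |c| [c[:sequence_number], c[:data][:run_id]] }
# => [[1, "net_run_1"], [2, "robot1_run_1"], [3, "net_run_1"]]
```

Because both contexts increment the same object, a consumer can sort by sequence_number alone without knowing which context produced an event.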

create_context_with_shared_sequence

sibling = context.create_context_with_shared_sequence(
  run_id: "run_789",
  message_id: "msg_789",
  scope: "robot"
)

Create a new context that shares the same sequence counter as this context, but with different identifiers.

Parameters:

| Name | Type | Description |
|------|------|-------------|
| run_id | String | Run ID for the new context |
| message_id | String | Message ID for the new context |
| scope | String | Scope for the new context |

Returns: A new Context sharing the sequence counter and publish callback.

generate_part_id

context.generate_part_id  # => "part_run_1234_900123_a1b2c3d4"

Generate an OpenAI-compatible part ID (max 40 characters). Combines a truncated message ID, a timestamp suffix, and random hex.
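One way such an ID could be assembled is sketched below. The helper name `sketch_part_id`, the 8-character truncation, the 6-digit timestamp suffix, and the 8 hex characters are all assumptions chosen to stay under the 40-character limit; the library's actual layout may differ.

```ruby
require "securerandom"

# Hypothetical helper, not RobotLab's implementation.
def sketch_part_id(message_id, now: Time.now)
  short_msg = message_id.to_s.delete("-")[0, 8]                # truncated message ID
  millis = (now.to_f * 1000).to_i
  time_suffix = (millis % 1_000_000).to_s.rjust(6, "0")        # last 6 digits of the ms timestamp
  random_hex = SecureRandom.hex(4)                             # 8 random hex characters
  "part_#{short_msg}_#{time_suffix}_#{random_hex}"[0, 40]      # hard cap at 40 characters
end

part_id = sketch_part_id("msg_456")
part_id.length <= 40 # => true
```

The random suffix keeps IDs distinct when several parts are generated within the same millisecond.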

generate_step_id

context.generate_step_id("text_output")  # => "publish-3:text_output"

Generate a step ID for durable execution compatibility. Uses the current sequence number.

Parameters:

| Name | Type | Description |
|------|------|-------------|
| base_name | String | Base name for the step |

generate_message_id

context.generate_message_id  # => "a1b2c3d4-..."

Generate a new UUID for use as a message identifier.

Examples

Basic Event Publishing

publish = ->(event) {
  puts "[#{event[:event]}] #{event[:data]}"
}

context = RobotLab::Streaming::Context.new(
  run_id: SecureRandom.uuid,
  message_id: SecureRandom.uuid,
  scope: "robot",
  publish: publish
)

context.publish_event(event: "run.started", data: { robot_name: "assistant" })
context.publish_event(event: "text.delta", data: { delta: "Hello " })
context.publish_event(event: "text.delta", data: { delta: "world!" })
context.publish_event(event: "run.completed", data: {})
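A publish callback will often do more than print. The sketch below accumulates text.delta payloads into one string per run. `DeltaCollector` is a hypothetical helper, and the hand-built hashes stand in for chunks a real Context would produce, following the chunk structure documented for publish_event.

```ruby
# Hypothetical consumer: collects "text.delta" payloads per run_id.
class DeltaCollector
  def initialize
    @buffers = Hash.new { |hash, run_id| hash[run_id] = +"" }
  end

  # Wrap as `publish: ->(chunk) { collector.call(chunk) }` when building a context.
  def call(chunk)
    return unless chunk[:event] == "text.delta"

    @buffers[chunk[:data][:run_id]] << chunk[:data][:delta].to_s
  end

  # Returns the text gathered so far, or "" for an unknown run.
  def text_for(run_id)
    @buffers.fetch(run_id, "")
  end
end

collector = DeltaCollector.new

# Hand-built chunks in the documented shape (assumed, trimmed to the relevant keys).
[
  { event: "run.started",   data: { run_id: "run_1" },                   sequence_number: 1 },
  { event: "text.delta",    data: { delta: "Hello ", run_id: "run_1" },  sequence_number: 2 },
  { event: "text.delta",    data: { delta: "world!", run_id: "run_1" },  sequence_number: 3 },
  { event: "run.completed", data: { run_id: "run_1" },                   sequence_number: 4 }
].each { |chunk| collector.call(chunk) }

collector.text_for("run_1") # => "Hello world!"
```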

Network with Child Contexts

# Network-level context
network_ctx = RobotLab::Streaming::Context.new(
  run_id: "net_run_1",
  message_id: "net_msg_1",
  scope: "network",
  publish: ->(e) { stream_to_client(e) }
)

network_ctx.publish_event(event: "run.started", data: {})

# Robot 1 executes
robot1_ctx = network_ctx.create_child_context("robot1_run_1")
robot1_ctx.publish_event(event: "step.started", data: { robot: "classifier" })
robot1_ctx.publish_event(event: "text.delta", data: { delta: "Category: billing" })
robot1_ctx.publish_event(event: "step.completed", data: { robot: "classifier" })

# Robot 2 executes (sequence numbers continue from robot 1)
robot2_ctx = network_ctx.create_child_context("robot2_run_1")
robot2_ctx.publish_event(event: "step.started", data: { robot: "responder" })
robot2_ctx.publish_event(event: "text.delta", data: { delta: "I can help with that." })
robot2_ctx.publish_event(event: "step.completed", data: { robot: "responder" })

network_ctx.publish_event(event: "run.completed", data: {})

Error-Safe Publishing

# Errors in the publish callback are caught and logged,
# so streaming failures do not interrupt execution.
context = RobotLab::Streaming::Context.new(
  run_id: "run_1",
  message_id: "msg_1",
  scope: "robot",
  publish: ->(e) { raise "connection lost" }
)

# This does not raise -- the error is logged via RobotLab.config.logger
chunk = context.publish_event(event: "text.delta", data: { delta: "test" })
# chunk is still returned with the event data

See Also