
I’ve been working on a new Ruby gem and I’m happy to share the first release: TypedBus.
TypedBus is a lightweight, fiber-based pub/sub message bus for Ruby. It gives you named channels with optional type enforcement, explicit ACK/NACK delivery semantics, dead letter queues, backpressure, and adaptive throttling – all running inside a single Async reactor. No threads. No mutexes. Just fibers because we don’t get enough fiber in our diets.
| Source: github.com/MadBomber/typed_bus | Docs: madbomber.github.io/typed_bus |
Why TypedBus?
Ruby has plenty of background job libraries, but in-process pub/sub with delivery guarantees is a different niche. I wanted something that could wire together concurrent pipeline stages within a single process while giving me:
- Confidence that messages were handled – subscribers must explicitly
ack!ornack!each delivery, with auto-nack on timeout - Visibility into failures – every nacked or timed-out delivery lands in a dead letter queue
- Flow control – bounded backpressure and adaptive throttling so a fast producer can’t overwhelm slow consumers
- Type safety at the channel level – catch misrouted messages immediately rather than downstream
Quick Look
require "typed_bus"
bus = TypedBus::MessageBus.new
bus.add_channel(:events, timeout: 5)
bus.subscribe(:events) do |delivery|
puts delivery.message
delivery.ack!
end
Async do
bus.publish(:events, "hello world")
end
That’s the basics. Each subscriber receives a Delivery envelope wrapping the message. Call ack! when you’re done, or nack! to reject it. If you do neither, the delivery auto-nacks after the timeout and routes to the dead letter queue.
Typed Channels
Lock a channel to a specific class and get an ArgumentError the moment something wrong is published:
Order = Data.define(:id, :total)
bus.add_channel(:orders, type: Order, timeout: 10)
Async do
bus.publish(:orders, Order.new(id: 1, total: 29.99)) # OK
bus.publish(:orders, "not an order") # ArgumentError
end
Dead Letter Queues
Failed deliveries don’t vanish. Every channel has a DLQ you can inspect, drain, and hook into:
bus.dead_letters(:orders).on_dead_letter do |delivery|
reason = delivery.timed_out? ? "timeout" : "nack"
log_failure(reason, delivery.message)
end
# Drain and retry later
bus.dead_letters(:orders).drain do |delivery|
retry_order(delivery.message)
end
Backpressure and Adaptive Throttling
Set max_pending on a channel and publishing blocks the fiber when the limit is reached. Add throttle to progressively slow publishers before they hit the hard limit:
bus.add_channel(:pipeline,
max_pending: 100,
throttle: 0.5 # begin backoff at 50% remaining capacity
)
The throttle applies an asymptotic delay curve – the first half of capacity fills at full speed, then each publish sleeps progressively longer as remaining capacity approaches zero. No spikes, no thundering herd, just a smooth ramp.
Multi-Stage Pipelines
Where it gets interesting is wiring stages together. Here’s a validation-then-processing pipeline with typed messages at each stage:
RawEvent = Data.define(:source, :payload, :timestamp)
ValidatedEvent = Data.define(:source, :payload, :validated_at)
bus = TypedBus::MessageBus.new
bus.add_channel(:raw_events, type: RawEvent, timeout: 1, max_pending: 3)
bus.add_channel(:validated, type: ValidatedEvent, timeout: 0.3)
# Stage 1: Validate incoming events
bus.subscribe(:raw_events) do |delivery|
event = delivery.message
if event.payload.nil? || event.payload.empty?
delivery.nack!
else
delivery.ack!
bus.publish(:validated, ValidatedEvent.new(
source: event.source,
payload: event.payload,
validated_at: Time.now
))
end
end
# Stage 2: Process validated events
bus.subscribe(:validated) do |delivery|
process(delivery.message)
delivery.ack!
end
Each stage has its own type constraints, timeout, and DLQ. A bad event nacked at validation doesn’t pollute the processing stage. A slow processor that exceeds its timeout gets collected for retry.
Configuration Cascade
Settings resolve through three tiers – Global, Bus, Channel – so you can set sensible defaults and override where needed:
TypedBus.configure do |config|
config.timeout = 60
config.max_pending = 500
end
bus = TypedBus::MessageBus.new(timeout: 30)
bus.add_channel(:orders) # inherits timeout=30, max_pending=500
bus.add_channel(:alerts, timeout: 5) # overrides timeout, inherits max_pending
Stats
Built-in per-channel counters track published, delivered, nacked, timed-out, dead-lettered, and throttled messages:
bus.stats[:orders_published]
bus.stats[:orders_delivered]
bus.stats[:orders_dead_lettered]
bus.stats.to_h # snapshot of everything
Multiple Buses
There’s no singleton. Each MessageBus is an independent instance with its own channels, subscribers, and stats. Create as many as your application needs:
# Separate concerns into isolated buses
orders_bus = TypedBus::MessageBus.new(timeout: 30)
audit_bus = TypedBus::MessageBus.new(timeout: 60, max_pending: 1000)
realtime_bus = TypedBus::MessageBus.new(timeout: 2)
orders_bus.add_channel(:created, type: Order)
orders_bus.add_channel(:fulfilled, type: Order)
audit_bus.add_channel(:events, type: AuditEntry)
realtime_bus.add_channel(:notifications, type: String, max_pending: 50)
Each bus tracks its own stats independently, so orders_bus.stats and audit_bus.stats never collide. This makes it straightforward to partition your messaging by domain, by latency requirements, or by team ownership.
Get It
gem install typed_bus
Or add to your Gemfile:
gem "typed_bus"
Requires Ruby 3.2+ and the async gem.
Source and docs: github.com/MadBomber/typed_bus
The repo includes 16 runnable examples covering everything from basic usage to concurrent pipelines and performance monitoring. Each one is standalone – just bundle exec ruby examples/01_basic_usage.rb and go.
I’d love to hear feedback. Open an issue or drop me a note.