Ractor Patterns¶
Common patterns for using RactorQueue across Ractor boundaries.
1. Single Producer / Single Consumer (1P1C)¶
The baseline pattern — one Ractor feeds another through a shared queue. Use a sentinel value (:done, :stop, etc.) to signal end-of-stream.
q = RactorQueue.new(capacity: 1024)
producer = Ractor.new(q) do |queue|
100.times { |i| queue.push(i * i) }
queue.push(:done)
end
consumer = Ractor.new(q) do |queue|
results = []
loop do
v = queue.pop
break if v == :done
results << v
end
results
end
producer.value
puts consumer.value.inspect # [0, 1, 4, 9, 16, ...]
Queue sizing: With a single producer and consumer, a modest queue (1024–4096) is sufficient. The producer can run ahead while the consumer processes, smoothing out bursts.
2. Worker Pool (MPMC)¶
A shared job queue drained by N Ractor workers. One sentinel per worker signals shutdown:
WORKERS = 8
jobs = RactorQueue.new(capacity: 10_000)
results = RactorQueue.new(capacity: 10_000)
workers = WORKERS.times.map do
Ractor.new(jobs, results) do |jq, rq|
loop do
job = jq.pop(timeout: 30)
break if job == :stop
rq.push(job * job) # do work
end
end
end
1000.times { |i| jobs.push(i) }
WORKERS.times { jobs.push(:stop) } # one per worker
results_list = 1000.times.map { results.pop }
workers.each(&:value)
Two-Queue Deadlock
Chaining two small bounded queues can deadlock (see Concurrency Notes). Size the queues large enough that producers never block while consumers are blocked on the other queue, or drain the result queue asynchronously in a separate Ractor.
Queue sizing for the worker pool pattern:
jobscapacity: at leastjob_count + worker_count(so main can push all jobs without blocking)resultscapacity: at leastjob_count(so workers can push all results without blocking)
3. Queue Pool (High Ractor Counts)¶
When many Ractors share a single bounded queue, the spin-wait backoff keeps things moving, but beyond roughly 2 × CPU cores Ractors doing pure queue operations, cache-line contention causes diminishing returns.
The queue pool pattern gives each producer/consumer pair its own queue — zero cross-pair contention, linear scaling to core count:
PAIRS = 16 # 32 Ractors total
pairs = PAIRS.times.map do
q = RactorQueue.new(capacity: 1024)
p = Ractor.new(q) { |queue| 1000.times { |i| queue.push(i) } }
c = Ractor.new(q) { |queue| 1000.times { queue.pop } }
[p, c]
end
pairs.each { |p, c| p.value; c.value }
Trade-off: Work is statically partitioned — each producer feeds only its paired consumer. For dynamic load balancing across many workers, use a small number of shared queues (e.g., 4 queues for 16 Ractors) with jobs chunked large enough that producers rarely hit the capacity limit.
4. Two-Stage Pipeline¶
Ractors chained in stages, each transforming values and passing them downstream:
raw = RactorQueue.new(capacity: 64)
middle = RactorQueue.new(capacity: 64)
stage1 = Ractor.new(raw, middle) do |src, dst|
loop do
v = src.pop(timeout: 5)
break if v == :done
dst.push(v * 2)
end
dst.push(:done)
end
stage2 = Ractor.new(middle) do |src|
output = []
loop do
v = src.pop(timeout: 5)
break if v == :done
output << v * 3
end
output
end
5.times { |i| raw.push(i + 1) } # push 1..5
raw.push(:done)
stage1.value
puts stage2.value.inspect # [6, 12, 18, 24, 30]
Pipeline Queue Sizing
In a pipeline, each stage's output queue should be large enough to buffer a burst from the upstream stage. If stages process at different speeds, size the intermediate queues to the slower stage's burst capacity.
5. Validate-Shareable Guard¶
Use validate_shareable: true on queues that feed Ractors to catch bad producers at the push site rather than at the Ractor boundary:
safe_q = RactorQueue.new(capacity: 64, validate_shareable: true)
safe_q.push(42) # ok
safe_q.push("hello".freeze) # ok
safe_q.push(:symbol) # ok
safe_q.push([1, 2, 3]) # raises RactorQueue::NotShareableError immediately
The guard also fires inside a Ractor block:
bad_producer = Ractor.new(safe_q) do |queue|
queue.push([4, 5, 6])
rescue RactorQueue::NotShareableError => err
"caught: #{err.message}"
end
puts bad_producer.value # "caught: [4, 5, 6] is not Ractor-shareable"
6. SIMD Fan-Out (Parallel.map)¶
Apply the same CPU-bound function to N items across W Ractor workers. This is the SIMD (Single Instruction Multiple Data) parallel pattern — each worker runs identical code, only the data differs.
Ractors outperform Threads for this workload because each Ractor has its own GVL: W Ractors run W jobs in true parallel, whereas W threads share one GVL and take turns.
The key trick for passing a lambda into Ractors is a one-shot fn_q queue: Ractor.make_shareable raises Ractor::IsolationError on lambdas whose self is not shareable, but RactorQueue carries the lambda across without freezing it.
require "ractor_queue"
module Compute
def self.square(x) = x * x
end
def parallel_map(items, workers: Etc.nprocessors, fn:)
cap = 4096
jobs = RactorQueue.new(capacity: cap)
res = RactorQueue.new(capacity: cap)
fn_q = RactorQueue.new(capacity: workers + 1)
workers.times { fn_q.push(fn) }
out = Array.new(items.size)
drain = Thread.new do
items.size.times { v = res.pop; out[v[0]] = v[1] }
end
ractors = workers.times.map do
Ractor.new(fn_q, jobs, res) do |fq, jq, rq|
f = fq.pop
loop do
pair = jq.pop
break if pair.equal?(:stop)
idx, item = pair
rq.push([idx, f.call(item)].freeze)
end
end
end
items.each_with_index { |item, i| jobs.push([i, item].freeze) }
workers.times { jobs.push(:stop) }
ractors.each(&:value)
drain.join
out
end
results = parallel_map((1..100).to_a, fn: ->(x) { Compute.square(x) })
puts results.first(5).inspect # [1, 4, 9, 16, 25]
Queue sizing: Fixed at 4096 regardless of corpus size. The drain thread runs concurrently so the results queue never fills and stalls the workers.
7. MIMD Multi-Stage Pipeline (Parallel.pipeline)¶
Chain K stages of W Ractor workers each, connected by RactorQueues. Items flow through every stage in order; each stage transforms its input and pushes the result downstream. This is the MIMD (Multiple Instruction Multiple Data) pattern — each stage runs different code.
Shutdown uses stop-pill propagation: W stop pills enter stage 0; each worker that sees one forwards exactly one stop pill to the next stage and exits. After K stages, exactly W stop pills arrive at the results queue, where the drain thread uses them as its termination signal.
require "ractor_queue"
def pipeline(items, stages:, workers: (Etc.nprocessors / 2).clamp(4, 8))
cap = 4096
queues = Array.new(stages.size + 1) { RactorQueue.new(capacity: cap) }
fn_queues = stages.map do |s|
fq = RactorQueue.new(capacity: workers + 1)
workers.times { fq.push(s) }
fq
end
all_ractors = []
stages.each_with_index do |_, si|
iq, oq, fq = queues[si], queues[si + 1], fn_queues[si]
workers.times do
all_ractors << Ractor.new(fq, iq, oq) do |fn_q, input, output|
f = fn_q.pop
loop do
item = input.pop
if item.equal?(:stop)
output.push(:stop)
break
end
output.push(Ractor.make_shareable(f.call(item)))
end
end
end
end
out = []
drain = Thread.new do
stop_count = 0
loop do
v = queues[-1].pop
v.equal?(:stop) ? (stop_count += 1; break if stop_count == workers) : out << v
end
end
items.each { |item| queues[0].push(item) }
workers.times { queues[0].push(:stop) }
drain.join
all_ractors.each(&:value)
out
end
double = ->(x) { (x * 2).freeze }
to_s = ->(x) { x.to_s.freeze }
results = pipeline((1..10).to_a, stages: [double, to_s])
puts results.sort.inspect # ["10", "12", "14", "16", "18", "2", "4", "6", "8", "20"] (sorted)
Result order is not guaranteed
Workers in each stage consume items concurrently, so output order depends on scheduling. Sort results after collection if order matters.
Workers per stage: (Etc.nprocessors / 2).clamp(4, 8) is a good default for a 2-stage pipeline on M-series chips. More workers per stage increases queue contention; fewer under-utilises cores.
Choosing a Pattern¶
| Situation | Pattern |
|---|---|
| Simple producer → consumer handoff | 1P1C |
| Dynamic job dispatch to N workers | Worker Pool |
| Ractor count > 2× CPU cores | Queue Pool |
| Multi-stage data transformation (1 worker/stage) | Pipeline |
| Need to enforce Ractor-safe payloads | validate_shareable |
| Same function applied to N items in parallel | SIMD Fan-Out |
| Multi-stage pipeline with W workers per stage | MIMD Pipeline |