Dead Letter Queue

SmartMessage includes a comprehensive file-based Dead Letter Queue (DLQ) system for capturing, storing, and replaying failed messages. The DLQ provides production-grade reliability with automatic integration into the circuit breaker system.

Overview

The Dead Letter Queue serves as a safety net for your messaging system:

  • Automatic Capture: Failed messages are automatically stored when circuit breakers trip
  • Manual Capture: Explicitly store messages that fail business logic validation
  • Replay Capabilities: Retry failed messages individually, in batches, or all at once
  • Transport Override: Replay messages through a different transport than originally configured
  • Administrative Tools: Filter, analyze, and export messages for debugging
  • Thread-Safe: All operations are protected by a mutex for safe concurrent access

File Format

The DLQ uses JSON Lines (.jsonl) format - one JSON object per line:

{"timestamp":"2025-08-19T10:30:45Z","header":{...},"payload":"...","error":"Connection timeout","retry_count":0,"transport":"Redis","stack_trace":"..."}
{"timestamp":"2025-08-19T10:31:12Z","header":{...},"payload":"...","error":"Circuit breaker open","retry_count":1,"transport":"Redis","stack_trace":"..."}

Benefits of JSON Lines:

  • Append-only for efficient writes
  • Line-by-line processing for memory efficiency
  • Human-readable for debugging
  • Easy to process with standard Unix tools
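
Because each line is a self-contained JSON document, the queue file can be analyzed directly. A minimal sketch, assuming the default dead_letters.jsonl path:

require 'json'

# Tally failures by error message, one line at a time,
# so the whole file never has to fit in memory
counts = Hash.new(0)
File.foreach('dead_letters.jsonl') do |line|
  entry = JSON.parse(line, symbolize_names: true)
  counts[entry[:error]] += 1
end
pp counts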

Configuration

Global Default Configuration

Configure a default DLQ that all components will use:

# Set default path for all DLQ operations
SmartMessage::DeadLetterQueue.configure_default('/var/log/app/dlq.jsonl')

# Access the default instance anywhere
dlq = SmartMessage::DeadLetterQueue.default

Environment-Based Configuration

Use environment variables for different deployments:

# In your application initialization
SmartMessage::DeadLetterQueue.configure_default(
  ENV.fetch('SMART_MESSAGE_DLQ_PATH', 'dead_letters.jsonl')
)

Per-Environment Configuration

Configure different paths for each environment:

# config/initializers/smart_message.rb (Rails example)
case Rails.env
when 'production'
  SmartMessage::DeadLetterQueue.configure_default('/var/log/smart_message/production_dlq.jsonl')
when 'staging'
  SmartMessage::DeadLetterQueue.configure_default('/var/log/smart_message/staging_dlq.jsonl')
else
  SmartMessage::DeadLetterQueue.configure_default('tmp/development_dlq.jsonl')
end

Custom Instances

Create separate DLQ instances for different purposes:

# Critical failures need special handling
critical_dlq = SmartMessage::DeadLetterQueue.new('/var/log/critical_failures.jsonl')

# Separate DLQ for payment messages
payment_dlq = SmartMessage::DeadLetterQueue.new('/var/log/payment_failures.jsonl')

# Temporary DLQ for testing
test_dlq = SmartMessage::DeadLetterQueue.new('/tmp/test_failures.jsonl')

Core Operations

FIFO Queue Operations

The DLQ operates as a First-In-First-Out queue:

dlq = SmartMessage::DeadLetterQueue.default

# Add a failed message (accepts decoded message instance)
entry = dlq.enqueue(
  decoded_message,     # SmartMessage::Base instance
  error: "Connection timeout",
  retry_count: 0,
  transport: "Redis",
  stack_trace: exception.backtrace.join("\n")
)

# Check queue size
puts "Messages in queue: #{dlq.size}"

# Peek at the oldest message without removing it
next_message = dlq.peek
puts "Next for replay: #{next_message[:header][:message_class]}"

# Remove and get the oldest message
message = dlq.dequeue
process_message(message) if message

# Clear all messages
dlq.clear

Message Structure

Each DLQ entry contains:

{
  timestamp: "2025-08-19T10:30:45Z",        # When the failure occurred
  header: {                                 # Complete message header
    uuid: "abc-123",
    message_class: "OrderMessage",
    published_at: "2025-08-19T10:30:40Z",
    publisher_pid: 12345,
    version: 1,
    from: "order-service",
    to: "payment-service",
    reply_to: "order-service"
  },
  payload: '{"order_id":"123","amount":99.99}',  # Original message payload
  payload_format: "json",                   # Serialization format
  error: "Connection refused",              # Error message
  retry_count: 2,                           # Number of retry attempts
  transport: "Redis",                       # Transport that failed
  stack_trace: "..."                        # Full stack trace (optional)
}
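
For example, pulling the essentials out of the oldest entry (a sketch using the fields above):

entry = dlq.peek
puts "#{entry[:header][:message_class]} via #{entry[:transport]}: #{entry[:error]}"
order = JSON.parse(entry[:payload])   # payload_format is "json" here
puts order['order_id']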

Replay Capabilities

Individual Message Replay

Replay the oldest message:

result = dlq.replay_one
if result[:success]
  puts "Message replayed successfully"
else
  puts "Replay failed: #{result[:error]}"
end

Batch Replay

Replay multiple messages:

# Replay next 10 messages
results = dlq.replay_batch(10)
puts "Successful: #{results[:success]}"
puts "Failed: #{results[:failed]}"
results[:errors].each do |error|
  puts "Error: #{error}"
end

Full Queue Replay

Replay all messages:

results = dlq.replay_all
puts "Replayed #{results[:success]} messages"
puts "Failed to replay #{results[:failed]} messages"

Transport Override

Replay through a different transport:

# Original message used Redis, replay through RabbitMQ
rabbit_transport = SmartMessage::Transport.create(:rabbitmq)

# Replay one with override
dlq.replay_one(rabbit_transport)

# Replay batch with override
dlq.replay_batch(10, rabbit_transport)

# Replay all with override
dlq.replay_all(rabbit_transport)

Administrative Functions

Message Filtering

Filter messages for analysis:

# Find all failed OrderMessage instances
order_failures = dlq.filter_by_class('OrderMessage')
puts "Found #{order_failures.size} failed orders"

# Find all timeout errors
timeout_errors = dlq.filter_by_error_pattern(/timeout/i)
timeout_errors.each do |entry|
  puts "Timeout at #{entry[:timestamp]}: #{entry[:error]}"
end

# Find connection errors
connection_errors = dlq.filter_by_error_pattern('Connection refused')

Statistics

Get queue statistics:

stats = dlq.statistics
puts "Total messages: #{stats[:total]}"

# Breakdown by message class
stats[:by_class].each do |klass, count|
  puts "#{klass}: #{count} failures"
end

# Breakdown by error type
stats[:by_error].sort_by { |_, count| -count }.first(5).each do |error, count|
  puts "#{error}: #{count} occurrences"
end

Time-Based Export

Export messages within a time range:

# Get failures from the last hour
one_hour_ago = Time.now - 3600
recent_failures = dlq.export_range(one_hour_ago, Time.now)

# Get failures from yesterday
yesterday_start = Time.now - 86400
yesterday_end = Time.now - 1
yesterday_failures = dlq.export_range(yesterday_start, yesterday_end)

# Export for analysis
File.write('failures_export.json', recent_failures.to_json)

Message Inspection

Inspect messages without removing them:

# Look at next 10 messages
messages = dlq.inspect_messages(limit: 10)
messages.each do |msg|
  puts "#{msg[:timestamp]} - #{msg[:header][:message_class]}: #{msg[:error]}"
end

# Default limit is 10
dlq.inspect_messages.each do |msg|
  analyze_failure(msg)
end

Integration with Circuit Breakers

The DLQ is automatically integrated with SmartMessage's circuit breaker system:

Automatic Capture

When circuit breakers trip, messages are automatically sent to the DLQ:

class PaymentMessage < SmartMessage::Base
  config do
    transport SmartMessage::Transport.create(:redis)
    # Circuit breaker configured automatically
  end
end

# If Redis is down, circuit breaker trips and message goes to DLQ
message = PaymentMessage.new(amount: 100.00)
begin
  message.publish
rescue => e
  # Message is already in DLQ via circuit breaker
  puts "Message saved to DLQ"
end

Manual Circuit Breaker Integration

Configure custom circuit breakers with DLQ fallback:

class CriticalService
  include BreakerMachines::DSL

  circuit :external_api do
    threshold failures: 3, within: 60.seconds
    reset_after 30.seconds

    # Use custom DLQ for critical failures
    custom_dlq = SmartMessage::DeadLetterQueue.new('/var/log/critical.jsonl')
    fallback SmartMessage::CircuitBreaker::Fallbacks.dead_letter_queue(custom_dlq)
  end

  def call_api(message)
    circuit(:external_api).wrap do
      # API call that might fail
      external_api.send(message)
    end
  end
end

Monitoring and Alerting

Queue Size Monitoring

Monitor DLQ growth:

# Simple monitoring script
loop do
  dlq = SmartMessage::DeadLetterQueue.default
  size = dlq.size

  if size > 100
    send_alert("DLQ size critical: #{size} messages")
  elsif size > 50
    send_warning("DLQ size warning: #{size} messages")
  end

  sleep 60  # Check every minute
end

Error Pattern Detection

Detect systematic failures:

dlq = SmartMessage::DeadLetterQueue.default
stats = dlq.statistics

# Check for dominant error patterns
top_error = stats[:by_error].max_by { |_, count| count }
if top_error && top_error[1] > 10
  alert("Systematic failure detected: #{top_error[0]} (#{top_error[1]} occurrences)")
end

# Check for specific service failures
stats[:by_class].each do |klass, count|
  if count > 5
    alert("Service degradation: #{klass} has #{count} failures")
  end
end

Best Practices

1. Regular Monitoring

Set up monitoring for DLQ size and growth rate:

# Prometheus metrics example (prometheus-client gem)
require 'prometheus/client'

dlq_size = Prometheus::Client.registry.gauge(
  :dlq_size, docstring: 'Dead letter queue size'
)
dlq_size.set(SmartMessage::DeadLetterQueue.default.size)

2. Automated Replay

Schedule periodic replay attempts:

# Sidekiq job example
class ReplayDLQJob
  include Sidekiq::Worker

  def perform
    dlq = SmartMessage::DeadLetterQueue.default

    # Only replay if queue is manageable
    if dlq.size < 100
      results = dlq.replay_all
      log_results(results)
    else
      # Replay in smaller batches
      results = dlq.replay_batch(10)
      log_results(results)
    end
  end

  private

  def log_results(results)
    Rails.logger.info("DLQ Replay: #{results[:success]} success, #{results[:failed]} failed")
  end
end

3. Archival Strategy

Archive old messages:

# Archive messages older than 7 days
def archive_old_messages
  dlq = SmartMessage::DeadLetterQueue.default
  archive_path = "/var/archive/dlq_#{Date.today}.jsonl"

  seven_days_ago = Time.now - (7 * 86400)
  old_messages = dlq.export_range(Time.at(0), seven_days_ago)

  if old_messages.any?
    File.write(archive_path, old_messages.map(&:to_json).join("\n"))
    # Remove archived messages from the active DLQ
    # (Note: this would require a remove_range-style method; see the sketch below)
  end
end
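
One possible shape for that missing helper, rewriting the file to keep only entries newer than a cutoff. This is hypothetical, not part of SmartMessage, and it bypasses the DLQ's own locking:

require 'json'
require 'time'

# Hypothetical remove_range: keep only entries newer than cutoff_time.
# Run this only while no other process is writing to the DLQ file.
def remove_range(dlq, cutoff_time)
  kept = File.foreach(dlq.file_path).select do |line|
    entry = JSON.parse(line, symbolize_names: true)
    Time.parse(entry[:timestamp]) > cutoff_time
  end
  File.write(dlq.file_path, kept.join)
end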

4. Error Classification

Classify errors for better handling:

class DLQAnalyzer
  TRANSIENT_ERRORS = [
    /connection refused/i,
    /timeout/i,
    /temporarily unavailable/i
  ]

  PERMANENT_ERRORS = [
    /invalid message format/i,
    /unauthorized/i,
    /not found/i
  ]

  def self.classify_errors(dlq)
    transient = []
    permanent = []

    dlq.inspect_messages(limit: 100).each do |msg|
      if TRANSIENT_ERRORS.any? { |pattern| msg[:error].match?(pattern) }
        transient << msg
      elsif PERMANENT_ERRORS.any? { |pattern| msg[:error].match?(pattern) }
        permanent << msg
      end
    end

    { transient: transient, permanent: permanent }
  end
end
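
A classification like this can then drive replay decisions, for example:

buckets = DLQAnalyzer.classify_errors(SmartMessage::DeadLetterQueue.default)
puts "Transient (worth replaying): #{buckets[:transient].size}"
puts "Permanent (needs a code or data fix): #{buckets[:permanent].size}"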

Troubleshooting

Common Issues

1. DLQ File Growing Too Large

# Rotate DLQ files
require 'fileutils'

def rotate_dlq
  dlq = SmartMessage::DeadLetterQueue.default
  timestamp = Time.now.strftime('%Y%m%d_%H%M%S')

  # Move current file
  FileUtils.mv(dlq.file_path, "#{dlq.file_path}.#{timestamp}")

  # DLQ will create new file automatically
end

2. Replay Failures

# Debug replay failures
result = dlq.replay_one
unless result[:success]
  puts "Replay failed: #{result[:error]}"

  # Check if message class still exists
  message = dlq.peek
  begin
    message[:header][:message_class].constantize
  rescue NameError => e
    puts "Message class no longer exists: #{e.message}"
  end
end

3. Corrupted DLQ File

# Recover from a corrupted DLQ file
require 'json'

def recover_dlq(corrupted_path)
  recovered = []

  File.foreach(corrupted_path) do |line|
    begin
      entry = JSON.parse(line.strip, symbolize_names: true)
      recovered << entry
    rescue JSON::ParserError
      # Skip corrupted line
      puts "Skipping corrupted line: #{line[0..50]}..."
    end
  end

  # Write recovered entries to new file
  new_dlq = SmartMessage::DeadLetterQueue.new("#{corrupted_path}.recovered")
  recovered.each do |entry|
    new_dlq.enqueue(
      SmartMessage::Header.new(entry[:header]),
      entry[:payload],
      error: entry[:error],
      retry_count: entry[:retry_count]
    )
  end

  puts "Recovered #{recovered.size} messages"
end

Performance Considerations

File I/O Optimization

The DLQ uses several optimizations:

  1. Append-only writes: New messages are appended, not inserted
  2. Immediate sync: file.fsync ensures durability
  3. Mutex protection: Thread-safe but may create contention
  4. Line-based processing: Memory efficient for large files
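
Because the mutex serializes writes, concurrent producers can safely share one instance; the contention cost only shows up under heavy load. A sketch, where build_failed_message is a hypothetical stand-in for whatever produces a failed SmartMessage::Base in your app:

dlq = SmartMessage::DeadLetterQueue.default

# Four threads appending concurrently; the internal mutex serializes writes
threads = 4.times.map do
  Thread.new do
    failed = build_failed_message  # hypothetical helper
    dlq.enqueue(failed, error: 'Connection timeout', retry_count: 0, transport: 'Redis')
  end
end
threads.each(&:join)
puts "Queued: #{dlq.size}"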

Scaling Strategies

For high-volume systems:

# Use multiple DLQ instances by message type
class DLQRouter
  def self.get_dlq_for(message_class)
    case message_class
    when /Payment/
      @payment_dlq ||= SmartMessage::DeadLetterQueue.new('/var/log/payment_dlq.jsonl')
    when /Order/
      @order_dlq ||= SmartMessage::DeadLetterQueue.new('/var/log/order_dlq.jsonl')
    else
      SmartMessage::DeadLetterQueue.default
    end
  end
end
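
Capturing a failure through the router might then look like this (a sketch; the surrounding rescue context is assumed):

begin
  message.publish
rescue => e
  DLQRouter.get_dlq_for(message.class.name).enqueue(
    message,
    error: e.message,
    retry_count: 0,
    transport: 'Redis',
    stack_trace: e.backtrace.join("\n")
  )
end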

Memory Usage

For large DLQ files:

# Process in chunks to avoid memory issues
def process_large_dlq(dlq, chunk_size: 100)
  processed = 0

  while dlq.size > 0 && processed < 1000
    # Process only chunk_size at a time
    chunk_size.times do
      break if dlq.size == 0

      message = dlq.dequeue
      process_message(message)
      processed += 1
    end

    # Let other operations run
    sleep(0.1)
  end

  processed
end

Security Considerations

File Permissions

Ensure proper file permissions:

# Set restrictive permissions on DLQ files
def secure_dlq_file(path)
  File.chmod(0600, path) if File.exist?(path)  # Read/write for owner only
end

Sensitive Data

Be careful with sensitive data in the DLQ:

# Sanitize sensitive data before storing
def sanitize_for_dlq(payload)
  data = JSON.parse(payload)
  data['credit_card'] = 'REDACTED' if data['credit_card']
  data['password'] = 'REDACTED' if data['password']
  data.to_json
end

Encryption

For sensitive environments:

# Example: Encrypt DLQ payloads with AES-256-GCM (a sketch; the key
# handling shown here is illustrative, not a hardened design)
require 'openssl'
require 'base64'

class EncryptedDLQ < SmartMessage::DeadLetterQueue
  CIPHER = 'aes-256-gcm'

  def initialize(file_path, key:)
    super(file_path)
    @key = key  # 32-byte binary key; load it from a secret store in practice
  end

  def enqueue(header, payload, **options)
    super(header, encrypt(payload), **options)
  end

  def dequeue
    entry = super
    return nil unless entry

    entry[:payload] = decrypt(entry[:payload])
    entry
  end

  private

  def encrypt(data)
    cipher = OpenSSL::Cipher.new(CIPHER).encrypt
    cipher.key = @key
    iv = cipher.random_iv                      # 12-byte IV for GCM
    ciphertext = cipher.update(data) + cipher.final
    # Store IV + 16-byte auth tag + ciphertext, Base64-encoded for JSON Lines
    Base64.strict_encode64(iv + cipher.auth_tag + ciphertext)
  end

  def decrypt(data)
    raw = Base64.strict_decode64(data)
    iv, tag, ciphertext = raw[0, 12], raw[12, 16], raw[28..]
    decipher = OpenSSL::Cipher.new(CIPHER).decrypt
    decipher.key = @key
    decipher.iv = iv
    decipher.auth_tag = tag
    decipher.update(ciphertext) + decipher.final
  end
end
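
The key below is generated on the fly purely for illustration; a real deployment would load a stable key from a secret store so that old entries stay decryptable:

secure_dlq = EncryptedDLQ.new('/var/log/secure_dlq.jsonl',
                              key: OpenSSL::Random.random_bytes(32))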

Summary

The SmartMessage Dead Letter Queue provides:

  • Reliability: Automatic capture of failed messages
  • Flexibility: Multiple configuration options
  • Recoverability: Comprehensive replay capabilities
  • Observability: Statistics and filtering for analysis
  • Integration: Seamless circuit breaker integration
  • Production-Ready: Thread-safe, performant, and scalable

The DLQ ensures that no message is lost, even during system failures, and provides the tools needed to analyze, replay, and manage failed messages effectively.