Skip to content

Dispatcher & Message Routing

The dispatcher is the heart of SmartMessage's message routing system. It manages subscriptions, routes incoming messages to appropriate handlers, and coordinates concurrent processing using thread pools.

Overview

The dispatcher handles: - Subscription Management: Tracking which classes want to receive which messages - Message Routing: Directing incoming messages to registered handlers - Concurrent Processing: Using thread pools for parallel message processing - Statistics Collection: Tracking message processing metrics - Error Isolation: Preventing individual message failures from affecting the system

Core Components

SmartMessage::Dispatcher

Located at lib/smart_message/dispatcher.rb:11-147, the dispatcher is the central routing engine.

Key Features: - Thread-safe subscription management - Concurrent message processing via Concurrent::CachedThreadPool - Automatic thread pool lifecycle management - Built-in statistics collection - Graceful shutdown handling

Subscription Management

Adding Subscriptions

SmartMessage supports multiple subscription patterns:

# 1. Default handler - uses self.process method
MyMessage.subscribe
# Registers "MyMessage.process" as the handler

# 2. Custom method handler
MyMessage.subscribe("MyService.handle_message")
# Registers "MyService.handle_message" as the handler

# 3. Block handler (NEW!)
handler_id = MyMessage.subscribe do |header, payload|
  puts "Processing: #{JSON.parse(payload)}"
end
# Registers a proc handler with generated ID like "MyMessage.proc_abc123"

# 4. Proc handler (NEW!)
my_proc = proc { |header, payload| log_message(payload) }
proc_id = MyMessage.subscribe(my_proc)
# Registers the proc with generated ID

# 5. Lambda handler (NEW!)
my_lambda = lambda { |header, payload| validate_message(payload) }
lambda_id = MyMessage.subscribe(my_lambda)

# Multiple handlers for the same message
MyMessage.subscribe("MyMessage.audit")
MyMessage.subscribe("MyMessage.notify")
MyMessage.subscribe { |h,p| puts "Quick log" }
# All handlers will receive the message

Message Filtering (NEW!)

SmartMessage supports advanced message filtering using exact strings, regular expressions, or arrays for precise message routing:

# Basic string filtering (exact match)
MyMessage.subscribe(from: 'payment-service')
MyMessage.subscribe(to: 'order-processor')

# Regular expression filtering
MyMessage.subscribe(from: /^payment-.*/)        # All payment services
MyMessage.subscribe(to: /^(dev|staging)-.*/)    # Development environments

# Array filtering (multiple options)
MyMessage.subscribe(from: ['admin', 'system', 'monitoring'])

# Mixed exact and pattern matching
MyMessage.subscribe(from: ['admin', /^system-.*/, 'legacy-service'])

# Combined filtering
MyMessage.subscribe(
  from: /^admin-.*/, 
  to: ['order-service', /^fulfillment-.*/]
)

# Broadcast + directed filtering
MyMessage.subscribe(broadcast: true, to: 'api-service')

Filter Types

String Filters (Exact Match)

# Subscribe only to messages from specific sender
OrderMessage.subscribe(from: 'payment-service')

# Subscribe only to messages directed to specific recipient
OrderMessage.subscribe(to: 'order-processor')

Regular Expression Filters (Pattern Match)

# Environment-based routing
DevService.subscribe(to: /^(dev|staging)-.*/)
ProdService.subscribe(to: /^prod-.*/)

# Service pattern routing
PaymentProcessor.subscribe(from: /^payment-.*/)
ApiService.subscribe(from: /^(web|mobile|api)-.*/)

Array Filters (Multiple Options)

# Multiple specific services
AdminService.subscribe(from: ['admin', 'system', 'monitoring'])

# Mixed patterns and exact matches
AlertService.subscribe(from: ['admin', /^system-.*/, 'security'])

Combined Filters

# Complex multi-criteria filtering
OrderMessage.subscribe(
  from: /^(admin|system)-.*/, 
  to: ['order-service', /^fulfillment-.*/]
)

# Admin services to production only
AdminMessage.subscribe(from: /^admin-.*/, to: /^prod-.*/)

Filter Validation

Filters are validated at subscription time:

# Valid filters
MyMessage.subscribe(from: 'service')           # String
MyMessage.subscribe(from: /^service-.*/)       # Regexp  
MyMessage.subscribe(from: ['a', /^b-.*/])      # Array of String/Regexp

# Invalid filters (raise ArgumentError)
MyMessage.subscribe(from: 123)                 # Invalid type
MyMessage.subscribe(from: ['valid', 123])      # Invalid array element

Removing Subscriptions

# Remove specific method handler
MyMessage.unsubscribe("MyMessage.custom_handler")

# Remove specific proc/block handler using returned ID
block_id = MyMessage.subscribe { |h,p| puts p }
MyMessage.unsubscribe(block_id)  # Cleans up proc from registry too

# Remove ALL handlers for a message class
MyMessage.unsubscribe!

# Remove all subscriptions (useful for testing)
dispatcher = SmartMessage::Dispatcher.new
dispatcher.drop_all!

Viewing Subscriptions

dispatcher = SmartMessage::Dispatcher.new

# View all subscriptions
puts dispatcher.subscribers
# => {"MyMessage" => ["MyMessage.process", "MyMessage.audit"]}

# Check specific message subscriptions
puts dispatcher.subscribers["MyMessage"]
# => ["MyMessage.process", "MyMessage.audit"]

Message Routing Process

1. Message Reception

When a transport receives a message, it calls the dispatcher:

# Transport receives serialized message and routes it
transport.receive(message_class, serialized_message)
# This internally decodes the message and calls:
dispatcher.route(decoded_message)

2. Subscription Lookup

The dispatcher finds all registered handlers:

def route(decoded_message)
  message_klass = decoded_message._sm_header.message_class
  return nil if @subscribers[message_klass].empty?

  @subscribers[message_klass].each do |subscription|
    # Process each handler with filters
  end
end

3. Concurrent Processing

Each handler is processed in its own thread, with support for both method and proc handlers:

@subscribers[message_klass].each do |subscription|
  message_processor = subscription[:process_method]
  SS.add(message_klass, message_processor, 'routed')

  @router_pool.post do
    # This runs in a separate thread with circuit breaker protection
    circuit_result = circuit(:message_processor).wrap do
      # Check if this is a proc handler or a regular method call
      if proc_handler?(message_processor)
        # Call the proc handler via SmartMessage::Base
        SmartMessage::Base.call_proc_handler(message_processor, decoded_message)
      else
        # Original method call logic
        parts = message_processor.split('.')
        target_klass = parts[0]  # "MyMessage" 
        class_method = parts[1]  # "process"

        target_klass.constantize
                    .method(class_method)
                    .call(decoded_message)
      end
    end

    # Handle circuit breaker fallback if triggered
    if circuit_result.is_a?(Hash) && circuit_result[:circuit_breaker]
      handle_circuit_breaker_fallback(circuit_result, decoded_message, message_processor)
    end
  end
end

Handler Types Processed: - Method handlers: "ClassName.method_name" → resolved via constantize - Proc handlers: "ClassName.proc_abc123" → looked up in proc registry
- Block handlers: "ClassName.proc_def456" → treated as proc handlers - Lambda handlers: "ClassName.proc_ghi789" → treated as proc handlers

Thread Pool Management

Thread Pool Configuration

The dispatcher uses Concurrent::CachedThreadPool which automatically manages thread creation and destruction:

def initialize
  @router_pool = Concurrent::CachedThreadPool.new

  # Automatic cleanup on exit
  at_exit do
    shutdown_thread_pool
  end
end

Monitoring Thread Pool Status

dispatcher = SmartMessage::Dispatcher.new

# Get comprehensive status
status = dispatcher.status
puts "Running: #{status[:running]}"
puts "Queue length: #{status[:queue_length]}"
puts "Scheduled tasks: #{status[:scheduled_task_count]}"
puts "Completed tasks: #{status[:completed_task_count]}"
puts "Current threads: #{status[:length]}"

# Individual status methods
puts dispatcher.running?              # Is the pool active?
puts dispatcher.queue_length          # How many tasks are waiting?
puts dispatcher.scheduled_task_count  # Total tasks scheduled
puts dispatcher.completed_task_count  # Total tasks completed
puts dispatcher.current_length        # Current number of threads

Thread Pool Lifecycle

# Automatic shutdown handling
at_exit do
  print "Shutting down the dispatcher's thread pool..."
  @router_pool.shutdown

  while @router_pool.shuttingdown?
    print '.'
    sleep 1
  end

  puts " done."
end

Message Processing Patterns

Standard Processing

class OrderMessage < SmartMessage::Base
  property :order_id
  property :customer_id
  property :items

  # Standard process method
  def self.process(message_header, message_payload)
    # 1. Decode the message
    data = JSON.parse(message_payload)
    order = new(data)

    # 2. Execute business logic
    fulfill_order(order)

    # 3. Optional: publish follow-up messages
    ShippingMessage.new(
      order_id: order.order_id,
      address: get_shipping_address(order.customer_id)
    ).publish
  end

  private

  def self.fulfill_order(order)
    # Business logic here
  end
end

# Subscribe to receive messages
OrderMessage.subscribe

Multiple Handlers

class PaymentMessage < SmartMessage::Base
  property :payment_id
  property :amount
  property :customer_id

  # Primary payment processing
  def self.process(message_header, message_payload)
    data = JSON.parse(message_payload)
    payment = new(data)

    process_payment(payment)
  end

  # Audit logging handler
  def self.audit(message_header, message_payload)
    data = JSON.parse(message_payload)
    payment = new(data)

    log_payment_attempt(payment)
  end

  # Fraud detection handler
  def self.fraud_check(message_header, message_payload)
    data = JSON.parse(message_payload)
    payment = new(data)

    if suspicious_payment?(payment)
      flag_for_review(payment)
    end
  end
end

# Register all handlers
PaymentMessage.subscribe("PaymentMessage.process")
PaymentMessage.subscribe("PaymentMessage.audit")
PaymentMessage.subscribe("PaymentMessage.fraud_check")

Error Handling in Processors

class RobustMessage < SmartMessage::Base
  property :data

  def self.process(message_header, message_payload)
    begin
      data = JSON.parse(message_payload)
      message = new(data)

      # Main processing logic
      process_business_logic(message)

    rescue JSON::ParserError => e
      # Handle malformed messages
      log_error("Invalid message format", message_header, e)

    rescue BusinessLogicError => e
      # Handle business logic failures
      log_error("Business logic failed", message_header, e)

      # Optionally republish to error queue
      ErrorMessage.new(
        original_message: message_payload,
        error: e.message,
        retry_count: get_retry_count(message_header)
      ).publish

    rescue => e
      # Handle unexpected errors
      log_error("Unexpected error", message_header, e)
      raise  # Re-raise to trigger dispatcher error handling
    end
  end

  private

  def self.log_error(type, header, error)
    puts "#{type}: #{error.message}"
    puts "Message class: #{header.message_class}"
    puts "Message UUID: #{header.uuid}"
  end
end

Advanced Routing Patterns

Conditional Processing

class ConditionalMessage < SmartMessage::Base
  property :environment
  property :data

  def self.process(message_header, message_payload)
    data = JSON.parse(message_payload)
    message = new(data)

    # Route based on message content
    case message.environment
    when 'production'
      production_handler(message)
    when 'staging'
      staging_handler(message)
    when 'development'
      development_handler(message)
    else
      default_handler(message)
    end
  end
end

Message Transformation and Republishing

class TransformMessage < SmartMessage::Base
  property :raw_data
  property :format

  def self.process(message_header, message_payload)
    data = JSON.parse(message_payload)
    message = new(data)

    # Transform the message
    case message.format
    when 'csv'
      transformed = transform_csv(message.raw_data)
    when 'xml'
      transformed = transform_xml(message.raw_data)
    else
      transformed = message.raw_data
    end

    # Republish as a different message type
    ProcessedMessage.new(
      original_id: message_header.uuid,
      processed_data: transformed,
      processed_at: Time.now
    ).publish
  end
end

Fan-out Processing

class EventMessage < SmartMessage::Base
  property :event_type
  property :user_id
  property :data

  def self.process(message_header, message_payload)
    data = JSON.parse(message_payload)
    event = new(data)

    # Fan out to multiple specialized handlers
    case event.event_type
    when 'user_signup'
      WelcomeEmailMessage.new(user_id: event.user_id).publish
      AnalyticsMessage.new(event: 'signup', user_id: event.user_id).publish
      AuditMessage.new(action: 'user_created', user_id: event.user_id).publish

    when 'purchase'
      InventoryMessage.new(items: event.data['items']).publish
      ReceiptMessage.new(user_id: event.user_id, total: event.data['total']).publish
      LoyaltyMessage.new(user_id: event.user_id, points: calculate_points(event.data)).publish
    end
  end
end

Statistics and Monitoring

Built-in Statistics

The dispatcher automatically collects statistics via the SimpleStats (SS) system:

# Statistics are automatically collected for:
# - Message publishing: SS.add(message_class, 'publish')
# - Message routing: SS.add(message_class, process_method, 'routed')

# View all statistics
puts SS.stat

# Get specific statistics
publish_count = SS.get("MyMessage", "publish")
process_count = SS.get("MyMessage", "MyMessage.process", "routed")

# Reset statistics
SS.reset  # Clear all
SS.reset("MyMessage", "publish")  # Clear specific stat

Custom Monitoring

class MonitoredMessage < SmartMessage::Base
  property :data

  def self.process(message_header, message_payload)
    start_time = Time.now

    begin
      # Process the message
      data = JSON.parse(message_payload)
      message = new(data)

      process_business_logic(message)

      # Record success metrics
      record_processing_time(Time.now - start_time)
      increment_success_counter

    rescue => e
      # Record failure metrics
      record_error(e)
      increment_failure_counter
      raise
    end
  end

  private

  def self.record_processing_time(duration)
    SS.add("MonitoredMessage", "processing_time", how_many: duration)
  end

  def self.increment_success_counter
    SS.add("MonitoredMessage", "success")
  end

  def self.increment_failure_counter
    SS.add("MonitoredMessage", "failure")
  end
end

Performance Considerations

Thread Pool Sizing

The CachedThreadPool automatically manages thread creation, but you can influence behavior:

# For high-throughput scenarios, consider a custom thread pool
class CustomDispatcher < SmartMessage::Dispatcher
  def initialize(min_threads: 5, max_threads: 50)
    @router_pool = Concurrent::ThreadPoolExecutor.new(
      min_threads: min_threads,
      max_threads: max_threads,
      max_queue: 1000,
      fallback_policy: :caller_runs
    )

    # Rest of initialization
  end
end

Processing Optimization

class OptimizedMessage < SmartMessage::Base
  property :data

  def self.process(message_header, message_payload)
    # Parse once, use multiple times
    data = JSON.parse(message_payload)
    message = new(data)

    # Batch operations when possible
    batch_operations(message)

    # Use connection pooling for database operations
    connection_pool.with do |conn|
      save_to_database(message, conn)
    end
  end
end

Testing Dispatcher Behavior

Dispatcher Testing

RSpec.describe SmartMessage::Dispatcher do
  let(:dispatcher) { SmartMessage::Dispatcher.new }

  before do
    dispatcher.drop_all!  # Clear subscriptions
  end

  describe "subscription management" do
    it "adds subscriptions" do
      dispatcher.add("TestMessage", "TestMessage.process")

      expect(dispatcher.subscribers["TestMessage"]).to include("TestMessage.process")
    end

    it "removes subscriptions" do
      dispatcher.add("TestMessage", "TestMessage.process")
      dispatcher.drop("TestMessage", "TestMessage.process")

      expect(dispatcher.subscribers["TestMessage"]).not_to include("TestMessage.process")
    end
  end

  describe "message routing" do
    let(:header) { double("header", message_class: "TestMessage") }
    let(:payload) { '{"data": "test"}' }

    before do
      # Mock the message class
      stub_const("TestMessage", Class.new do
        def self.process(header, payload)
          @processed_messages ||= []
          @processed_messages << [header, payload]
        end

        def self.processed_messages
          @processed_messages || []
        end
      end)
    end

    it "routes messages to subscribers" do
      dispatcher.add("TestMessage", "TestMessage.process")
      dispatcher.route(header, payload)

      # Wait for async processing
      sleep 0.1

      expect(TestMessage.processed_messages).to have(1).message
    end
  end
end

Message Processing Testing

RSpec.describe "Message Processing" do
  let(:transport) { SmartMessage::Transport.create(:memory, auto_process: true) }

  before do
    TestMessage.config do
      transport transport
      serializer SmartMessage::Serializer::Json.new
    end

    TestMessage.subscribe
  end

  it "processes published messages" do
    expect(TestMessage).to receive(:process).once

    TestMessage.new(data: "test").publish

    # Wait for async processing
    sleep 0.1
  end
end

Next Steps