SmartMessage Architecture¶
SmartMessage follows a plugin-based architecture that cleanly separates message concerns from transport and serialization mechanisms.
Design Philosophy¶
SmartMessage is designed around the principle that messages should be independent of their delivery mechanism. Just as ActiveRecord abstracts database operations from business logic, SmartMessage abstracts message delivery from message content.
Core Principles¶
- Separation of Concerns: Message content, transport, and serialization are independent
- Plugin Architecture: Pluggable transports and serializers
- Dual Configuration: Both class-level and instance-level configuration
- Thread Safety: Concurrent message processing with thread pools
- Gateway Support: Messages can flow between different transports/serializers
Architecture Overview¶
graph TB
subgraph "SmartMessage Core"
Base[SmartMessage::Base]
Header[Message Header<br/>• UUID<br/>• Timestamps<br/>• Addressing]
Props[Message Properties<br/>• Business Data<br/>• Validation<br/>• Versioning]
end
subgraph "Plugin System"
Transport[Transport Plugin<br/>• publish<br/>• subscribe<br/>• Memory/Redis/STDOUT]
Serializer[Serializer Plugin<br/>• encode<br/>• decode<br/>• JSON/Custom]
Logger[Logger Plugin<br/>• Structured logging<br/>• Multiple outputs<br/>• Colorization]
end
subgraph "Message Processing"
Dispatcher[Dispatcher<br/>• Route messages<br/>• Thread pool<br/>• Subscriptions<br/>• DDQ management]
DDQ[Deduplication Queue<br/>• Handler-scoped<br/>• Memory/Redis storage<br/>• O1 performance<br/>• Circular buffer]
Handlers[Message Handlers<br/>• Default handler<br/>• Block handlers<br/>• Proc handlers<br/>• Method handlers]
end
subgraph "Reliability Layer"
CircuitBreaker[Circuit Breaker<br/>• Failure thresholds<br/>• Automatic fallback<br/>• Recovery detection]
DLQ[Dead Letter Queue<br/>• Failed messages<br/>• Replay mechanism<br/>• JSON Lines format]
end
subgraph "Monitoring"
Stats[Statistics<br/>• Message counts<br/>• Processing metrics<br/>• Thread pool status]
Filters[Message Filtering<br/>• Entity-aware routing<br/>• Regex patterns<br/>• Broadcast handling]
end
Base --> Header
Base --> Props
Base --> Transport
Base --> Serializer
Base --> Logger
Transport --> Dispatcher
Dispatcher --> DDQ
Dispatcher --> Handlers
Dispatcher --> Stats
Dispatcher --> Filters
Transport --> CircuitBreaker
CircuitBreaker --> DLQ
DDQ -.-> Stats
Handlers -.-> Stats
Core Components¶
1. SmartMessage::Base¶
The foundation class that all messages inherit from, built on Hashie::Dash
.
Key Responsibilities: - Property management and validation - Plugin configuration (transport, serializer, logger) - Message lifecycle management - Header generation and management
Location: lib/smart_message/base.rb:17-199
class MyMessage < SmartMessage::Base
description "Handles custom message processing for my application"
property :data
config do
transport MyTransport.new
serializer MySerializer.new
end
end
2. Transport Layer¶
Handles message delivery and routing between systems.
Key Responsibilities: - Message publishing and receiving - Subscription management - Connection handling - Transport-specific configuration
Location: lib/smart_message/transport/
# Transport interface
class CustomTransport < SmartMessage::Transport::Base
def do_publish(message_class, serialized_message)
# Send message via your transport
end
def subscribe(message_class, process_method, filter_options = {})
# Set up subscription via dispatcher
@dispatcher.add(message_class, process_method, filter_options)
end
end
3. Serializer System¶
Handles encoding and decoding of message content.
Key Responsibilities: - Message encoding (Ruby object → wire format) - Message decoding (wire format → Ruby object) - Format-specific handling
Location: lib/smart_message/serializer/
class CustomSerializer < SmartMessage::Serializer::Base
def do_encode(message_instance)
# Convert message instance to wire format
# Default implementation uses message_instance.to_h
end
def do_decode(payload)
# Convert from wire format back to hash
# Must return hash compatible with message initialization
end
end
4. Dispatcher¶
Routes incoming messages to appropriate handlers using concurrent processing with integrated deduplication.
Key Responsibilities:
- Message routing based on class
- Thread pool management
- Subscription catalog management
- Handler-scoped DDQ management
- Message filtering and statistics collection
Location: lib/smart_message/dispatcher.rb
dispatcher = SmartMessage::Dispatcher.new
dispatcher.add("MyMessage", "MyMessage.process")
dispatcher.route(decoded_message)
# DDQ integration is automatic when enabled
MyMessage.enable_deduplication!
5. Deduplication Queue (DDQ)¶
Handler-scoped message deduplication system preventing duplicate processing.
Key Responsibilities: - UUID-based duplicate detection - Handler isolation (each handler gets own DDQ) - Memory and Redis storage backends - O(1) performance with hybrid Array + Set data structure
Architecture:
graph LR
subgraph "Handler A DDQ"
A1[Circular Array]
A2[Lookup Set]
A3[Mutex Lock]
end
subgraph "Handler B DDQ"
B1[Circular Array]
B2[Lookup Set]
B3[Mutex Lock]
end
Message[Incoming Message<br/>UUID abc-123] --> Dispatcher
Dispatcher --> |Check Handler A| A2
Dispatcher --> |Check Handler B| B2
A2 --> |Not Found| ProcessA[Process with Handler A]
B2 --> |Found| SkipB[Skip Handler B - Duplicate]
ProcessA --> |Add UUID| A1
ProcessA --> |Add UUID| A2
Location: lib/smart_message/deduplication.rb
, lib/smart_message/ddq/
Key Features: - Handler-scoped deduplication (each handler gets its own DDQ) - UUID-based duplicate detection - Multiple storage backends (Memory, Redis) - O(1) performance with hybrid Array + Set data structure - Thread-safe operations with mutex locks
6. Message Headers¶
Standard metadata attached to every message with entity addressing support.
Key Responsibilities: - Message identification (UUID) - Routing information (message class, version) - Tracking data (timestamps, process IDs) - Entity addressing (from, to, reply_to)
Location: lib/smart_message/header.rb
header = message._sm_header
puts header.uuid # "550e8400-e29b-41d4-a716-446655440000"
puts header.message_class # "MyMessage"
puts header.published_at # 2025-08-17 10:30:00 UTC
puts header.publisher_pid # 12345
puts header.from # "payment-service"
puts header.to # "order-service"
puts header.reply_to # "payment-service" (defaults to from)
puts header.version # 1
puts header.serializer # "SmartMessage::Serializer::JSON"
Message Lifecycle¶
1. Definition Phase¶
class OrderMessage < SmartMessage::Base
property :order_id
property :amount
config do
transport SmartMessage::Transport.create(:memory)
serializer SmartMessage::Serializer::Json.new
end
end
2. Subscription Phase¶
# Basic subscription
OrderMessage.subscribe
# Subscription with filtering
OrderMessage.subscribe(from: /^payment-.*/, to: 'order-service')
OrderMessage.subscribe('PaymentService.process', broadcast: true)
# Each subscription gets its own DDQ automatically
# DDQ Key: "OrderMessage:OrderMessage.process"
# DDQ Key: "OrderMessage:PaymentService.process"
3. Publishing Phase¶
order = OrderMessage.new(order_id: "123", amount: 99.99)
order.from("order-service").to("payment-service")
order.publish
# 1. Creates header with UUID, timestamp, addressing
# 2. Encodes message via serializer
# 3. Sends via transport
# 4. Circuit breaker monitors for failures
4. Receiving Phase¶
# Transport receives serialized message
transport.receive(message_class, serialized_message)
# 1. Decodes message using class's configured serializer
# 2. Routes decoded message to dispatcher
# 3. Dispatcher checks DDQ for duplicates per handler
# 4. Applies message filters (from/to/broadcast)
# 5. Spawns thread for processing matching handlers
# 6. Marks UUID as processed in handler's DDQ after successful processing
5. Message Handler Processing¶
SmartMessage supports multiple handler types, routed through the dispatcher:
# Default handler (self.process method) - receives decoded message instance
def self.process(decoded_message)
order = decoded_message # Already a fully decoded OrderMessage instance
fulfill_order(order)
end
# Block handler (inline processing) - receives decoded message instance
OrderMessage.subscribe do |decoded_message|
quick_processing(decoded_message)
end
# Proc handler (reusable across message types) - receives decoded message instance
audit_proc = proc do |decoded_message|
AuditService.log_message(decoded_message.class.name, decoded_message)
end
OrderMessage.subscribe(audit_proc)
# Method handler (service class processing) - receives decoded message instance
class OrderService
def self.process_order(decoded_message)
complex_business_logic(decoded_message)
end
end
OrderMessage.subscribe("OrderService.process_order")
Handler Routing Process: 1. Dispatcher receives decoded message instance 2. Looks up all registered handlers for message class 3. For each handler that matches filters: - Checks DDQ for duplicates (handler-scoped) - String handlers: Resolves to class method via constantize - Proc handlers: Calls proc directly from registry 4. Executes handlers in parallel threads with circuit breaker protection 5. Marks UUID as processed in handler's DDQ after successful completion 6. Collects statistics and handles errors
Plugin System Architecture¶
Dual-Level Configuration¶
SmartMessage supports configuration at both class and instance levels:
# Class-level (default for all instances)
class PaymentMessage < SmartMessage::Base
config do
transport ProductionTransport.new
serializer SecureSerializer.new
end
end
# Instance-level (overrides class configuration)
test_payment = PaymentMessage.new(amount: 1.00)
test_payment.config do
transport TestTransport.new # Override for this instance
end
This enables sophisticated gateway patterns where messages can be: - Received from one transport (e.g., RabbitMQ) - Processed with business logic - Republished to another transport (e.g., Kafka)
Plugin Registration¶
Transports are registered in a central registry:
# Register custom transport
SmartMessage::Transport.register(:redis, RedisTransport)
# Use registered transport
MyMessage.config do
transport SmartMessage::Transport.create(:redis, url: "redis://localhost")
end
Thread Safety & Concurrency¶
Thread Pool Management¶
The dispatcher uses Concurrent::CachedThreadPool
for processing:
# Each message processing happens in its own thread with circuit breaker protection
@router_pool.post do
circuit_result = circuit(:message_processor).wrap do
if proc_handler?(message_processor)
SmartMessage::Base.call_proc_handler(message_processor, decoded_message)
else
target_class.constantize.method(class_method).call(decoded_message)
end
end
end
Thread Safety Considerations¶
- Message Instances: Each message is processed in isolation
- Shared State: Avoid shared mutable state in message classes
- Statistics: Thread-safe statistics collection via
SimpleStats
- Graceful Shutdown: Automatic cleanup on process exit
Monitoring Thread Pools¶
dispatcher = SmartMessage::Dispatcher.new
status = dispatcher.status
puts "Running: #{status[:running]}"
puts "Queue length: #{status[:queue_length]}"
puts "Completed tasks: #{status[:completed_task_count]}"
Error Handling Architecture¶
Exception Isolation¶
Processing exceptions are isolated to prevent cascade failures:
circuit_result = circuit(:message_processor).wrap do
# Handler execution with circuit breaker protection
end
# Handle circuit breaker fallback responses
if circuit_result.is_a?(Hash) && circuit_result[:circuit_breaker]
handle_circuit_breaker_fallback(circuit_result, decoded_message, message_processor)
end
Custom Error Types¶
SmartMessage defines specific error types for different failure modes:
module SmartMessage::Errors
class TransportNotConfigured < RuntimeError; end
class SerializerNotConfigured < RuntimeError; end
class NotImplemented < RuntimeError; end
class ReceivedMessageNotSubscribed < RuntimeError; end
class UnknownMessageClass < RuntimeError; end
class ValidationError < RuntimeError; end
end
Reliability & Fault Tolerance¶
Circuit Breaker Integration¶
SmartMessage integrates BreakerMachines for production-grade reliability:
# Circuit breakers are automatically configured for all transports
class MyTransport < SmartMessage::Transport::Base
# Inherits circuit breaker configuration:
# - :transport_publish for publishing operations
# - :transport_subscribe for subscription operations
# - Automatic DLQ fallback for failed publishes
end
Circuit Breaker States: - Closed: Normal operation, requests pass through - Open: Threshold exceeded, requests fail fast - Half-Open: Testing if service recovered
Dead Letter Queue¶
Failed messages are automatically captured in the Dead Letter Queue:
# Automatic capture when circuit breaker trips
message.publish # If transport fails, goes to DLQ
# Manual capture for business logic failures
dlq = SmartMessage::DeadLetterQueue.default
dlq.enqueue(decoded_message, error: "Validation failed", transport: "manual")
DLQ Architecture:
graph TB
Publish[Message Publishing]
CB[Circuit Breaker<br/>Monitoring]
Transport[Transport<br/>Success]
DLQ[Dead Letter Queue<br/>Failure Storage]
Replay[Replay Mechanism<br/>Manual/Automated]
Publish --> CB
CB --> |Success| Transport
CB --> |Failure| DLQ
DLQ --> Replay
Replay --> |Retry| Publish
DLQ Features: - JSON Lines format for efficient append operations - FIFO queue operations with thread safety - Replay capabilities with transport override - Administrative tools for filtering and analysis
Statistics & Monitoring¶
Built-in Statistics¶
SmartMessage automatically collects processing statistics including DDQ metrics:
# Statistics are collected for:
SS.add(message_class, 'publish')
SS.add(message_class, process_method, 'routed')
# Access statistics
puts SS.stat
puts SS.get("MyMessage", "publish")
# DDQ-specific statistics
stats = OrderMessage.ddq_stats
puts "DDQ utilization: #{stats[:utilization]}%"
puts "Current count: #{stats[:current_count]}"
Monitoring Points¶
- Message Publishing: Count of published messages per class
- Message Routing: Count of routed messages per processor
- Thread Pool: Queue length, completed tasks, running status
- Transport Status: Connection status, message counts
- DDQ Metrics: Utilization, duplicate detection rates, memory usage
- Message Filtering: Filter match rates, entity-aware routing statistics
Configuration Architecture¶
Configuration Hierarchy¶
- Class-level defaults: Set via
MyMessage.config
- Instance-level overrides: Set via
message.config
- Runtime configuration: Dynamic plugin switching
Configuration Objects¶
Configuration uses method-based DSL:
config do
transport MyTransport.new(option1: value1)
serializer MySerializer.new(option2: value2)
logger MyLogger.new(level: :debug)
end
Plugin Resolution¶
When a message needs a plugin:
- Check instance-level configuration
- Fall back to class-level configuration
- Raise error if not configured
def transport
@transport || self.class.class_variable_get(:@@transport) || raise(Errors::TransportNotConfigured)
rescue NameError
raise(Errors::TransportNotConfigured)
end
This architecture provides flexibility while maintaining clear fallback behavior.