Examples & Use Cases¶
This document provides practical examples of using SmartMessage in real-world scenarios.
Basic Messaging Patterns¶
Simple Notification System¶
require 'smart_message'
# Notification that is handed to a channel-specific sender (email, SMS or push)
# when it is processed.
class NotificationMessage < SmartMessage::Base
  description "Sends notifications to users via multiple channels"

  property :recipient
  property :subject
  property :body
  property :priority, default: 'normal'
  property :channel, default: 'email'

  config do
    transport SmartMessage::Transport.create(:stdout, loopback: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Routes the notification to the sender matching its +channel+.
    # A channel other than 'email', 'sms' or 'push' matches no branch and is
    # silently ignored.
    #
    # @param decoded_message [NotificationMessage] the message instance to handle
    def process(decoded_message)
      notification = decoded_message
      case notification.channel
      when 'email' then send_email(notification)
      when 'sms'   then send_sms(notification)
      when 'push'  then send_push_notification(notification)
      end
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Prints the simulated email delivery (recipient, subject, priority).
    def send_email(notification)
      puts "📧 Sending email to #{notification.recipient}"
      puts "Subject: #{notification.subject}"
      puts "Priority: #{notification.priority}"
    end

    # Prints the simulated SMS delivery (recipient, body).
    def send_sms(notification)
      puts "📱 Sending SMS to #{notification.recipient}"
      puts "Message: #{notification.body}"
    end

    # Prints the simulated push delivery (recipient, subject as title).
    def send_push_notification(notification)
      puts "🔔 Sending push notification to #{notification.recipient}"
      puts "Title: #{notification.subject}"
    end
  end
end
# Setup: subscribe the class before anything is published.
NotificationMessage.subscribe

# Send notifications: the first relies on the default channel, the second
# selects SMS explicitly. They are published in the order listed.
[
  {
    recipient: "user@example.com",
    subject: "Welcome!",
    body: "Thanks for signing up!",
    priority: "high"
  },
  {
    recipient: "+1234567890",
    subject: "Alert",
    body: "Your order has shipped!",
    channel: "sms"
  }
].each do |attributes|
  NotificationMessage.new(**attributes).publish
end
Event-Driven Architecture¶
# Time#iso8601 lives in the `time` stdlib extension before Ruby 3.4; require it
# explicitly instead of relying on another library having loaded it.
require 'time'

# User registration event. Processing one publishes four follow-up messages:
# a welcome email, a profile creation request, an analytics event and the
# default preferences.
#
# NOTE(review): CreateProfileMessage and PreferencesMessage are not defined in
# this example as shown; they must exist before this handler runs.
class UserRegisteredEvent < SmartMessage::Base
  property :user_id
  property :email
  property :name
  property :registration_source
  property :timestamp, default: -> { Time.now.iso8601 }

  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Fans the event out to the four handlers, in this order.
    #
    # @param decoded_message [UserRegisteredEvent] the message instance to handle
    def process(decoded_message)
      event = decoded_message
      # Fan out to multiple handlers
      send_welcome_email(event)
      create_user_profile(event)
      track_analytics(event)
      setup_default_preferences(event)
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Publishes a WelcomeEmailMessage carrying the user's id, email and name.
    def send_welcome_email(event)
      WelcomeEmailMessage.new(
        user_id: event.user_id,
        email: event.email,
        name: event.name
      ).publish
    end

    # Publishes a CreateProfileMessage carrying the user id and registration source.
    def create_user_profile(event)
      CreateProfileMessage.new(
        user_id: event.user_id,
        source: event.registration_source
      ).publish
    end

    # Publishes an AnalyticsMessage of type 'user_registration'.
    def track_analytics(event)
      AnalyticsMessage.new(
        event_type: 'user_registration',
        user_id: event.user_id,
        properties: {
          source: event.registration_source,
          timestamp: event.timestamp
        }
      ).publish
    end

    # Publishes a PreferencesMessage populated with #default_preferences.
    def setup_default_preferences(event)
      PreferencesMessage.new(
        user_id: event.user_id,
        preferences: default_preferences
      ).publish
    end

    # @return [Hash] the preferences assigned to every newly registered user
    def default_preferences
      {
        email_notifications: true,
        marketing_emails: false,
        theme: 'light'
      }
    end
  end
end
# Supporting message classes
# Message asking for a welcome email to be sent to a newly registered user.
class WelcomeEmailMessage < SmartMessage::Base
  %i[user_id email name].each { |attribute| property attribute }

  config do
    transport SmartMessage::Transport.create(:stdout)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Prints the simulated delivery; a real mailer call would go here.
    #
    # @param decoded_message [WelcomeEmailMessage] already a message instance
    def process(decoded_message)
      welcome = decoded_message
      puts "📧 Sending welcome email to #{welcome.email} (#{welcome.name})"
      # Email sending logic here
    end
  end
end
# Message describing a single analytics event for a user.
class AnalyticsMessage < SmartMessage::Base
  %i[event_type user_id properties].each { |attribute| property attribute }

  config do
    transport SmartMessage::Transport.create(:stdout)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Prints the tracked event; a real analytics call would go here.
    #
    # @param decoded_message [AnalyticsMessage] already a message instance
    def process(decoded_message)
      tracked = decoded_message
      puts "📊 Tracking event: #{tracked.event_type} for user #{tracked.user_id}"
      # Analytics tracking logic here
    end
  end
end
# Setup: subscribe every class used in this example, in the listed order.
[UserRegisteredEvent, WelcomeEmailMessage, AnalyticsMessage].each do |message_class|
  message_class.subscribe
end

# Trigger: simulate a user registration.
registration = UserRegisteredEvent.new(
  user_id: 12345,
  email: "alice@example.com",
  name: "Alice Johnson",
  registration_source: "web_form"
)
registration.publish
E-commerce Order Processing¶
# Time#iso8601 lives in the `time` stdlib extension before Ruby 3.4; require it
# explicitly instead of relying on another library having loaded it.
require 'time'

# Order lifecycle management: entry point of the order flow. A valid order
# triggers an inventory reservation followed by payment processing; an invalid
# one is rejected.
#
# NOTE(review): OrderRejectedMessage is not defined in this example as shown.
class OrderCreatedMessage < SmartMessage::Base
  property :order_id
  property :customer_id
  property :items
  property :total_amount
  property :shipping_address
  property :created_at, default: -> { Time.now.iso8601 }

  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Validates the order and publishes the follow-up messages.
    #
    # @param decoded_message [OrderCreatedMessage] already a message instance
    def process(decoded_message)
      order = decoded_message

      if valid_order?(order)
        # Reserve inventory
        InventoryReservationMessage.new(
          order_id: order.order_id,
          items: order.items
        ).publish

        # Process payment
        PaymentProcessingMessage.new(
          order_id: order.order_id,
          customer_id: order.customer_id,
          amount: order.total_amount
        ).publish
      else
        # Handle invalid order
        OrderRejectedMessage.new(
          order_id: order.order_id,
          reason: "Invalid order data"
        ).publish
      end
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helper inside `class << self` makes it genuinely private.
    private

    # An order is valid when it has at least one item and a positive total.
    # Missing (nil) items or total yield a falsy result rather than an error.
    def valid_order?(order)
      order.items&.any? && order.total_amount&.positive?
    end
  end
end
# Requests that stock be reserved for the items of an order and reports the
# outcome with a follow-up message.
#
# NOTE(review): InventoryReservedMessage and InventoryFailedMessage are not
# defined in this example as shown; they must exist before this handler runs.
class InventoryReservationMessage < SmartMessage::Base
  property :order_id
  property :items

  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Attempts the reservation and publishes the success or failure message.
    #
    # @param decoded_message [InventoryReservationMessage] already a message instance
    def process(decoded_message)
      reservation = decoded_message

      if reserve_inventory(reservation.items)
        InventoryReservedMessage.new(
          order_id: reservation.order_id
        ).publish
      else
        InventoryFailedMessage.new(
          order_id: reservation.order_id,
          reason: "Insufficient stock"
        ).publish
      end
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helper inside `class << self` makes it genuinely private.
    private

    # Placeholder for the real inventory reservation logic.
    #
    # @return [true] always succeeds in this simulation
    def reserve_inventory(items)
      puts "🏪 Reserving inventory for #{items.length} items"
      true # Simulate success
    end
  end
end
# SecureRandom is used below for transaction ids; require it explicitly instead
# of relying on another library having loaded it.
require 'securerandom'

# Requests payment for an order and reports the outcome with a follow-up message.
#
# NOTE(review): PaymentSuccessMessage and PaymentFailedMessage are not defined
# in this example as shown; they must exist before this handler runs.
class PaymentProcessingMessage < SmartMessage::Base
  property :order_id
  property :customer_id
  property :amount

  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Attempts the payment and publishes the success or failure message.
    #
    # @param decoded_message [PaymentProcessingMessage] already a message instance
    def process(decoded_message)
      payment = decoded_message

      if process_payment(payment)
        PaymentSuccessMessage.new(
          order_id: payment.order_id,
          transaction_id: generate_transaction_id
        ).publish
      else
        PaymentFailedMessage.new(
          order_id: payment.order_id,
          reason: "Payment declined"
        ).publish
      end
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Placeholder for the real payment gateway call.
    #
    # @return [true] always succeeds in this simulation
    def process_payment(payment)
      puts "💳 Processing payment of $#{payment.amount} for order #{payment.order_id}"
      true # Simulate success
    end

    # @return [String] "txn_" followed by 16 random hex characters
    def generate_transaction_id
      "txn_#{SecureRandom.hex(8)}"
    end
  end
end
# Setup: subscribe the order-flow classes in the listed order.
[OrderCreatedMessage, InventoryReservationMessage, PaymentProcessingMessage].each do |message_class|
  message_class.subscribe
end

# Create an order with two line items and publish it.
line_items = [
  { sku: "WIDGET-A", quantity: 2, price: 19.99 },
  { sku: "GADGET-B", quantity: 1, price: 49.99 }
]
destination = {
  street: "123 Main St",
  city: "Anytown",
  state: "CA",
  zip: "12345"
}

OrderCreatedMessage.new(
  order_id: "ORD-001",
  customer_id: "CUST-123",
  items: line_items,
  total_amount: 89.97,
  shipping_address: destination
).publish
Logging and Monitoring¶
# Hash#to_json and Time#iso8601 (the latter before Ruby 3.4) come from stdlib
# extensions; require them explicitly instead of relying on transitive loads.
require 'json'
require 'time'

# Centralized logging system: every log entry is formatted and printed;
# ERROR/FATAL entries raise an alert first and WARN entries are tracked.
class LogMessage < SmartMessage::Base
  property :level
  property :service
  property :message
  property :context
  property :timestamp, default: -> { Time.now.iso8601 }
  property :correlation_id

  config do
    transport SmartMessage::Transport.create(:stdout, output: "application.log")
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Formats the entry, emits any alert/warning for its level, then prints
    # the formatted line.
    #
    # @param decoded_message [LogMessage] already a message instance
    def process(decoded_message)
      log_entry = decoded_message
      formatted_message = format_log_entry(log_entry)

      case log_entry.level
      when 'ERROR', 'FATAL' then send_alert(log_entry)
      when 'WARN'           then track_warning(log_entry)
      end

      puts formatted_message
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Builds "[timestamp] LEVEL service: message", followed by
    # " (correlation_id)" and " | <context as JSON>" when those are present.
    #
    # @return [String]
    def format_log_entry(log_entry)
      line = "[#{log_entry.timestamp}] #{log_entry.level} #{log_entry.service}: #{log_entry.message}"
      line << " (#{log_entry.correlation_id})" if log_entry.correlation_id
      line << " | #{log_entry.context.to_json}" if log_entry.context
      line
    end

    # Prints an alert line; FATAL entries get a distinct prefix.
    def send_alert(log_entry)
      if log_entry.level == 'FATAL'
        puts "🚨 FATAL ERROR ALERT: #{log_entry.message}"
      else
        puts "⚠️ ERROR ALERT: #{log_entry.message}"
      end
    end

    # Prints a line recording that a warning was seen.
    def track_warning(log_entry)
      puts "📝 Warning tracked: #{log_entry.message}"
    end
  end
end
# Application performance monitoring: each metric is printed and then checked
# against the hard-coded alert thresholds.
class MetricMessage < SmartMessage::Base
  property :metric_name
  property :value
  property :unit
  property :tags
  property :timestamp, default: -> { Time.now.to_f }

  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Stores the metric, then checks it against the alert thresholds.
    #
    # @param decoded_message [MetricMessage] already a message instance
    def process(decoded_message)
      metric = decoded_message
      # Store metric (would typically go to monitoring system)
      store_metric(metric)
      # Check for alerts
      check_thresholds(metric)
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Prints the metric name, value, unit and tags.
    def store_metric(metric)
      puts "📊 Metric: #{metric.metric_name} = #{metric.value} #{metric.unit} #{metric.tags}"
    end

    # Prints an alert when 'response_time' exceeds 1000 or 'error_rate'
    # exceeds 0.05. Other metric names are not checked. The comparison
    # requires +value+ to be numeric.
    def check_thresholds(metric)
      case metric.metric_name
      when 'response_time'
        # More than 1 second
        puts "⚠️ High response time alert: #{metric.value}ms" if metric.value > 1000
      when 'error_rate'
        # More than 5% error rate
        puts "🚨 High error rate alert: #{(metric.value * 100).round(2)}%" if metric.value > 0.05
      end
    end
  end
end
# Setup
[LogMessage, MetricMessage].each { |message_class| message_class.subscribe }

# Log some events: one informational entry and one error.
[
  {
    level: "INFO",
    service: "user-service",
    message: "User login successful",
    context: { user_id: 123, ip: "192.168.1.1" },
    correlation_id: "req-abc123"
  },
  {
    level: "ERROR",
    service: "payment-service",
    message: "Payment gateway timeout",
    context: { order_id: "ORD-001", gateway: "stripe" },
    correlation_id: "req-def456"
  }
].each { |entry| LogMessage.new(**entry).publish }

# Send some metrics: both values are above the alert thresholds.
[
  {
    metric_name: "response_time",
    value: 1250,
    unit: "ms",
    tags: { service: "api", endpoint: "/users" }
  },
  {
    metric_name: "error_rate",
    value: 0.08,
    unit: "percentage",
    tags: { service: "payment-service" }
  }
].each { |sample| MetricMessage.new(**sample).publish }
Gateway Pattern¶
# Time#iso8601 lives in the `time` stdlib extension before Ruby 3.4; require it
# explicitly instead of relying on another library having loaded it.
require 'time'

# Bridge between different message systems: a gateway message names a
# destination system and carries a payload, which is re-wrapped in that
# system's message class and published over a per-instance transport.
class MessageGateway < SmartMessage::Base
  property :source_system
  property :destination_system
  property :message_type
  property :payload

  # Receive from one transport
  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Transforms the message and forwards it to its destination system.
    # A destination other than 'email_system', 'sms_system' or 'audit_system'
    # matches no branch and is silently ignored.
    #
    # @param decoded_message [MessageGateway] already a message instance
    def process(decoded_message)
      gateway_message = decoded_message

      case gateway_message.destination_system
      when 'email_system' then forward_to_email_system(gateway_message)
      when 'sms_system'   then forward_to_sms_system(gateway_message)
      when 'audit_system' then forward_to_audit_system(gateway_message)
      end
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Wraps the payload in an EmailSystemMessage and publishes it.
    def forward_to_email_system(gateway_message)
      email_message = EmailSystemMessage.new(
        original_payload: gateway_message.payload,
        source: gateway_message.source_system
      )
      publish_with_output(email_message, "email_system.log")
    end

    # Wraps the payload in an SMSSystemMessage and publishes it.
    def forward_to_sms_system(gateway_message)
      sms_message = SMSSystemMessage.new(
        original_payload: gateway_message.payload,
        source: gateway_message.source_system
      )
      publish_with_output(sms_message, "sms_system.log")
    end

    # Wraps the payload in an AuditSystemMessage, stamped with the current
    # time, and publishes it.
    def forward_to_audit_system(gateway_message)
      audit_message = AuditSystemMessage.new(
        event_type: gateway_message.message_type,
        data: gateway_message.payload,
        source_system: gateway_message.source_system,
        processed_at: Time.now.iso8601
      )
      publish_with_output(audit_message, "audit_system.log")
    end

    # Overrides the transport for this one message instance with a :stdout
    # transport configured with the given +output+, then publishes it.
    # Shared by the three forwarders so the sequence is written once.
    def publish_with_output(message, output)
      message.config do
        transport SmartMessage::Transport.create(:stdout, output: output)
      end
      message.publish
    end
  end
end
# Destination system message classes
# Message delivered to the email system by the gateway.
class EmailSystemMessage < SmartMessage::Base
  %i[original_payload source].each { |attribute| property attribute }

  class << self
    # Prints which source system the message came from.
    def process(decoded_message)
      origin = decoded_message.source
      puts "📧 Email system processed message from #{origin}"
    end
  end
end
# Message delivered to the SMS system by the gateway.
class SMSSystemMessage < SmartMessage::Base
  %i[original_payload source].each { |attribute| property attribute }

  class << self
    # Prints which source system the message came from.
    def process(decoded_message)
      origin = decoded_message.source
      puts "📱 SMS system processed message from #{origin}"
    end
  end
end
# Message delivered to the audit system by the gateway.
class AuditSystemMessage < SmartMessage::Base
  %i[event_type data source_system processed_at].each { |attribute| property attribute }

  class << self
    # Prints which source system the audited event came from.
    def process(decoded_message)
      origin = decoded_message.source_system
      puts "📋 Audit system logged event from #{origin}"
    end
  end
end
# Setup: subscribe the gateway and every destination class, in the listed order.
[MessageGateway, EmailSystemMessage, SMSSystemMessage, AuditSystemMessage].each do |message_class|
  message_class.subscribe
end

# Route messages through the gateway: one to the email system, one to audit.
[
  {
    source_system: "web_app",
    destination_system: "email_system",
    message_type: "notification",
    payload: { recipient: "user@example.com", subject: "Hello!" }
  },
  {
    source_system: "mobile_app",
    destination_system: "audit_system",
    message_type: "user_action",
    payload: { action: "login", user_id: 123 }
  }
].each { |routing| MessageGateway.new(**routing).publish }
Error Handling and Retry Patterns¶
# Time#iso8601 lives in the `time` stdlib extension before Ruby 3.4; require it
# explicitly instead of relying on another library having loaded it.
require 'time'

# Resilient message processing with retries: a failed message is republished
# with an incremented retry count after an exponentially growing delay, and is
# handed to DeadLetterMessage once max_retries is reached.
class ResilientMessage < SmartMessage::Base
  property :data
  property :retry_count, default: 0
  property :max_retries, default: 3
  property :original_error

  config do
    transport SmartMessage::Transport.create(:memory, auto_process: true)
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Processes the message; any StandardError is routed to #handle_error
    # instead of propagating.
    #
    # @param decoded_message [ResilientMessage] already a message instance
    def process(decoded_message)
      message = decoded_message

      # Simulate potentially failing operation
      raise StandardError, "Simulated failure" if should_fail?(message)

      puts "✅ Successfully processed message: #{message.data}"
    rescue => e
      handle_error(message, e)
    end

    # A bare `private` does not apply to `def self.` methods; defining the
    # helpers inside `class << self` makes them genuinely private.
    private

    # Simulate 30% failure rate. The message itself is not inspected.
    def should_fail?(_message)
      rand < 0.3
    end

    # Schedules a retry while attempts remain, otherwise publishes a
    # DeadLetterMessage describing the final failure.
    def handle_error(message, error)
      puts "❌ Error processing message: #{error.message}"

      if message.retry_count < message.max_retries
        # Retry with exponential backoff: 1, 2, 4, ... seconds
        delay = 2**message.retry_count
        puts "🔄 Retrying in #{delay} seconds (attempt #{message.retry_count + 1})"

        # In a real system, you'd use a delayed job or similar. The thread is
        # not joined, so a retry only happens if the process is still alive
        # once the delay has elapsed.
        Thread.new do
          sleep(delay)
          retry_message = new(
            data: message.data,
            retry_count: message.retry_count + 1,
            max_retries: message.max_retries,
            original_error: error.message
          )
          retry_message.publish
        end
      else
        # Max retries exceeded, send to dead letter queue
        DeadLetterMessage.new(
          original_message: message.to_h,
          final_error: error.message,
          retry_attempts: message.retry_count,
          failed_at: Time.now.iso8601
        ).publish
      end
    end
  end
end
# Record of a message that could not be processed after all retry attempts.
class DeadLetterMessage < SmartMessage::Base
  %i[original_message final_error retry_attempts failed_at].each do |attribute|
    property attribute
  end

  config do
    transport SmartMessage::Transport.create(:stdout, output: "dead_letter_queue.log")
    serializer SmartMessage::Serializer::Json.new
  end

  class << self
    # Prints a summary of the failed message.
    #
    # @param decoded_message [DeadLetterMessage] already a message instance
    def process(decoded_message)
      failure = decoded_message

      puts "💀 Message sent to dead letter queue:"
      puts " Original: #{failure.original_message}"
      puts " Error: #{failure.final_error}"
      puts " Attempts: #{failure.retry_attempts}"
      puts " Failed at: #{failure.failed_at}"
      # Could trigger alerts, save to database, etc.
    end
  end
end
# Setup
[ResilientMessage, DeadLetterMessage].each { |message_class| message_class.subscribe }

# Send five messages that might fail, pausing briefly after each one.
(1..5).each do |sequence|
  ResilientMessage.new(data: "Test message #{sequence}").publish
  sleep(0.1) # Small delay between messages
end
Testing Helpers¶
# Test utilities for SmartMessage
module SmartMessageTestHelpers
  # Temporarily points every SmartMessage::Base descendant at one shared
  # :memory transport, yields that transport to the block, and afterwards
  # restores each class's previous transport (also when the block raises).
  #
  # @yieldparam test_transport the shared transport installed for the block
  # @return the value of the block
  def self.with_test_transport
    original_transports = {}
    message_classes = SmartMessage::Base.descendants

    # Store original transports
    message_classes.each do |klass|
      original_transports[klass] = klass.transport
    end

    # Set up test transport
    test_transport = SmartMessage::Transport.create(:memory, auto_process: true)
    message_classes.each do |klass|
      klass.config { transport test_transport }
    end

    yield test_transport
  ensure
    # Restore original transports. The block parameter is deliberately not
    # named `transport`, so it cannot be confused with the `transport` DSL call.
    original_transports.each do |klass, original|
      klass.config { transport original }
    end
  end

  # Resets the statistics collected in SS.
  # NOTE(review): SS is presumably SmartMessage's statistics store - confirm.
  def self.clear_statistics
    SS.reset
  end

  # Blocks for +timeout+ seconds, polling every 10 ms. This is a simplified
  # wait: it does not inspect whether messages are still being processed, so
  # it always waits for the full timeout.
  #
  # The monotonic clock is used so that wall-clock adjustments cannot shorten
  # or lengthen the wait.
  #
  # @param timeout [Numeric] seconds to wait
  # @return [nil]
  def self.wait_for_processing(timeout: 1.0)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    sleep(0.01) while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
  end
end
# Example test usage: publishes one TestMessage over the shared test transport
# and prints the resulting counters (nothing is asserted).
def test_message_processing
  SmartMessageTestHelpers.with_test_transport do |memory_transport|
    # Start from a clean slate: no queued messages, no statistics
    memory_transport.clear_messages
    SmartMessageTestHelpers.clear_statistics

    # Subscribe, publish one message, then give it time to be processed
    TestMessage.subscribe
    TestMessage.new(data: "test").publish
    SmartMessageTestHelpers.wait_for_processing

    # Report results
    puts "Messages in transport: #{memory_transport.message_count}"
    puts "Published count: #{SS.get('TestMessage', 'publish')}"
    puts "Processed count: #{SS.get('TestMessage', 'TestMessage.process', 'routed')}"
  end
end
# Minimal message class used by the example test.
class TestMessage < SmartMessage::Base
  property :data

  class << self
    # Prints the payload of the received message.
    #
    # @param decoded_message [TestMessage] already a message instance
    def process(decoded_message)
      received = decoded_message
      puts "Processed test message: #{received.data}"
    end
  end
end
# Run the test. The example prints its counters with `puts` rather than
# asserting on them, so the output has to be inspected by hand.
test_message_processing
These examples demonstrate the flexibility and power of SmartMessage for building robust, scalable messaging systems. Each pattern can be adapted to your specific needs and combined with other patterns for more complex workflows.
Executable Example Programs¶
The `examples/` directory contains complete, runnable programs that demonstrate various SmartMessage features:
Core Messaging Patterns¶
- `01_point_to_point_orders.rb` - Point-to-point order processing with payment integration
- `02_publish_subscribe_events.rb` - Event broadcasting to multiple services (email, SMS, audit)
- `03_many_to_many_chat.rb` - Interactive chat system with rooms, bots, and human agents
Advanced Features¶
- `04_redis_smart_home_iot.rb` - Redis-based IoT sensor monitoring with real-time data flow
- `05_proc_handlers.rb` - Flexible message handlers (blocks, procs, lambdas, methods)
- `06_custom_logger_example.rb` - Advanced logging with `SmartMessage::Logger::Default`
- `07_error_handling_scenarios.rb` - Comprehensive validation, version mismatch, and error handling
- `08_entity_addressing_basic.rb` - Basic FROM/TO/REPLY_TO message addressing
- `08_entity_addressing_with_filtering.rb` - Advanced entity-aware message filtering
- `09_dead_letter_queue_demo.rb` - Complete Dead Letter Queue system demonstration
Interactive Demos¶
- `tmux_chat/` - Multi-pane terminal visualization of many-to-many messaging with file-based transport
Running Examples¶
# Navigate to the SmartMessage directory
# (the example paths below are relative to it)
cd smart_message
# Run any example directly
ruby examples/01_point_to_point_orders.rb
ruby examples/09_dead_letter_queue_demo.rb
# For tmux chat demo
# NOTE(review): presumably requires tmux to be installed - confirm
cd examples/tmux_chat && ./run_demo.sh
Each example is self-contained and includes:

- Clear educational comments
- Multiple message classes
- Complete setup and teardown
- Real-world scenarios
- Best practices demonstration
Example Features Demonstrated¶
Example | Transports | Features | Use Case |
---|---|---|---|
01 | STDOUT | Point-to-point, validation | Order processing |
02 | STDOUT | Pub-sub, multiple handlers | Event broadcasting |
03 | Memory | Many-to-many, bots | Chat systems |
04 | Redis | IoT, real-time, addressing | Smart home monitoring |
05 | Memory | Proc handlers, flexibility | Dynamic message handling |
06 | STDOUT | Custom logging, lifecycle | Production logging |
07 | STDOUT | Error handling, validation | Robust message systems |
08 | STDOUT | Entity addressing, filtering | Microservice communication |
09 | Memory | DLQ, circuit breakers, replay | Production reliability |
These examples provide practical, working code that you can use as a starting point for your own SmartMessage implementations.