Multi-Transport Publishing

Send each message to multiple transports with a single publish call for redundancy, integration, and migration scenarios.

SmartMessage lets you configure a message class with multiple transports, so that a single publish() call delivers the message to every configured transport system.

Overview

Multi-transport publishing supports the following patterns:

  • Redundancy: Send critical messages to primary and backup systems
  • Integration: Deliver to production queues and logging/monitoring systems in one publish call
  • Migration: Gradually transition between transport systems without downtime
  • Fan-out: Broadcast messages to multiple processing pipelines
  • Resilience: Message delivery succeeds as long as ANY transport is available

Basic Configuration

Configure multiple transports by passing an array to the transport method:

class OrderProcessingMessage < SmartMessage::Base
  property :order_id, required: true
  property :customer_id, required: true
  property :amount, required: true

  # Configure multiple transports
  transport [
    SmartMessage::Transport.create(:redis_queue, url: 'redis://primary:6379'),
    SmartMessage::Transport.create(:redis, url: 'redis://backup:6379'),
    SmartMessage::Transport::StdoutTransport.new(format: :json)
  ]
end

# Publishing sends to ALL configured transports
message = OrderProcessingMessage.new(
  order_id: "ORD-12345", 
  customer_id: "CUST-789", 
  amount: 149.99
)

message.publish  # ✅ Publishes to Redis Queue, Redis Pub/Sub, and STDOUT

Transport Introspection

SmartMessage provides utility methods to inspect and manage transport configurations:

# Check transport configuration
puts message.multiple_transports?  # => true
puts message.single_transport?     # => false
puts message.transports.length     # => 3

# Access individual transports
message.transports.each_with_index do |transport, index|
  puts "Transport #{index}: #{transport.class.name}"
end

# Get primary transport (first in array) for backward compatibility
primary = message.transport  # Returns first transport

Instance-Level Overrides

You can override class-level multi-transport configuration at the instance level:

class MonitoringMessage < SmartMessage::Base
  property :metric, required: true

  # Class-level: send to monitoring and backup
  transport [
    SmartMessage::Transport.create(:redis, url: 'redis://monitoring:6379'),
    SmartMessage::Transport.create(:redis, url: 'redis://backup:6379')
  ]
end

# Instance-level override for testing
test_message = MonitoringMessage.new(metric: "cpu_usage: 85%")
test_message.transport(SmartMessage::Transport::StdoutTransport.new)

puts test_message.single_transport?  # => true (overridden)
test_message.publish  # Only goes to STDOUT
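
The fallback behaviour described above (an instance override shadows the class-level list, and other instances are unaffected) can be modelled in a few lines of plain Ruby. This is a minimal sketch under stated assumptions, not SmartMessage's actual implementation: `SketchMessage` is a hypothetical class, and symbols stand in for real transport objects.

```ruby
# Hypothetical stand-in for a message class; symbols play the role of transports.
class SketchMessage
  class << self
    # Class-level setter/getter: stores an array, returns the first transport.
    def transport(value = nil)
      @transports = Array(value) unless value.nil?
      transports.first
    end

    def transports
      @transports || []
    end
  end

  # Instance-level setter/getter: an override shadows the class-level list.
  def transport(value = nil)
    @transports = Array(value) unless value.nil?
    transports.first
  end

  # Fall back to the class-level list when no override was set.
  def transports
    @transports || self.class.transports
  end

  def single_transport?
    transports.length == 1
  end

  def multiple_transports?
    transports.length > 1
  end
end

SketchMessage.transport(%i[monitoring backup])

overridden = SketchMessage.new
overridden.transport(:stdout)

p overridden.transports         # [:stdout]
p SketchMessage.new.transports  # [:monitoring, :backup]
```

Keeping the override in an instance variable means it never leaks into other instances of the same class, which is what makes it suitable for tests.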

Error Handling and Resilience

Multi-transport publishing is designed to be resilient:

Partial Failures

When some transports succeed and others fail, publishing continues:

class CriticalAlert < SmartMessage::Base
  property :alert_text, required: true

  transport [
    ReliableTransport.new,      # ✅ Succeeds
    FailingTransport.new,       # ❌ Fails
    BackupTransport.new         # ✅ Succeeds  
  ]
end

alert = CriticalAlert.new(alert_text: "Database connection lost")
alert.publish  # ✅ Succeeds! 2 out of 3 transports work

# Logs will show:
# [INFO] Published: CriticalAlert via ReliableTransport, BackupTransport  
# [WARN] Failed transports for CriticalAlert: FailingTransport

Complete Failures

Only when ALL transports fail does publishing raise an error:

class AllFailingMessage < SmartMessage::Base
  property :data

  transport [
    FailingTransport.new,       # ❌ Fails
    AnotherFailingTransport.new # ❌ Fails
  ]
end

message = AllFailingMessage.new(data: "test")

begin
  message.publish
rescue SmartMessage::Errors::PublishError => e
  puts e.message  # "All transports failed: FailingTransport: connection error; AnotherFailingTransport: timeout"
end
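
The rule described in the two subsections above (succeed if any transport works, raise only if all fail) can be sketched in plain Ruby without SmartMessage. This is an illustration of the semantics, not the library's code: `publish_to_all`, `AllTransportsFailed`, and the lambda "transports" are hypothetical names.

```ruby
# Hypothetical error type standing in for SmartMessage::Errors::PublishError.
class AllTransportsFailed < StandardError; end

# Try every transport in order and collect successes and failures.
# Each transport is a [name, callable] pair (an assumption made for this sketch).
def publish_to_all(transports, payload)
  succeeded = []
  failed = {}

  transports.each do |name, transport|
    begin
      transport.call(payload)
      succeeded << name
    rescue StandardError => e
      failed[name] = e.message # isolate the failure and keep going
    end
  end

  if succeeded.empty?
    details = failed.map { |name, msg| "#{name}: #{msg}" }.join('; ')
    raise AllTransportsFailed, "All transports failed: #{details}"
  end

  { succeeded: succeeded, failed: failed.keys }
end

ok     = ->(_payload) { :delivered }
broken = ->(_payload) { raise 'connection error' }

# Partial failure: no exception, the failed transport is only reported.
p publish_to_all([[:reliable, ok], [:failing, broken], [:backup, ok]], 'alert')

# Complete failure: the aggregated error is raised.
begin
  publish_to_all([[:failing, broken]], 'alert')
rescue AllTransportsFailed => e
  puts e.message
end
```

Rescuing inside the loop is what keeps one transport's failure from preventing the next transport from being tried.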

Error Logging

Multi-transport publishing provides comprehensive error logging:

# Example log output during partial failure:
[DEBUG] About to call transport.publish on RedisTransport
[DEBUG] transport.publish completed on RedisTransport
[ERROR] Transport FailingTransport failed: StandardError - Connection timeout
[DEBUG] About to call transport.publish on StdoutTransport  
[DEBUG] transport.publish completed on StdoutTransport
[INFO]  Published: MyMessage via RedisTransport, StdoutTransport
[WARN]  Failed transports for MyMessage: FailingTransport

Common Use Cases

1. High-Availability Critical Messages

Ensure critical business messages reach their destination even if primary systems fail:

class PaymentProcessedMessage < SmartMessage::Base
  property :payment_id, required: true
  property :amount, required: true
  property :status, required: true

  # Primary processing + backup + audit trail
  transport [
    SmartMessage::Transport.create(:redis_queue, 
      url: 'redis://primary-cluster:6379',
      queue_prefix: 'payments'
    ),
    SmartMessage::Transport.create(:redis,
      url: 'redis://backup-cluster:6380'  
    ),
    SmartMessage::Transport::StdoutTransport.new(
      output: '/var/log/payments.log',
      format: :json
    )
  ]
end

2. Development and Production Dual Publishing

Send messages to both the existing production system and the new system during a migration:

class UserRegistrationMessage < SmartMessage::Base  
  property :user_id, required: true
  property :email, required: true

  # Dual publishing during migration
  transport [
    SmartMessage::Transport.create(:redis, 
      url: ENV['PRODUCTION_REDIS_URL']
    ),
    SmartMessage::Transport.create(:redis_queue,
      url: ENV['NEW_SYSTEM_REDIS_URL'],
      queue_prefix: 'migration'
    )
  ]
end

3. Monitoring and Alerting Integration

Combine business processing with operational monitoring:

class OrderFailureMessage < SmartMessage::Base
  property :order_id, required: true  
  property :error_message, required: true
  property :customer_impact, required: true

  transport [
    # Business processing
    SmartMessage::Transport.create(:redis_queue,
      url: 'redis://orders:6379'
    ),

    # Operations monitoring  
    SmartMessage::Transport.create(:webhook,
      url: 'https://monitoring.company.com/alerts'
    ),

    # Development debugging
    SmartMessage::Transport::StdoutTransport.new(format: :pretty)
  ]
end

4. A/B Testing and Feature Rollouts

Send messages to old and new systems during feature rollouts:

class AnalyticsEventMessage < SmartMessage::Base
  property :event_type, required: true
  property :user_id, required: true
  property :metadata, default: {}

  transport [
    # Existing analytics pipeline (stable)
    SmartMessage::Transport.create(:redis, 
      url: 'redis://analytics-v1:6379'
    ),

    # New analytics pipeline (testing)  
    SmartMessage::Transport.create(:redis_queue,
      url: 'redis://analytics-v2:6379',
      queue_prefix: 'beta'
    )
  ]
end

Performance Considerations

Sequential Processing

Transports are processed sequentially in the order configured:

# Order matters for performance
transport [
  FastMemoryTransport.new,      # Processed first (fast)
  SlowNetworkTransport.new,     # Processed second (slow) 
  AnotherFastTransport.new      # Processed third (waits for slow)
]

Recommendation: Place fastest/most critical transports first.
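
To make the effect of ordering concrete, the sketch below simulates sequential delivery with made-up latencies. No real transports are involved; `completion_times` and the transport names are hypothetical. It suggests that reordering changes when each transport delivers, while the total time of the publish call stays the sum of all latencies.

```ruby
# Simulate sequential publishing: each transport starts only after the previous one ends.
# `latencies` maps a hypothetical transport name to its publish time in milliseconds.
def completion_times(latencies)
  elapsed = 0
  latencies.each_with_object({}) do |(name, ms), done_at|
    elapsed += ms
    done_at[name] = elapsed
  end
end

slow_first = completion_times(slow_network: 200, fast_memory: 1, fast_stdout: 2)
fast_first = completion_times(fast_memory: 1, fast_stdout: 2, slow_network: 200)

p slow_first # fast transports finish at 201 ms and 203 ms
p fast_first # fast transports finish at 1 ms and 3 ms; the last one still ends at 203 ms
```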

Transport Independence

Each transport failure is isolated and doesn't affect others:

transport [
  ReliableTransport.new,        # Always succeeds
  UnreliableTransport.new,      # May fail, doesn't affect others
  BackupTransport.new           # Provides redundancy
]

Memory Usage

Each transport instance maintains its own connection and state:

# Each transport creates its own connection pool
transport [
  SmartMessage::Transport.create(:redis, url: 'redis://server1:6379'),  
  SmartMessage::Transport.create(:redis, url: 'redis://server2:6379'),
  SmartMessage::Transport.create(:redis, url: 'redis://server3:6379')   
]
# Total: 3 Redis connection pools

Best Practices

1. Limit Transport Count

Avoid configuring more transports than you need. Because transports run sequentially, each one adds its latency to every publish call:

# ✅ Good: 2-4 transports for specific purposes
transport [
  PrimaryTransport.new,
  BackupTransport.new, 
  MonitoringTransport.new
]

# ❌ Avoid: Too many transports
transport [
  Transport1.new, Transport2.new, Transport3.new,
  Transport4.new, Transport5.new, Transport6.new  # Overkill
]

2. Group by Purpose

Organize transports by their intended purpose:

class BusinessMessage < SmartMessage::Base
  transport [
    # Core business processing
    SmartMessage::Transport.create(:redis_queue, url: ENV['PRIMARY_REDIS_URL']),

    # Operational monitoring
    SmartMessage::Transport::StdoutTransport.new(
      output: '/var/log/business-events.log'
    ),

    # Disaster recovery backup
    SmartMessage::Transport.create(:redis, url: ENV['BACKUP_REDIS_URL'])
  ]
end

3. Environment-Specific Configuration

Use environment variables and the runtime environment to choose transports (this example assumes a Rails application, since it calls Rails.env):

class ConfigurableMessage < SmartMessage::Base
  transport_configs = []

  # Always include primary transport
  transport_configs << SmartMessage::Transport.create(:redis_queue,
    url: ENV['PRIMARY_REDIS_URL']
  )

  # Add backup transport in production
  if Rails.env.production?
    transport_configs << SmartMessage::Transport.create(:redis,
      url: ENV['BACKUP_REDIS_URL'] 
    )
  end

  # Add stdout transport in development
  if Rails.env.development? 
    transport_configs << SmartMessage::Transport::StdoutTransport.new
  end

  transport transport_configs
end
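
Outside Rails, the same selection logic can be kept in a small pure function, which also makes it easy to test. The following is a sketch under assumptions: `transport_specs_for` is a hypothetical helper, and it returns plain hashes describing transports rather than real transport objects.

```ruby
# Build a list of transport descriptors for a given environment name.
# Descriptors are plain hashes here; a real message class would turn each one
# into a transport object before passing the list to `transport`.
def transport_specs_for(env_name, env = ENV)
  # Hash#fetch / ENV.fetch raise KeyError when a required URL is missing.
  specs = [{ type: :redis_queue, url: env.fetch('PRIMARY_REDIS_URL') }]
  specs << { type: :redis, url: env.fetch('BACKUP_REDIS_URL') } if env_name == 'production'
  specs << { type: :stdout } if env_name == 'development'
  specs
end

settings = {
  'PRIMARY_REDIS_URL' => 'redis://primary:6379',
  'BACKUP_REDIS_URL'  => 'redis://backup:6379'
}

p transport_specs_for('production', settings).map { |spec| spec[:type] }
p transport_specs_for('development', settings).map { |spec| spec[:type] }
```

Using `fetch` instead of `[]` makes a missing URL fail at load time rather than producing a transport configured with `nil`.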

4. Health Monitoring

Monitor the health of your multi-transport setup:

class HealthCheckMessage < SmartMessage::Base
  property :timestamp, default: -> { Time.now }

  transport [
    PrimaryTransport.new,
    BackupTransport.new
  ]

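  # Class method to check transport health.
  # publish raises only when ALL transports fail, so success here means
  # at least one transport accepted the message, not that every one did.
  def self.health_check
    test_message = new(timestamp: Time.now)

    begin
      test_message.publish
      { status: 'healthy', transports: 'at_least_one_operational' }
    rescue SmartMessage::Errors::PublishError => e
      { status: 'down', error: e.message }
    end
  end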
end
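
Because publishing raises only when every transport fails, a successful publish cannot tell a fully healthy setup from a partially degraded one. One way to separate the two is to probe each transport on its own. The sketch below illustrates that idea in plain Ruby; `probe_transports` and the lambda probes are hypothetical and not part of SmartMessage.

```ruby
# Probe each transport individually and classify the overall state.
# `transports` maps a hypothetical name to a callable that raises on failure.
def probe_transports(transports)
  failures = {}

  transports.each do |name, probe|
    begin
      probe.call
    rescue StandardError => e
      failures[name] = e.message
    end
  end

  status =
    if failures.empty? then 'healthy'
    elsif failures.size < transports.size then 'degraded'
    else 'down'
    end

  { status: status, failures: failures }
end

working = -> { true }
failing = -> { raise 'timeout' }

p probe_transports(primary: working, backup: working)
p probe_transports(primary: working, backup: failing)
p probe_transports(primary: failing, backup: failing)
```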

Migration Strategies

Gradual Migration

When migrating from one transport to another:

class MigrationMessage < SmartMessage::Base

  # Phase 1: Dual publishing
  transport [
    OldTransport.new,      # Keep existing system running
    NewTransport.new       # Start sending to new system
  ]

  # Phase 2: Monitor and validate new system
  # Phase 3: Remove old transport when confident
end
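
For Phase 2, one simple validation is to compare what each side actually received while both transports are active. The sketch below uses a hypothetical in-memory `RecordingTransport` and a hypothetical `delivery_diff` helper; in a real migration the comparison would run against whatever records the old and new systems keep.

```ruby
# Hypothetical in-memory transport that records what it was asked to publish.
class RecordingTransport
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload)
    @published << payload
  end
end

# Report payloads that one side received and the other did not.
def delivery_diff(old_transport, new_transport)
  {
    missing_from_new: old_transport.published - new_transport.published,
    missing_from_old: new_transport.published - old_transport.published
  }
end

old_side = RecordingTransport.new
new_side = RecordingTransport.new

%w[order-1 order-2 order-3].each do |payload|
  old_side.publish(payload)
  new_side.publish(payload) unless payload == 'order-2' # simulate a dropped message
end

p delivery_diff(old_side, new_side)
```

An empty diff over a representative period is one signal that the old transport can be removed in Phase 3.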

Blue-Green Deployment

Support blue-green deployments with transport switching:

class DeploymentMessage < SmartMessage::Base
  def self.configure_for_deployment(color)
    case color
    when :blue
      transport BlueEnvironmentTransport.new
    when :green  
      transport GreenEnvironmentTransport.new
    when :both
      transport [
        BlueEnvironmentTransport.new,
        GreenEnvironmentTransport.new
      ]
    end
  end
end

Troubleshooting

Common Issues

Issue: Publishing seems slow

# Transports run sequentially, so a slow transport delays every transport after it
transport [
  SlowTransport.new,     # ❌ Blocks others
  FastTransport.new      # Must wait for slow one
]

# Mitigation: put the fastest transports first so they deliver immediately.
# The publish call as a whole still waits for the slow transport.
transport [
  FastTransport.new,     # ✅ Completes quickly
  SlowTransport.new      # Runs last, so nothing waits behind it
]
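
Before reordering, it helps to measure which transport is actually slow. The sketch below times each publish call separately with Ruby's monotonic clock. `publish_timings` and the lambda transports are hypothetical stand-ins; with real transports you would call their own publish method inside the block.

```ruby
# Time each transport's publish call separately.
# `transports` maps a hypothetical name to a callable taking the payload.
def publish_timings(transports, payload)
  transports.to_h do |name, transport|
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    transport.call(payload)
    [name, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
  end
end

timings = publish_timings(
  { fast: ->(_payload) { :ok }, slow: ->(_payload) { sleep 0.05 } },
  'probe'
)

slowest, seconds = timings.max_by { |_name, secs| secs }
puts format('%s took %.3fs', slowest, seconds)
```

The monotonic clock is used instead of `Time.now` so that system clock adjustments cannot distort the measured durations.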

Issue: Partial failures not logged

# Ensure proper logging configuration
SmartMessage.configure do |config|
  config.logger.level = :debug  # Show all transport operations
end

Issue: All transports failing unexpectedly

# Test each transport individually
message.transports.each_with_index do |transport, index|
  begin
    transport.publish(message)
    puts "Transport #{index} (#{transport.class.name}): ✅ Success"
  rescue => e
    puts "Transport #{index} (#{transport.class.name}): ❌ Failed - #{e.message}"
  end
end

See Also