Skip to content

Background Processing

Ragdoll implements a comprehensive background processing system using ActiveJob to handle computationally intensive operations without blocking the main application thread. This system is essential for production deployments where document processing, embedding generation, and content analysis must scale efficiently.

Overview

Schema Note: Following recent schema optimization, the embedding_model field is now stored in content-specific tables (text_contents, image_contents, audio_contents) rather than in individual embeddings. This eliminates field duplication while maintaining full functionality through polymorphic relationships.

The background processing architecture consists of four specialized job types that handle different aspects of document intelligence:

  • Ragdoll::GenerateEmbeddingsJob: Vector embedding creation for search
  • Ragdoll::ExtractTextJob: Content extraction from various file formats
  • Ragdoll::ExtractKeywordsJob: AI-powered keyword analysis
  • Ragdoll::GenerateSummaryJob: Document summarization

All jobs are designed to work together in a coordinated pipeline while maintaining individual reliability and error handling.

Architecture

Job Inheritance Structure

# Base job class with shared functionality
class ApplicationJob < ActiveJob::Base
  include Ragdoll::Core::Jobs::SharedMethods

  retry_on StandardError, wait: :exponentially_longer, attempts: 3
  discard_on ActiveJob::DeserializationError

  around_perform :with_job_logging
  around_perform :with_error_handling
end

# Specialized job implementations
class Ragdoll::GenerateEmbeddingsJob < ApplicationJob
class Ragdoll::ExtractTextJob < ApplicationJob  
class Ragdoll::ExtractKeywordsJob < ApplicationJob
class Ragdoll::GenerateSummaryJob < ApplicationJob

Queue Configuration

# Queue priorities and routing
class Ragdoll::GenerateEmbeddingsJob < ApplicationJob
  queue_as :embeddings
  queue_with_priority 10  # High priority for search functionality
end

class Ragdoll::ExtractTextJob < ApplicationJob
  queue_as :processing
  queue_with_priority 5   # Medium priority for content extraction
end

class Ragdoll::GenerateSummaryJob < ApplicationJob
  queue_as :analysis
  queue_with_priority 1   # Lower priority for enhancement features
end

Job Types

1. GenerateEmbeddingsJob

Purpose: Creates vector embeddings for semantic search functionality.

Implementation:

class Ragdoll::GenerateEmbeddingsJob < ApplicationJob
  def perform(embeddable_id, embeddable_type, options = {})
    embeddable = embeddable_type.constantize.find(embeddable_id)

    # Extract content based on type
    content = extract_content_for_embedding(embeddable)

    # Generate vector embedding
    vector = EmbeddingService.generate_embedding(
      content,
      model: options[:model] || Configuration.embedding_model
    )

    # Store embedding with metadata (embedding_model accessed via polymorphic relationship)
    embeddable.embeddings.create!(
      embedding_vector: vector,
      content: content,
      chunk_index: options[:chunk_index],
      metadata: build_embedding_metadata(embeddable, options)
    )

    # Update document processing status
    update_document_status(embeddable.document)

  rescue EmbeddingService::Error => e
    handle_embedding_error(e, embeddable)
    raise
  end

  private

  def extract_content_for_embedding(embeddable)
    case embeddable
    when TextContent
      embeddable.content
    when ImageContent
      embeddable.description || embeddable.alt_text
    when AudioContent
      embeddable.transcript
    else
      raise ArgumentError, "Unsupported embeddable type: #{embeddable.class}"
    end
  end
end

Features: - ✅ Multi-modal content support (text, image descriptions, audio transcripts) - ✅ Configurable embedding models per job - ✅ Chunk-aware processing for large documents - ✅ Automatic retry with exponential backoff - ✅ Error handling with fallback strategies - ✅ Progress tracking and status updates

Usage:

# Queue embedding generation for text content
text_content = TextContent.find(123)
Ragdoll::GenerateEmbeddingsJob.perform_later(text_content.id, 'TextContent')

# Batch processing with options
TextContent.where(embeddings_count: 0).find_each do |content|
  Ragdoll::GenerateEmbeddingsJob.perform_later(
    content.id, 
    'TextContent',
    model: 'text-embedding-3-large',
    chunk_index: content.calculate_chunk_index
  )
end

2. ExtractTextJob

Purpose: Extracts text content from various file formats and prepares it for processing.

Implementation:

class Ragdoll::ExtractTextJob < ApplicationJob
  def perform(document_id, options = {})
    document = Document.find(document_id)

    # Extract text based on document type
    extracted_text = case document.document_type
    when 'pdf'
      extract_from_pdf(document)
    when 'docx'
      extract_from_docx(document)
    when 'audio'
      extract_from_audio(document)  # Speech-to-text
    when 'image'
      extract_from_image(document)  # OCR
    else
      extract_from_file(document)
    end

    # Create text content with chunking and embedding model
    text_content = document.text_contents.create!(
      content: extracted_text,
      language_detected: detect_language(extracted_text),
      embedding_model: options[:embedding_model] || Configuration.embedding_model,
      chunk_size: options[:chunk_size] || Configuration.chunk_size,
      chunk_overlap: options[:chunk_overlap] || Configuration.chunk_overlap,
      extraction_method: determine_extraction_method(document),
      extracted_at: Time.current
    )

    # Queue embedding generation
    Ragdoll::GenerateEmbeddingsJob.perform_later(text_content.id, 'TextContent')

    # Queue additional analysis
    if Configuration.enable_keyword_extraction?
      Ragdoll::ExtractKeywordsJob.perform_later(text_content.id)
    end

    if Configuration.enable_document_summarization?
      Ragdoll::GenerateSummaryJob.perform_later(text_content.id)
    end

    # Update document status
    document.update!(status: 'processed') if document.processing_complete?

  rescue DocumentProcessor::ExtractionError => e
    handle_extraction_error(e, document)
    raise
  end
end

Features: - ✅ Multi-format support (PDF, DOCX, audio, images) - ✅ Language detection and encoding handling - ✅ Intelligent text chunking - ✅ OCR integration for image-based text - ✅ Speech-to-text for audio content - ✅ Automatic downstream job queuing - ✅ Progress tracking and status management

3. ExtractKeywordsJob

Purpose: Performs AI-powered keyword and topic extraction from text content.

Implementation:

class Ragdoll::ExtractKeywordsJob < ApplicationJob
  def perform(text_content_id, options = {})
    text_content = TextContent.find(text_content_id)

    # Generate keywords using LLM
    keywords_result = TextGenerationService.extract_keywords(
      text_content.content,
      model: options[:model] || Configuration.keywords_model,
      max_keywords: options[:max_keywords] || 10,
      include_topics: options[:include_topics] || true
    )

    # Update document metadata
    document = text_content.document
    current_metadata = document.metadata || {}

    document.update!(
      metadata: current_metadata.merge(
        keywords: keywords_result[:keywords],
        topics: keywords_result[:topics],
        keyword_extraction: {
          model: keywords_result[:model],
          extracted_at: Time.current,
          confidence_scores: keywords_result[:confidence_scores]
        }
      )
    )

    # Trigger re-indexing for search
    if Configuration.enable_search_indexing?
      reindex_document_for_search(document)
    end

  rescue TextGenerationService::Error => e
    handle_keyword_extraction_error(e, text_content)
    raise
  end
end

Features: - ✅ LLM-powered keyword extraction - ✅ Topic modeling and categorization - ✅ Confidence scoring for extracted terms - ✅ Metadata integration with document search - ✅ Configurable extraction parameters - ✅ Search index updates

4. GenerateSummaryJob

Purpose: Creates AI-generated summaries of document content.

Implementation:

class Ragdoll::GenerateSummaryJob < ApplicationJob
  def perform(text_content_id, options = {})
    text_content = TextContent.find(text_content_id)

    # Generate summary using LLM
    summary_result = TextGenerationService.generate_summary(
      text_content.content,
      model: options[:model] || Configuration.summary_model,
      max_length: options[:max_length] || 200,
      style: options[:style] || 'academic'
    )

    # Update document metadata
    document = text_content.document
    current_metadata = document.metadata || {}

    document.update!(
      metadata: current_metadata.merge(
        summary: summary_result[:summary],
        summary_metadata: {
          model: summary_result[:model],
          length: summary_result[:summary].length,
          generated_at: Time.current,
          style: options[:style],
          reading_time_minutes: estimate_reading_time(summary_result[:summary])
        }
      )
    )

    # Create searchable summary embedding
    if Configuration.enable_summary_embeddings?
      Ragdoll::GenerateEmbeddingsJob.perform_later(
        document.id,
        'Document', 
        content_type: 'summary',
        content: summary_result[:summary]
      )
    end

  rescue TextGenerationService::Error => e
    handle_summary_generation_error(e, text_content)
    raise
  end
end

Features: - ✅ AI-powered summarization with style options - ✅ Configurable summary length and format - ✅ Reading time estimation - ✅ Summary embedding generation for search - ✅ Metadata enrichment - ✅ Multiple summarization models

Job Orchestration

Processing Pipeline

# Complete document processing pipeline
class DocumentProcessingOrchestrator
  def self.process_document(document)
    # 1. Extract text content
    Ragdoll::ExtractTextJob.perform_later(document.id)

    # 2. Handle multi-modal content
    if document.has_images?
      document.image_contents.each do |image|
        GenerateImageDescriptionJob.perform_later(image.id)
      end
    end

    if document.has_audio?
      document.audio_contents.each do |audio|
        Ragdoll::ExtractTextJob.perform_later(audio.id, content_type: 'audio')
      end
    end

    # 3. Generate embeddings (triggered by content creation)
    # 4. Extract keywords (triggered by text content creation)
    # 5. Generate summary (triggered by text content creation)
  end
end

Batch Processing

# Efficient batch processing for large document collections
class BatchProcessor
  def self.process_document_batch(document_ids)
    # Group by document type for optimized processing
    documents = Document.where(id: document_ids).includes(:text_contents)

    documents.group_by(&:document_type).each do |type, docs|
      case type
      when 'pdf'
        queue_pdf_batch(docs)
      when 'audio'
        queue_audio_batch(docs)
      when 'image'
        queue_image_batch(docs)
      end
    end
  end

  private

  def self.queue_pdf_batch(documents)
    # Batch PDF processing with staggered job scheduling
    documents.each_with_index do |doc, index|
      ExtractTextJob.set(wait: index * 2.seconds).perform_later(doc.id)
    end
  end
end

Error Handling

Retry Strategies

class ApplicationJob < ActiveJob::Base
  # Exponential backoff for transient errors
  retry_on EmbeddingService::RateLimitError, wait: :exponentially_longer, attempts: 5
  retry_on Net::TimeoutError, wait: 10.seconds, attempts: 3

  # Discard jobs that can't be retried
  discard_on ActiveRecord::RecordNotFound
  discard_on DocumentProcessor::UnsupportedFormatError

  # Custom retry logic for specific errors
  retry_on TextGenerationService::ModelUnavailableError do |job, error|
    # Try alternative model
    job.arguments[1] = { model: 'fallback-model' }
    job.retry_job(wait: 30.seconds)
  end
end

Error Monitoring

class ApplicationJob < ActiveJob::Base
  around_perform :with_error_tracking

  private

  def with_error_tracking
    start_time = Time.current
    yield
    track_job_success(Time.current - start_time)
  rescue => error
    track_job_failure(error, Time.current - start_time)

    # Update document status on critical failures
    if respond_to?(:document) && document.present?
      document.update!(
        status: 'error',
        error_message: error.message,
        error_details: {
          job_class: self.class.name,
          error_class: error.class.name,
          backtrace: error.backtrace.first(10),
          occurred_at: Time.current
        }
      )
    end

    raise
  end
end

Monitoring and Analytics

Job Status Tracking

# Track job progress for user feedback
class JobStatusTracker
  def self.track_document_processing(document)
    {
      document_id: document.id,
      status: document.status,
      jobs_queued: count_queued_jobs(document),
      jobs_completed: count_completed_jobs(document),
      estimated_completion: estimate_completion_time(document),
      processing_steps: {
        text_extraction: job_status('Ragdoll::ExtractTextJob', document),
        embedding_generation: job_status('Ragdoll::GenerateEmbeddingsJob', document),
        keyword_extraction: job_status('Ragdoll::ExtractKeywordsJob', document),
        summary_generation: job_status('Ragdoll::GenerateSummaryJob', document)
      }
    }
  end
end

# Usage in API
GET /api/documents/123/status
{
  "document_id": 123,
  "status": "processing",
  "progress": 65,
  "jobs_queued": 2,
  "jobs_completed": 6,
  "estimated_completion": "2024-01-15T10:30:00Z"
}

Performance Metrics

# Background job performance monitoring
class JobMetrics
  def self.embedding_generation_stats
    {
      average_duration: calculate_average_duration('Ragdoll::GenerateEmbeddingsJob'),
      success_rate: calculate_success_rate('Ragdoll::GenerateEmbeddingsJob'),
      throughput_per_hour: calculate_throughput('Ragdoll::GenerateEmbeddingsJob'),
      queue_depth: queue_depth('embeddings'),
      error_types: error_breakdown('Ragdoll::GenerateEmbeddingsJob')
    }
  end
end

Configuration

Queue Adapter Setup

# Production: Sidekiq configuration
# config/application.rb
config.active_job.queue_adapter = :sidekiq

# Queue priorities and routing
# config/schedule.yml
:queues:
  - [embeddings, 3]     # High priority, 3 workers
  - [processing, 2]     # Medium priority, 2 workers  
  - [analysis, 1]       # Low priority, 1 worker

# Development: Async configuration
# config/environments/development.rb
config.active_job.queue_adapter = :async

# Test: Inline configuration
# config/environments/test.rb
config.active_job.queue_adapter = :test

Ragdoll Configuration

Ragdoll::Core.configure do |config|
  # Background processing settings
  config.enable_background_processing = true
  config.job_queue_prefix = 'ragdoll'

  # Job-specific settings
  config.enable_keyword_extraction = true
  config.enable_document_summarization = true
  config.enable_summary_embeddings = true

  # Performance tuning
  config.batch_size = 100
  config.job_timeout = 300.seconds
  config.max_retry_attempts = 3

  # Model configuration for jobs
  config.embedding_model = 'text-embedding-3-small'
  config.summary_model = 'gpt-4'
  config.keywords_model = 'gpt-3.5-turbo'
end

Production Deployment

Scaling Strategy

# Horizontal scaling with multiple job types
# docker-compose.yml
services:
  ragdoll-embeddings:
    image: ragdoll-app
    command: bundle exec sidekiq -q embeddings -c 3

  ragdoll-processing:
    image: ragdoll-app  
    command: bundle exec sidekiq -q processing -c 2

  ragdoll-analysis:
    image: ragdoll-app
    command: bundle exec sidekiq -q analysis -c 1

Health Checks

# Job queue health monitoring
class JobHealthCheck
  def self.status
    {
      queues: queue_status,
      workers: worker_status,
      failed_jobs: failed_job_count,
      processing_lag: calculate_processing_lag,
      alerts: generate_alerts
    }
  end

  def self.queue_status
    %w[embeddings processing analysis].map do |queue|
      {
        name: queue,
        size: Sidekiq::Queue.new(queue).size,
        latency: Sidekiq::Queue.new(queue).latency,
        busy: Sidekiq::Workers.new.select { |_, _, work| work['queue'] == queue }.size
      }
    end
  end
end

Best Practices

1. Job Design

  • Keep jobs focused on single responsibilities
  • Use appropriate queue priorities
  • Implement comprehensive error handling
  • Design for idempotency when possible

2. Performance Optimization

  • Batch similar operations when feasible
  • Use appropriate retry strategies
  • Monitor queue depths and processing times
  • Scale workers based on job types and volumes

3. Error Management

  • Implement dead letter queues for failed jobs
  • Log comprehensive error context
  • Provide user feedback on job status
  • Plan for graceful degradation

4. Monitoring

  • Track job performance metrics
  • Monitor queue health and worker utilization
  • Set up alerts for processing delays
  • Analyze failure patterns for improvements

The background processing system in Ragdoll provides a robust foundation for scalable document intelligence operations, ensuring that computationally intensive tasks don't impact user experience while maintaining reliability and observability in production environments.