Jobs Reference

Ragdoll uses ActiveJob for background processing of document analysis, content extraction, and embedding generation. The job system provides reliable, asynchronous processing with comprehensive error handling and monitoring capabilities.

Background Job System

The background job system orchestrates multi-step document processing workflows, ensuring efficient handling of large documents and compute-intensive AI operations. All jobs inherit from ActiveJob::Base and integrate seamlessly with Rails job queue adapters including Sidekiq, Resque, and Delayed Job.

Key Features:

- Asynchronous document processing pipeline
- Automatic job chaining and dependency management
- Comprehensive error handling with status tracking
- Built-in retry mechanisms with exponential backoff
- Integration with document status lifecycle management
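The status lifecycle these jobs drive (pending → processing → processed or error) can be sketched as a tiny state machine. The `TRANSITIONS` table and `transition` helper below are purely illustrative, not Ragdoll's actual implementation; Ragdoll stores the status on the Document record.

```ruby
# Illustrative document status lifecycle driven by the jobs below
TRANSITIONS = {
  'pending'    => %w[processing],
  'processing' => %w[processed error],
  'processed'  => [],
  'error'      => %w[processing] # a retry moves a failed document back
}.freeze

def transition(from, to)
  unless TRANSITIONS.fetch(from).include?(to)
    raise ArgumentError, "illegal transition #{from} -> #{to}"
  end
  to
end
```

For example, `transition('pending', 'processing')` succeeds, while jumping straight from `'pending'` to `'processed'` raises, mirroring the fact that a document must pass through extraction before it is marked processed.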

Core Jobs

Ragdoll includes four primary job classes that handle the complete document processing pipeline:

ExtractText

The ExtractText job initiates the document processing pipeline by extracting text content from uploaded files.

module Ragdoll
  module Core
    module Jobs
      class ExtractText < ActiveJob::Base
        queue_as :default

        def perform(document_id)
          document = Ragdoll::Document.find(document_id)
          return unless document.file_attached?
          return if document.content.present?

          document.update!(status: "processing")

          extracted_content = document.extract_text_from_file

          if extracted_content.present?
            document.update!(
              content: extracted_content,
              status: "processed"
            )

            # Queue follow-up jobs (classes within the same Jobs module)
            GenerateSummary.perform_later(document_id)
            ExtractKeywords.perform_later(document_id)
            GenerateEmbeddings.perform_later(document_id)
          else
            document.update!(status: "error")
          end
        rescue ActiveRecord::RecordNotFound
          # Document was deleted, nothing to do
        rescue StandardError => e
          document&.update!(status: "error")
          raise e
        end
      end
    end
  end
end

Key Responsibilities:

- File Processing: Extracts text from PDF, DOCX, HTML, and other supported formats
- Status Management: Updates document status through the processing lifecycle
- Job Orchestration: Triggers downstream jobs upon successful completion
- Error Recovery: Handles file access errors and processing failures

Usage Examples:

# Queue text extraction for a document
Ragdoll::Core::Jobs::ExtractText.perform_later(document.id)

# Process immediately (synchronous)
Ragdoll::Core::Jobs::ExtractText.perform_now(document.id)

# Schedule for later processing
Ragdoll::Core::Jobs::ExtractText.set(wait: 1.hour).perform_later(document.id)

GenerateEmbeddings

The GenerateEmbeddings job creates vector embeddings for all content associated with a document.

module Ragdoll
  module Core
    module Jobs
      class GenerateEmbeddings < ActiveJob::Base
        queue_as :default

        def perform(document_id, chunk_size: nil, chunk_overlap: nil)
          document = Ragdoll::Document.find(document_id)
          return unless document.content.present?
          return if document.all_embeddings.exists?

          # Process all content records using their own generate_embeddings! methods
          document.contents.each(&:generate_embeddings!)

          # Update document status to processed
          document.update!(status: "processed")
        rescue ActiveRecord::RecordNotFound
          # Document was deleted, nothing to do
        rescue StandardError => e
          if defined?(Rails)
            Rails.logger.error "Failed to generate embeddings for document #{document_id}: #{e.message}"
          end
          raise e
        end
      end
    end
  end
end

Key Features:

- Multi-Modal Support: Handles text, image, and audio content embeddings
- Chunking Integration: Respects content-specific chunking strategies
- Batch Processing: Processes multiple content chunks efficiently
- Provider Abstraction: Works with OpenAI, Hugging Face, and other embedding providers

Configuration Options:

# Custom chunk sizing
Ragdoll::Core::Jobs::GenerateEmbeddings.perform_later(
  document.id,
  chunk_size: 1500,
  chunk_overlap: 300
)

# Monitor embedding generation
document.contents.each do |content|
  puts "Embeddings for #{content.type}: #{content.embeddings.count}"
end
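The `chunk_size`/`chunk_overlap` options control fixed-size chunking with overlap. The `chunk_text` function below is a minimal sketch of that strategy under the assumption of character-based windows; it is not Ragdoll's actual chunker.

```ruby
# Fixed-size chunking with overlap: each chunk starts (chunk_size - overlap)
# characters after the previous one, so adjacent chunks share `chunk_overlap`
# characters of context
def chunk_text(text, chunk_size: 1500, chunk_overlap: 300)
  step = chunk_size - chunk_overlap
  chunks = []
  index = 0
  while index < text.length
    chunks << text[index, chunk_size]
    index += step
  end
  chunks
end
```

Overlap preserves context across chunk boundaries so that a sentence split at a boundary still appears whole in at least one chunk, at the cost of some duplicated embedding work.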

ExtractKeywords

The ExtractKeywords job uses LLM-powered analysis to extract relevant keywords from document content.

module Ragdoll
  module Core
    module Jobs
      class ExtractKeywords < ActiveJob::Base
        queue_as :default

        def perform(document_id)
          document = Ragdoll::Document.find(document_id)
          return unless document.content.present?
          return if document.keywords.present?

          text_service = TextGenerationService.new
          keywords_array = text_service.extract_keywords(document.content)

          if keywords_array.present?
            keywords_string = keywords_array.join(", ")
            document.update!(keywords: keywords_string)
          end
        rescue ActiveRecord::RecordNotFound
          # Document was deleted, nothing to do
        rescue StandardError => e
          Rails.logger.error "Failed to generate keywords for document #{document_id}: #{e.message}" if defined?(Rails)
          raise e
        end
      end
    end
  end
end

LLM Integration:

- Uses configured LLM provider (OpenAI, Anthropic, etc.)
- Applies intelligent keyword extraction prompts
- Validates and formats keyword output
- Stores keywords as searchable metadata

Quality Controls:

# Keywords are stored as comma-separated strings
document.keywords # => "machine learning, artificial intelligence, neural networks"

# Access as array for programmatic use
document.keywords_array # => ["machine learning", "artificial intelligence", "neural networks"]

# Add/remove keywords programmatically
document.add_keyword("deep learning")
document.remove_keyword("outdated term")
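Under the hood, the array accessor presumably just splits the stored string. The standalone `keywords_array` function below is a plain-Ruby sketch of that round trip, not Ragdoll's actual method:

```ruby
# Convert the stored comma-separated keywords string into a clean array,
# tolerating nil, stray whitespace, and empty entries
def keywords_array(keywords_string)
  keywords_string.to_s.split(',').map(&:strip).reject(&:empty?)
end
```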

GenerateSummary

The GenerateSummary job creates concise summaries of document content using LLM providers.

module Ragdoll
  module Core
    module Jobs
      class GenerateSummary < ActiveJob::Base
        queue_as :default

        def perform(document_id)
          document = Ragdoll::Document.find(document_id)
          return unless document.content.present?
          return if document.summary.present?

          text_service = TextGenerationService.new
          summary = text_service.generate_summary(document.content)

          document.update!(summary: summary) if summary.present?
        rescue ActiveRecord::RecordNotFound
          # Document was deleted, nothing to do
        rescue StandardError => e
          Rails.logger.error "Failed to generate summary for document #{document_id}: #{e.message}" if defined?(Rails)
          raise e
        end
      end
    end
  end
end

Summarization Features:

- Configurable Length: Respects the summarization_config[:max_length] setting
- Content Analysis: Identifies key themes and concepts
- Multi-Provider Support: Works with GPT, Claude, Gemini, and other LLMs
- Quality Validation: Ensures summary quality and relevance

Configuration Examples:

# Configure summarization in Ragdoll config
Ragdoll::Core.configure do |config|
  config.summarization_config[:enable] = true
  config.summarization_config[:max_length] = 300
  config.summarization_config[:min_content_length] = 500
end

# Access generated summaries
document.summary # => "This document discusses machine learning applications..."
document.has_summary? # => true

Job Configuration

Ragdoll jobs are built on ActiveJob, providing flexible configuration options for different deployment scenarios.

Queue Adapters

Support for all major ActiveJob queue adapters:

# config/application.rb or initializer
config.active_job.queue_adapter = :sidekiq    # Production recommended
# config.active_job.queue_adapter = :resque   # Alternative production option
# config.active_job.queue_adapter = :inline   # Development/testing only
# config.active_job.queue_adapter = :async    # Development with background processing

Sidekiq Configuration (Recommended for Production):

# config/sidekiq.yml
:concurrency: 10
:queues:
  - default
  - ragdoll_processing
  - ragdoll_embeddings

# Gemfile
gem 'sidekiq'
gem 'redis' # Required for Sidekiq

# config/routes.rb (for web UI)
require 'sidekiq/web'
mount Sidekiq::Web => '/sidekiq'

Queue-Specific Configuration:

# Custom job queues for different processing types
class ExtractText < ActiveJob::Base
  queue_as :document_processing
end

class GenerateEmbeddings < ActiveJob::Base
  queue_as :ai_processing  # Separate queue for AI operations
end

# Environment-specific queue names
class ExtractKeywords < ActiveJob::Base
  queue_as Rails.env.production? ? :production_ai : :development_ai
end

Retry Policies and Backoff

Comprehensive retry configuration with exponential backoff:

module Ragdoll
  module Core
    module Jobs
      class BaseJob < ActiveJob::Base
        # Default retry configuration for all Ragdoll jobs
        retry_on StandardError, wait: :exponentially_longer, attempts: 5
        retry_on Timeout::Error, wait: 30.seconds, attempts: 3
        retry_on ActiveRecord::ConnectionNotEstablished, wait: 5.seconds, attempts: 10

        # Specific handling for different error types
        retry_on Net::ReadTimeout, wait: :exponentially_longer, attempts: 3
        discard_on ActiveRecord::RecordNotFound
        discard_on ArgumentError

        # Custom retry logic -- note that the block passed to retry_on runs
        # only once the built-in attempts are exhausted; falling through the
        # block without re-enqueueing drops the job
        retry_on CustomAPIError do |job, exception|
          if exception.response_code == 429 # Rate limited
            job.retry_job(wait: 1.minute)
          elsif exception.response_code >= 500
            job.retry_job(wait: 5.minutes)
          end
        end
      end
    end
  end
end

# Job-specific retry policies
class GenerateEmbeddings < BaseJob
  # AI API calls may need different retry behavior; these error classes
  # come from the respective client gems
  retry_on OpenAI::APIError, wait: 2.minutes, attempts: 3
  retry_on Anthropic::RateLimitError, wait: 5.minutes, attempts: 2
end
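For reference, `wait: :exponentially_longer` makes ActiveJob wait roughly `executions**4 + 2` seconds before each attempt (Rails also adds random jitter, omitted in this sketch):

```ruby
# Approximate ActiveJob exponential backoff: executions**4 + 2 seconds
# (jitter omitted)
def approximate_backoff(executions)
  (executions**4) + 2
end

(1..5).map { |n| approximate_backoff(n) }
# => [3, 18, 83, 258, 627]
```

So five attempts span roughly 16 minutes of cumulative waiting, which is why jobs with strict latency requirements often cap `attempts` lower or use a fixed `wait`.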

Timeout Settings

Configurable timeouts for different job types:

# ActiveJob has no built-in per-job timeout; wrap perform with the
# standard library's Timeout via an around_perform callback
class ExtractText < ActiveJob::Base
  EXTRACTION_TIMEOUT = 15.minutes # large PDFs may take time to process

  around_perform do |_job, block|
    Timeout.timeout(EXTRACTION_TIMEOUT.to_i) { block.call }
  end
end

class GenerateEmbeddings < ActiveJob::Base
  AI_TIMEOUT = 30.minutes # AI operations can be slow

  around_perform do |_job, block|
    Timeout.timeout(AI_TIMEOUT.to_i) { block.call }
  end
end

# Note: the :timeout option in config/sidekiq.yml is the shutdown grace
# period (how long workers get to finish jobs during a restart), not a
# per-job execution limit
# config/sidekiq.yml
:timeout: 25  # seconds (Sidekiq's default)

Priority Levels

Job prioritization for optimal resource allocation:

# Priority interpretation is adapter-specific: Delayed Job and other
# priority-aware adapters generally run LOWER numbers first, while Sidekiq
# ignores ActiveJob priority entirely and schedules by queue weight instead.

# High priority for user-facing operations
class ExtractText < ActiveJob::Base
  queue_with_priority 10
end

# Lower priority for background optimization
class GenerateEmbeddings < ActiveJob::Base
  queue_with_priority 50
end

# Dynamic priority based on content
class ProcessDocument
  def self.enqueue_with_priority(document)
    priority = case document.document_type
               when 'urgent' then 0
               when 'normal' then 50
               else 100
               end

    ExtractText.set(priority: priority).perform_later(document.id)
  end
end

# Sidekiq priority queues
# config/sidekiq.yml
:queues:
  - [urgent, 10]      # High priority, weight 10
  - [normal, 5]       # Normal priority, weight 5
  - [background, 1]   # Low priority, weight 1
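Those weights determine how often Sidekiq polls each queue: a weight-10 queue is checked roughly ten times as often as a weight-1 queue. The `pick_queue` function below is a rough sketch of weighted sampling to illustrate the idea; it is not Sidekiq's implementation.

```ruby
QUEUE_WEIGHTS = { 'urgent' => 10, 'normal' => 5, 'background' => 1 }.freeze

# Pick a queue with probability proportional to its weight
def pick_queue(weights = QUEUE_WEIGHTS)
  roll = rand(weights.values.sum)
  weights.each do |name, weight|
    return name if roll < weight
    roll -= weight
  end
end
```

Over many draws, `'urgent'` is selected about 10/16 of the time, so high-weight queues drain faster without starving low-weight ones entirely.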

Queue Management

Efficient queue management ensures optimal resource utilization and processing performance across different workload types.

Queue Naming Conventions

Standardized queue naming for clear operational management:

# Ragdoll queue naming conventions
module Ragdoll
  module Core
    module Jobs
      # Primary processing queues
      class ExtractText < ActiveJob::Base
        queue_as 'ragdoll_extraction'
      end

      class GenerateEmbeddings < ActiveJob::Base
        queue_as 'ragdoll_embeddings'
      end

      class GenerateSummary < ActiveJob::Base
        queue_as 'ragdoll_ai_processing'
      end

      class ExtractKeywords < ActiveJob::Base
        queue_as 'ragdoll_ai_processing'
      end
    end
  end
end

# Environment-specific queue names (queue_as evaluates its block in the
# context of the job instance)
class BaseJob < ActiveJob::Base
  queue_as do
    env_prefix = Rails.env.production? ? 'prod' : Rails.env
    "#{env_prefix}_#{self.class.name.demodulize.underscore}"
  end
end

# Queue naming by workload characteristics
queue_names = {
  cpu_intensive: 'ragdoll_cpu_heavy',     # AI processing, text analysis
  io_intensive: 'ragdoll_io_heavy',       # File processing, network calls
  memory_intensive: 'ragdoll_memory_heavy', # Large document processing
  quick_tasks: 'ragdoll_quick',           # Fast operations
  batch_processing: 'ragdoll_batch'       # Bulk operations
}

Worker Configuration

Optimized worker configuration for different processing types:

# Sidekiq worker configuration
# config/sidekiq.yml
:concurrency: 20
:timeout: 3600
:verbose: false

:queues:
  - [ragdoll_quick, 10]        # Fast jobs, high concurrency
  - [ragdoll_extraction, 5]    # File processing
  - [ragdoll_embeddings, 3]    # AI embeddings (resource intensive)
  - [ragdoll_ai_processing, 2] # LLM operations (rate limited)
  - [ragdoll_batch, 1]         # Batch operations (low priority)

# Environment-specific configurations
production:
  :concurrency: 50
  :timeout: 7200
  :queues:
    - [ragdoll_quick, 20]
    - [ragdoll_extraction, 15]
    - [ragdoll_embeddings, 10]
    - [ragdoll_ai_processing, 5]

development:
  :concurrency: 5
  :timeout: 300
  :queues:
    - [ragdoll_quick, 3]
    - [ragdoll_extraction, 2]
    - [ragdoll_embeddings, 1]

Per-Worker Resource Limits:

# Worker-specific job options; sidekiq_options covers queue, retry, and
# similar per-job settings. Concurrency is a per-process setting (see
# sidekiq.yml), and memory limits come from the OS or container runtime,
# not from Sidekiq itself.
class AIProcessingWorker
  include Sidekiq::Worker

  sidekiq_options queue: 'ragdoll_ai_processing',
                  retry: 3,
                  backtrace: 10
end

# Resource-aware job assignment
class ResourceManager
  def self.optimal_queue_for_job(job_type, content_size)
    case job_type
    when :embedding_generation
      content_size > 10_000 ? 'ragdoll_embeddings_large' : 'ragdoll_embeddings'
    when :text_extraction
      content_size > 50_000 ? 'ragdoll_extraction_large' : 'ragdoll_extraction'
    else
      'ragdoll_default'
    end
  end
end

Scaling Strategies

Horizontal and vertical scaling approaches for different workloads:

# Auto-scaling configuration
class AutoScaler
  def self.scale_workers_based_on_queue_depth
    queue_depths = Sidekiq::Queue.all.map { |q| [q.name, q.size] }.to_h

    scaling_rules = {
      'ragdoll_embeddings' => { threshold: 50, max_workers: 10 },
      'ragdoll_extraction' => { threshold: 100, max_workers: 20 },
      'ragdoll_ai_processing' => { threshold: 20, max_workers: 5 }
    }

    scaling_rules.each do |queue_name, config|
      current_depth = queue_depths[queue_name] || 0

      if current_depth > config[:threshold]
        scale_up_workers(queue_name, config[:max_workers])
      elsif current_depth < config[:threshold] / 4
        scale_down_workers(queue_name)
      end
    end
  end

  # A bare `private` does not apply to `def self.` methods; use private_class_method
  private_class_method def self.scale_up_workers(queue_name, max_workers)
    # Kubernetes/Docker scaling logic
    current_workers = get_current_worker_count(queue_name)
    return if current_workers >= max_workers

    target_workers = [current_workers + 2, max_workers].min
    execute_scaling_command(queue_name, target_workers)
  end
end

# Kubernetes horizontal pod autoscaler configuration
# k8s/ragdoll-workers-hpa.yml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ragdoll-workers
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ragdoll-sidekiq-workers
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Pods
    pods:
      metric:
        name: sidekiq_queue_depth
      target:
        type: AverageValue
        averageValue: "10"

Load Balancing

Intelligent load distribution across workers and queues:

# Queue load balancing
class LoadBalancer
  def self.distribute_job(job_class, *args)
    # Find least loaded queue for job type
    available_queues = queues_for_job_type(job_class)
    optimal_queue = find_least_loaded_queue(available_queues)

    job_class.set(queue: optimal_queue).perform_later(*args)
  end

  def self.queues_for_job_type(job_class)
    case job_class.name
    when /Extract/
      %w[ragdoll_extraction_1 ragdoll_extraction_2 ragdoll_extraction_3]
    when /Generate.*Embedding/
      %w[ragdoll_embeddings_1 ragdoll_embeddings_2]
    when /AI|Summary|Keywords/
      %w[ragdoll_ai_1 ragdoll_ai_2]
    else
      %w[ragdoll_default]
    end
  end

  def self.find_least_loaded_queue(queue_names)
    queue_loads = queue_names.map do |name|
      queue = Sidekiq::Queue.new(name)
      [name, queue.size + active_jobs_count(name)]
    end

    queue_loads.min_by { |_, load| load }.first
  end

  # A bare `private` does not apply to `def self.` methods; use private_class_method
  private_class_method def self.active_jobs_count(queue_name)
    Sidekiq::Workers.new.count { |_, _, work| work['queue'] == queue_name }
  end
end

# Geographic load balancing for distributed deployments
class GeographicLoadBalancer
  REGIONS = {
    'us-east-1' => %w[ragdoll_us_east_1 ragdoll_us_east_2],
    'us-west-2' => %w[ragdoll_us_west_1 ragdoll_us_west_2],
    'eu-west-1' => %w[ragdoll_eu_west_1 ragdoll_eu_west_2]
  }

  def self.route_job_by_region(job_class, region, *args)
    regional_queues = REGIONS[region] || REGIONS['us-east-1']
    optimal_queue = LoadBalancer.find_least_loaded_queue(regional_queues)

    job_class.set(queue: optimal_queue).perform_later(*args)
  end
end

# Usage examples
# Route to least loaded extraction queue
LoadBalancer.distribute_job(Ragdoll::Core::Jobs::ExtractText, document.id)

# Route by geographic region
GeographicLoadBalancer.route_job_by_region(
  Ragdoll::Core::Jobs::GenerateEmbeddings,
  'us-west-2',
  document.id
)

Error Handling

Robust error handling ensures reliable document processing and provides comprehensive failure recovery mechanisms.

Retry Strategies

Multi-layered retry logic with intelligent backoff and error classification:

module Ragdoll
  module Core
    module Jobs
      class BaseJob < ActiveJob::Base
        # The block passed to retry_on runs only after all attempts have
        # been exhausted, so it is the place for final-failure handling,
        # not per-retry logging
        retry_on StandardError, wait: :exponentially_longer, attempts: 5 do |job, exception|
          handle_final_failure(job, exception)
        end

        # Specific retry policies for different error types
        retry_on Net::ReadTimeout, wait: 30.seconds, attempts: 3
        retry_on ActiveRecord::ConnectionNotEstablished, wait: 5.seconds, attempts: 10
        retry_on Errno::ECONNREFUSED, wait: 1.minute, attempts: 5

        # AI API specific retries (error classes come from the client gems)
        retry_on OpenAI::RateLimitError, wait: 2.minutes, attempts: 3
        retry_on OpenAI::APIConnectionError, wait: :exponentially_longer, attempts: 5

        # Immediate failures - don't retry
        discard_on ActiveRecord::RecordNotFound
        discard_on ArgumentError
        discard_on JSON::ParserError

        # Custom retry logic for specific scenarios; falling through the
        # block without re-enqueueing drops the job
        retry_on CustomProcessingError do |job, exception|
          case exception.error_code
          when 'file_corrupted'
            # Don't retry corrupted files
          when 'temporary_unavailable'
            job.retry_job(wait: 5.minutes)
          when 'rate_limited'
            job.retry_job(wait: exception.retry_after || 60.seconds)
          else
            job.retry_job(wait: 5.minutes)
          end
        end

        private

        # Invoked (with self as the job instance) once retries are exhausted
        def handle_final_failure(job, exception)
          # Mark the document as failed if the first argument is a document id
          document = Ragdoll::Document.find_by(id: job.arguments.first)
          if document
            document.update!(
              status: 'error',
              metadata: document.metadata.merge(
                'error' => {
                  'message' => exception.message,
                  'backtrace' => exception.backtrace&.first(5),
                  'job_class' => job.class.name,
                  'failed_at' => Time.current.iso8601,
                  'attempts' => job.executions
                }
              )
            )
          end

          # Send failure notification
          ErrorNotifier.notify_job_failure(job, exception)
        end
      end
    end
  end
end

# Job-specific retry strategies
class ExtractText < BaseJob
  # File-processing failures that should not be retried at all
  discard_on Errno::ENOENT           # File not found
  discard_on Docx::DocumentCorrupted # Corrupted documents
  retry_on PDF::Reader::MalformedPDFError, attempts: 2, wait: 10.seconds
end

class GenerateEmbeddings < BaseJob
  # AI operations need careful retry handling; this block runs only after
  # the built-in attempts are exhausted
  retry_on OpenAI::APIError do |job, exception|
    if exception.response&.dig('error', 'code') == 'context_length_exceeded'
      # Re-enqueue with a smaller chunk size (chunk_size is a keyword argument)
      options = job.arguments.last.is_a?(Hash) ? job.arguments.last : {}
      smaller = ((options[:chunk_size] || 1000) * 0.8).to_i
      job.class.set(wait: 1.minute)
               .perform_later(job.arguments.first, chunk_size: smaller)
    else
      job.retry_job(wait: 5.minutes)
    end
  end
end

Dead Letter Queues

Manage permanently failed jobs with comprehensive tracking:

# Dead letter queue implementation
module Ragdoll
  module Core
    class DeadLetterQueue
      def self.store_failed_job(job, exception)
        failed_job_record = {
          id: SecureRandom.uuid,
          job_class: job.class.name,
          job_arguments: job.arguments,
          queue_name: job.queue_name,
          failed_at: Time.current,
          exception_class: exception.class.name,
          exception_message: exception.message,
          exception_backtrace: exception.backtrace,
          job_executions: job.executions,
          enqueued_at: job.enqueued_at,
          created_at: Time.current
        }

        # Store in database table or external storage
        store_failed_job_record(failed_job_record)

        # Optional: also store in Redis for quick access (assumes a
        # configured client; Redis.current was removed in redis-rb 5)
        redis = Redis.new
        redis.lpush('ragdoll:dead_letter_queue', failed_job_record.to_json)
        redis.expire('ragdoll:dead_letter_queue', 30.days.to_i)
      end

      def self.replay_failed_job(failed_job_id)
        failed_job = get_failed_job_record(failed_job_id)
        return false unless failed_job

        job_class = failed_job[:job_class].constantize
        job_class.perform_later(*failed_job[:job_arguments])

        # Mark as replayed
        mark_job_replayed(failed_job_id)
        true
      rescue => e
        Rails.logger.error "Failed to replay job #{failed_job_id}: #{e.message}" if defined?(Rails)
        false
      end

      def self.failed_jobs_summary
        # Get failed jobs statistics
        {
          total_failed: count_failed_jobs,
          by_job_class: failed_jobs_by_class,
          by_error_type: failed_jobs_by_error,
          recent_failures: recent_failed_jobs(24.hours),
          retry_candidates: jobs_eligible_for_retry
        }
      end

      # Helper (note: a bare `private` would not apply to `def self.` methods)
      def self.store_failed_job_record(record)
        # Implementation depends on storage choice
        # Could be database table, file system, or external service
        Rails.logger.error "FAILED JOB: #{record.to_json}" if defined?(Rails)
      end
    end
  end
end

# Sidekiq dead job handling
class SidekiqDeadJobHandler
  def self.process_dead_jobs
    dead_set = Sidekiq::DeadSet.new

    dead_set.each do |job|
      if job['class'].start_with?('Ragdoll::')
        # Extract Ragdoll-specific failed jobs
        failed_job_data = {
          job_class: job['class'],
          arguments: job['args'],
          error_message: job['error_message'],
          failed_at: Time.at(job['failed_at']),
          retry_count: job['retry_count']
        }

        Ragdoll::Core::DeadLetterQueue.store_failed_job_record(failed_job_data)
      end
    end
  end

  def self.clear_old_dead_jobs(older_than: 30.days)
    dead_set = Sidekiq::DeadSet.new
    cutoff_time = older_than.ago

    dead_set.select { |job| Time.at(job['failed_at']) < cutoff_time }.each(&:delete)
  end
end

Error Notification

Comprehensive error notification system:

module Ragdoll
  module Core
    class ErrorNotifier
      def self.notify_job_failure(job, exception)
        notification_data = {
          job_class: job.class.name,
          job_id: job.job_id,
          arguments: sanitize_arguments(job.arguments),
          exception: {
            class: exception.class.name,
            message: exception.message,
            backtrace: exception.backtrace&.first(10)
          },
          context: {
            queue: job.queue_name,
            executions: job.executions,
            enqueued_at: job.enqueued_at,
            failed_at: Time.current
          },
          environment: Rails.env,
          severity: determine_severity(job, exception)
        }

        # Send to multiple notification channels
        send_to_configured_channels(notification_data)
      end

      def self.notify_queue_backlog(queue_name, depth)
        return unless depth > backlog_threshold(queue_name)

        notification = {
          type: 'queue_backlog',
          queue: queue_name,
          depth: depth,
          threshold: backlog_threshold(queue_name),
          severity: depth > (backlog_threshold(queue_name) * 2) ? 'critical' : 'warning'
        }

        send_to_configured_channels(notification)
      end

      # Helpers (note: a bare `private` does not apply to `def self.` methods)

      def self.send_to_configured_channels(data)
        channels = Ragdoll.config&.notification_channels || [:log]

        channels.each do |channel|
          case channel
          when :log
            log_error(data)
          when :email
            send_email_notification(data)
          when :slack
            send_slack_notification(data)
          when :webhook
            send_webhook_notification(data)
          when :sentry
            send_sentry_notification(data)
          end
        end
      end

      def self.log_error(data)
        logger = defined?(Rails) ? Rails.logger : Logger.new(STDOUT)
        logger.error "[RAGDOLL JOB FAILURE] #{data[:job_class]}: #{data[:exception][:message]}"
      end

      def self.send_slack_notification(data)
        return unless ENV['SLACK_WEBHOOK_URL']

        payload = {
          text: "Ragdoll Job Failure: #{data[:job_class]}",
          attachments: [{
            color: data[:severity] == 'critical' ? 'danger' : 'warning',
            fields: [
              { title: 'Job Class', value: data[:job_class], short: true },
              { title: 'Queue', value: data[:context][:queue], short: true },
              { title: 'Error', value: data[:exception][:message], short: false },
              { title: 'Attempts', value: data[:context][:executions], short: true }
            ],
            timestamp: Time.current.to_i
          }]
        }

        # Send webhook (implementation depends on HTTP library)
        send_webhook(ENV['SLACK_WEBHOOK_URL'], payload)
      end

      def self.determine_severity(job, exception)
        case exception
        when ActiveRecord::RecordNotFound, ArgumentError
          'low'      # Expected failures
        when Net::ReadTimeout, Errno::ECONNREFUSED
          'medium'   # Infrastructure issues
        when OpenAI::RateLimitError
          'medium'   # External service limits
        else
          job.executions >= 3 ? 'high' : 'medium'
        end
      end
    end
  end
end

Debugging Approaches

Comprehensive debugging tools and techniques:

# Job debugging utilities
module Ragdoll
  module Core
    module JobDebugging
      def self.debug_failed_document(document_id)
        document = Ragdoll::Document.find(document_id)

        debug_info = {
          document: {
            id: document.id,
            status: document.status,
            document_type: document.document_type,
            location: document.location,
            content_present: document.content.present?,
            content_length: document.content&.length,
            metadata: document.metadata
          },
          processing_history: get_processing_history(document_id),
          related_jobs: get_related_jobs(document_id),
          system_state: get_system_state_at_failure(document_id)
        }

        puts JSON.pretty_generate(debug_info)
        debug_info
      end

      def self.trace_job_execution(job_class, *args)
        # Enable detailed logging for specific job execution
        original_log_level = Rails.logger.level if defined?(Rails)

        begin
          Rails.logger.level = Logger::DEBUG if defined?(Rails)

          # Add execution tracing
          job_instance = job_class.new(*args)

          puts "=== Starting #{job_class.name} execution ==="
          puts "Arguments: #{args.inspect}"
          puts "Queue: #{job_instance.queue_name}"

          start_time = Time.current
          result = job_instance.perform_now
          end_time = Time.current

          puts "=== Completed in #{((end_time - start_time) * 1000).round(2)}ms ==="
          puts "Result: #{result.inspect}"

          result
        rescue => e
          puts "=== FAILED with #{e.class.name}: #{e.message} ==="
          puts "Backtrace:"
          puts e.backtrace.first(10).map { |line| "  #{line}" }
          raise e
        ensure
          Rails.logger.level = original_log_level if defined?(Rails) && original_log_level
        end
      end

      def self.analyze_queue_performance(queue_name, period: 24.hours)
        # Analyze job performance in specific queue
        stats = {
          queue_name: queue_name,
          period: period,
          current_depth: Sidekiq::Queue.new(queue_name).size,
          processed_jobs: get_processed_jobs_count(queue_name, period),
          failed_jobs: get_failed_jobs_count(queue_name, period),
          avg_processing_time: calculate_avg_processing_time(queue_name, period),
          slowest_jobs: get_slowest_jobs(queue_name, period, limit: 5)
        }

        puts "=== Queue Performance Analysis: #{queue_name} ==="
        puts JSON.pretty_generate(stats)
        stats
      end

      # A bare `private` does not apply to `def self.` methods; use private_class_method
      private_class_method def self.get_processing_history(document_id)
        # Get timeline of document processing attempts
        # This would typically come from application logs or database
        [
          # Mock data structure
          {
            timestamp: 1.hour.ago,
            event: 'ExtractText queued',
            details: { queue: 'ragdoll_extraction' }
          },
          {
            timestamp: 55.minutes.ago,
            event: 'ExtractText started',
            details: { worker_id: 'worker-1' }
          },
          {
            timestamp: 50.minutes.ago,
            event: 'ExtractText failed',
            details: { error: 'PDF parsing error', attempt: 1 }
          }
        ]
      end
    end
  end
end

# Usage examples
# Debug a specific failed document
Ragdoll::Core::JobDebugging.debug_failed_document(123)

# Trace a job execution with detailed logging
Ragdoll::Core::JobDebugging.trace_job_execution(
  Ragdoll::ExtractTextJob,
  document.id
)

# Analyze queue performance
Ragdoll::Core::JobDebugging.analyze_queue_performance(
  'ragdoll_embeddings',
  period: 7.days
)

Monitoring

Comprehensive monitoring and observability for job performance, queue health, and system reliability.

Job Status Tracking

Real-time tracking of job execution status through document lifecycle:

module Ragdoll
  module Core
    class JobStatusTracker
      def self.document_processing_status(document_id)
        document = Ragdoll::Document.find(document_id)

        {
          document_id: document.id,
          current_status: document.status,
          processing_stages: {
            text_extraction: check_text_extraction_status(document),
            embedding_generation: check_embedding_status(document),
            summary_generation: check_summary_status(document),
            keyword_extraction: check_keyword_status(document)
          },
          queue_positions: get_queue_positions(document_id),
          estimated_completion: estimate_completion_time(document),
          processing_metrics: get_processing_metrics(document)
        }
      end

      def self.system_processing_overview
        {
          total_documents: Ragdoll::Document.count,
          status_distribution: Ragdoll::Document.group(:status).count,
          processing_pipeline: {
            pending_extraction: Ragdoll::Document.where(status: 'pending').count,
            currently_processing: Ragdoll::Document.where(status: 'processing').count,
            completed_today: Ragdoll::Document
              .where(status: 'processed')
              .where('updated_at > ?', Date.current)
              .count,
            failed_processing: Ragdoll::Document.where(status: 'error').count
          },
          queue_depths: get_all_queue_depths,
          active_workers: get_active_worker_count
        }
      end

      private

      def self.check_text_extraction_status(document)
        {
          completed: document.content.present?,
          content_length: document.content&.length || 0,
          extraction_method: determine_extraction_method(document)
        }
      end

      def self.check_embedding_status(document)
        embeddings_count = document.total_embedding_count
        {
          completed: embeddings_count > 0,
          embeddings_count: embeddings_count,
          by_content_type: document.embeddings_by_type
        }
      end

      def self.get_queue_positions(document_id)
        # Check each queue for a pending job referencing this document.
        # Sidekiq::Queue includes Enumerable, so we can scan it directly
        # (find_job only accepts a jid, not a block).
        positions = {}

        %w[ragdoll_extraction ragdoll_embeddings ragdoll_ai_processing].each do |queue_name|
          queue = Sidekiq::Queue.new(queue_name)
          index = queue.each_with_index
                       .find { |job, _i| job.args.first == document_id }
                       &.last
          positions[queue_name] = index ? index + 1 : nil
        end

        positions
      end
    end
  end
end

# Real-time status updates using ActionCable (if in Rails app)
class JobStatusChannel < ApplicationCable::Channel
  def subscribed
    stream_from "job_status_#{params[:document_id]}"
  end

  def self.broadcast_status_update(document_id, status_data)
    ActionCable.server.broadcast(
      "job_status_#{document_id}",
      status_data
    )
  end
end

# Update status from jobs
# In job classes:
after_perform do |job|
  if job.arguments.first # document_id
    status_data = JobStatusTracker.document_processing_status(job.arguments.first)
    JobStatusChannel.broadcast_status_update(job.arguments.first, status_data)
  end
end
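
The per-stage flags returned by `document_processing_status` can be collapsed into a single label for display. A minimal sketch, using a hypothetical helper that is not part of Ragdoll:

```ruby
# Hypothetical helper: derive one overall label from the per-stage
# completion flags in the processing_stages hash shown above.
def overall_stage_status(stages)
  completed = stages.values.count { |stage| stage[:completed] }
  return :complete if completed == stages.size
  return :pending if completed.zero?
  :partial
end

stages = {
  text_extraction: { completed: true },
  embedding_generation: { completed: false },
  summary_generation: { completed: true },
  keyword_extraction: { completed: true }
}

puts overall_stage_status(stages) # => partial
```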

Performance Metrics

Detailed performance tracking and analysis:

class JobPerformanceMetrics
  def self.collect_performance_data
    {
      timestamp: Time.current.iso8601,
      processing_times: calculate_processing_times,
      throughput_metrics: calculate_throughput,
      resource_utilization: measure_resource_usage,
      error_rates: calculate_error_rates,
      queue_performance: analyze_queue_performance
    }
  end

  def self.calculate_processing_times
    recent_docs = Ragdoll::Document
      .where('updated_at > ?', 24.hours.ago)
      .where(status: 'processed')
      .includes(:contents)

    processing_times = recent_docs.map do |doc|
      total_time = (doc.updated_at - doc.created_at).to_i
      {
        document_id: doc.id,
        document_type: doc.document_type,
        content_size: doc.content&.length || 0,
        processing_time_seconds: total_time,
        embedding_count: doc.total_embedding_count
      }
    end

    return { individual_times: [], averages: {} } if processing_times.empty?

    {
      individual_times: processing_times,
      averages: {
        overall: processing_times.sum { |t| t[:processing_time_seconds] } / processing_times.length,
        by_document_type: processing_times.group_by { |t| t[:document_type] }
                                         .transform_values { |times| 
                                           times.map { |t| t[:processing_time_seconds] }.sum / times.length 
                                         },
        by_content_size: calculate_time_by_content_size(processing_times)
      }
    }
  end

  def self.calculate_throughput
    time_periods = {
      last_hour: 1.hour.ago,
      last_24_hours: 24.hours.ago,
      last_week: 1.week.ago
    }

    throughput_data = {}

    time_periods.each do |period, start_time|
      processed_count = Ragdoll::Document
        .where('updated_at > ?', start_time)
        .where(status: 'processed')
        .count

      time_span_hours = (Time.current - start_time) / 1.hour

      throughput_data[period] = {
        documents_processed: processed_count,
        documents_per_hour: processed_count / time_span_hours,
        embeddings_generated: calculate_embeddings_in_period(start_time),
        summaries_created: Ragdoll::Document
          .where('updated_at > ?', start_time)
          .where.not(summary: [nil, ''])
          .count
      }
    end

    throughput_data
  end

  def self.measure_resource_usage
    {
      memory: {
        current_mb: `ps -o rss= -p #{Process.pid}`.to_i / 1024,
        gc_stats: GC.stat
      },
      database: {
        active_connections: ActiveRecord::Base.connection_pool.stat[:checked_out],
        pool_size: ActiveRecord::Base.connection_pool.stat[:size],
        connection_usage_percent: (
          ActiveRecord::Base.connection_pool.stat[:checked_out].to_f / 
          ActiveRecord::Base.connection_pool.stat[:size] * 100
        ).round(2)
      },
      redis: redis_memory_usage,
      sidekiq: {
        busy_workers: Sidekiq::Workers.new.size,
        total_processed: Sidekiq::Stats.new.processed,
        total_failed: Sidekiq::Stats.new.failed
      }
    }
  end

  def self.calculate_error_rates
    # Use a hash so each period keeps its name; comparing a stored
    # timestamp against a freshly computed `1.hour.ago` in a case/when
    # would never match.
    time_periods = {
      last_hour: 1.hour.ago,
      last_24_hours: 24.hours.ago,
      last_week: 1.week.ago
    }

    error_rates = {}

    time_periods.each do |period_name, start_time|

      total_attempts = Ragdoll::Document.where('created_at > ?', start_time).count
      failed_attempts = Ragdoll::Document
        .where('created_at > ?', start_time)
        .where(status: 'error')
        .count

      error_rates[period_name] = {
        total_attempts: total_attempts,
        failed_attempts: failed_attempts,
        error_rate_percent: total_attempts > 0 ? (failed_attempts.to_f / total_attempts * 100).round(2) : 0,
        error_types: get_error_types_in_period(start_time)
      }
    end

    error_rates
  end

  private

  def self.redis_memory_usage
    return {} unless defined?(Redis)

    redis_info = Redis.current.info
    {
      used_memory_mb: redis_info['used_memory'].to_i / (1024 * 1024),
      connected_clients: redis_info['connected_clients'].to_i,
      total_commands_processed: redis_info['total_commands_processed'].to_i
    }
  rescue
    { error: 'Redis connection unavailable' }
  end
end

# Automated performance reporting
class PerformanceReportJob < ActiveJob::Base
  queue_as :default

  def perform
    metrics = JobPerformanceMetrics.collect_performance_data

    # Store metrics for historical analysis
    store_performance_metrics(metrics)

    # Send alerts if performance degrades
    check_performance_alerts(metrics)

    # Generate daily/weekly reports
    generate_performance_report(metrics) if should_generate_report?
  end

  private

  def check_performance_alerts(metrics)
    alerts = []

    # Check processing time alerts
    avg_processing_time = metrics[:processing_times][:averages][:overall]
    if avg_processing_time > 300 # 5 minutes
      alerts << {
        type: :slow_processing,
        severity: avg_processing_time > 600 ? :critical : :warning,
        message: "Average processing time: #{avg_processing_time}s"
      }
    end

    # Check error rate alerts
    error_rate = metrics[:error_rates][:last_hour][:error_rate_percent]
    if error_rate > 5
      alerts << {
        type: :high_error_rate,
        severity: error_rate > 15 ? :critical : :warning,
        message: "Error rate in last hour: #{error_rate}%"
      }
    end

    # Send alerts if any found
    alerts.each { |alert| send_performance_alert(alert) }
  end
end
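
The threshold check inside `check_performance_alerts` can be extracted as a pure function and unit tested in isolation. A sketch mirroring the hard-coded 5% / 15% limits above:

```ruby
# Pure-function version of the error-rate threshold check used by
# check_performance_alerts (default limits mirror the job above).
def error_rate_severity(failed, total, warning: 5, critical: 15)
  return nil if total.zero?
  rate = (failed.to_f / total * 100).round(2)
  return :critical if rate > critical
  return :warning if rate > warning
  nil
end

puts error_rate_severity(2, 10).inspect # 20.0% => :critical
puts error_rate_severity(1, 50).inspect # 2.0%  => nil
```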

Queue Health Monitoring

Comprehensive queue monitoring and health checks:

class QueueHealthMonitor
  HEALTH_THRESHOLDS = {
    queue_depth: {
      warning: 50,
      critical: 200
    },
    processing_rate: {
      warning: 10,  # jobs per minute
      critical: 5
    },
    worker_utilization: {
      warning: 80,  # percentage
      critical: 95
    }
  }

  def self.check_all_queues
    queue_names = %w[
      ragdoll_extraction
      ragdoll_embeddings
      ragdoll_ai_processing
      ragdoll_quick
    ]

    health_report = {
      timestamp: Time.current.iso8601,
      overall_health: 'healthy',
      queues: {},
      alerts: []
    }

    queue_names.each do |queue_name|
      queue_health = analyze_queue_health(queue_name)
      health_report[:queues][queue_name] = queue_health

      # Collect alerts
      if queue_health[:alerts].any?
        health_report[:alerts].concat(queue_health[:alerts])
      end

      # Update overall health
      if queue_health[:status] == 'critical'
        health_report[:overall_health] = 'critical'
      elsif queue_health[:status] == 'warning' && health_report[:overall_health] == 'healthy'
        health_report[:overall_health] = 'warning'
      end
    end

    health_report
  end

  def self.analyze_queue_health(queue_name)
    queue = Sidekiq::Queue.new(queue_name)

    health_data = {
      name: queue_name,
      depth: queue.size,
      latency: queue.latency,
      processing_rate: calculate_processing_rate(queue_name),
      worker_utilization: calculate_worker_utilization(queue_name),
      oldest_job_age: calculate_oldest_job_age(queue),
      status: 'healthy',
      alerts: []
    }

    # Check depth threshold
    if health_data[:depth] >= HEALTH_THRESHOLDS[:queue_depth][:critical]
      health_data[:status] = 'critical'
      health_data[:alerts] << {
        type: :queue_depth,
        severity: :critical,
        message: "Queue depth #{health_data[:depth]} exceeds critical threshold"
      }
    elsif health_data[:depth] >= HEALTH_THRESHOLDS[:queue_depth][:warning]
      health_data[:status] = 'warning'
      health_data[:alerts] << {
        type: :queue_depth,
        severity: :warning,
        message: "Queue depth #{health_data[:depth]} exceeds warning threshold"
      }
    end

    # Check processing rate
    if health_data[:processing_rate] <= HEALTH_THRESHOLDS[:processing_rate][:critical]
      health_data[:status] = 'critical'
      health_data[:alerts] << {
        type: :slow_processing,
        severity: :critical,
        message: "Processing rate #{health_data[:processing_rate]} jobs/min is critically low"
      }
    elsif health_data[:processing_rate] <= HEALTH_THRESHOLDS[:processing_rate][:warning]
      health_data[:status] = 'warning' if health_data[:status] == 'healthy'
      health_data[:alerts] << {
        type: :slow_processing,
        severity: :warning,
        message: "Processing rate #{health_data[:processing_rate]} jobs/min is below normal"
      }
    end

    health_data
  end

  def self.queue_trending_analysis(period: 24.hours)
    # Analyze queue depth trends over time
    queue_names = %w[ragdoll_extraction ragdoll_embeddings ragdoll_ai_processing]

    trending_data = {}

    queue_names.each do |queue_name|
      # This would typically come from stored metrics
      # For now, simulate trend analysis
      trending_data[queue_name] = {
        current_depth: Sidekiq::Queue.new(queue_name).size,
        trend_direction: calculate_trend_direction(queue_name, period),
        peak_depth_24h: get_peak_depth(queue_name, period),
        average_depth_24h: get_average_depth(queue_name, period),
        prediction: predict_queue_behavior(queue_name)
      }
    end

    trending_data
  end

  private

  def self.calculate_processing_rate(queue_name)
    # Calculate jobs processed per minute for this queue
    # This would come from Sidekiq stats or custom metrics
    processed_count = get_processed_jobs_last_hour(queue_name)
    processed_count / 60.0 # per minute
  end

  def self.calculate_worker_utilization(queue_name)
    # Percentage of total worker capacity busy with this queue.
    # Capacity comes from each process's configured concurrency.
    total_capacity = Sidekiq::ProcessSet.new.sum { |process| process['concurrency'].to_i }
    queue_workers = count_workers_processing_queue(queue_name)

    return 0 if total_capacity == 0
    (queue_workers.to_f / total_capacity * 100).round(2)
  end
end
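
`calculate_trend_direction` is referenced in the trending analysis above but not shown. One plausible sketch compares the mean depth of the first and second halves of a series of sampled queue depths (collecting the samples from stored metrics is assumed to happen elsewhere):

```ruby
# Hypothetical implementation of calculate_trend_direction: compare
# the average depth of the early and late halves of the sample series.
def trend_direction(depth_samples, tolerance: 0.1)
  return :flat if depth_samples.size < 4
  half = depth_samples.size / 2
  early = depth_samples.first(half).sum.to_f / half
  late = depth_samples.last(half).sum.to_f / half
  change = (late - early) / [early, 1.0].max
  return :flat if change.abs <= tolerance
  change.positive? ? :growing : :shrinking
end

puts trend_direction([10, 12, 30, 40]) # => growing
```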

# Health monitoring job
class QueueHealthCheckJob < ActiveJob::Base
  queue_as :default

  def perform
    health_report = QueueHealthMonitor.check_all_queues

    # Store health metrics
    store_health_report(health_report)

    # Send alerts for critical issues
    critical_alerts = health_report[:alerts].select { |a| a[:severity] == :critical }
    critical_alerts.each { |alert| send_critical_alert(alert) }

    # Log health status
    Rails.logger.info "Queue Health: #{health_report[:overall_health]}" if defined?(Rails)

    health_report
  end
end

Alerting Strategies

Intelligent alerting with escalation and noise reduction:

class JobAlertingSystem
  ALERT_RULES = {
    queue_backlog: {
      thresholds: { warning: 50, critical: 200 },
      escalation_time: 15.minutes,
      channels: [:log, :slack, :email]
    },
    high_error_rate: {
      thresholds: { warning: 5, critical: 15 }, # percentage
      escalation_time: 5.minutes,
      channels: [:log, :slack, :webhook]
    },
    slow_processing: {
      thresholds: { warning: 300, critical: 600 }, # seconds
      escalation_time: 30.minutes,
      channels: [:log, :email]
    },
    worker_shortage: {
      thresholds: { warning: 90, critical: 100 }, # utilization percentage
      escalation_time: 10.minutes,
      channels: [:log, :slack, :pager]
    }
  }

  def self.evaluate_alerts
    current_metrics = collect_current_metrics
    active_alerts = []

    ALERT_RULES.each do |alert_type, config|
      alert_value = extract_metric_value(current_metrics, alert_type)

      severity = determine_severity(alert_value, config[:thresholds])

      if severity
        alert = create_alert(alert_type, severity, alert_value, config)
        active_alerts << alert

        # Check if this is a new alert or escalation
        if should_send_alert?(alert)
          send_alert(alert, config[:channels])
        end
      end
    end

    # Update alert history
    update_alert_history(active_alerts)

    active_alerts
  end

  def self.create_alert(alert_type, severity, current_value, config)
    {
      id: generate_alert_id(alert_type, severity),
      type: alert_type,
      severity: severity,
      current_value: current_value,
      threshold: config[:thresholds][severity],
      message: generate_alert_message(alert_type, severity, current_value),
      timestamp: Time.current,
      escalation_time: config[:escalation_time],
      channels: config[:channels]
    }
  end

  def self.should_send_alert?(alert)
    # Prevent alert spam by checking recent alert history
    recent_similar = get_recent_alerts(alert[:type], alert[:severity], 30.minutes)

    # Send if no recent similar alerts
    return true if recent_similar.empty?

    # Send if value has significantly worsened
    last_alert = recent_similar.last
    return true if alert[:current_value] > (last_alert[:current_value] * 1.5)

    # Send if escalation time has passed
    time_since_last = Time.current - last_alert[:timestamp]
    return true if time_since_last >= alert[:escalation_time]

    false
  end

  def self.generate_alert_message(alert_type, severity, current_value)
    case alert_type
    when :queue_backlog
      "Queue backlog #{severity}: #{current_value} jobs waiting"
    when :high_error_rate
      "High error rate #{severity}: #{current_value}% of jobs failing"
    when :slow_processing
      "Slow processing #{severity}: Average #{current_value}s per job"
    when :worker_shortage
      "Worker shortage #{severity}: #{current_value}% utilization"
    else
      "System alert #{severity}: #{alert_type} = #{current_value}"
    end
  end

  def self.send_alert(alert, channels)
    channels.each do |channel|
      case channel
      when :log
        log_alert(alert)
      when :slack
        send_slack_alert(alert)
      when :email
        send_email_alert(alert)
      when :webhook
        send_webhook_alert(alert)
      when :pager
        send_pager_alert(alert) if alert[:severity] == :critical
      end
    end
  end

  private

  def self.collect_current_metrics
    {
      queue_depths: get_all_queue_depths,
      error_rates: calculate_current_error_rates,
      processing_times: get_recent_processing_times,
      worker_utilization: calculate_worker_utilization
    }
  end
end
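
`determine_severity`, called from `evaluate_alerts` above, is not shown. A minimal sketch that compares a metric value against a rule's thresholds:

```ruby
# Sketch of determine_severity: map a metric value onto the
# warning/critical thresholds from an ALERT_RULES entry.
def determine_severity(value, thresholds)
  return :critical if value >= thresholds[:critical]
  return :warning if value >= thresholds[:warning]
  nil
end

puts determine_severity(75, warning: 50, critical: 200).inspect  # => :warning
puts determine_severity(250, warning: 50, critical: 200).inspect # => :critical
```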

# Scheduled alert evaluation
class AlertEvaluationJob < ActiveJob::Base
  queue_as :default

  def perform
    alerts = JobAlertingSystem.evaluate_alerts

    # Log alert summary
    if alerts.any?
      critical_count = alerts.count { |a| a[:severity] == :critical }
      warning_count = alerts.count { |a| a[:severity] == :warning }

      Rails.logger.info "Active alerts: #{critical_count} critical, #{warning_count} warnings" if defined?(Rails)
    end

    alerts
  end
end

# Schedule alert evaluation every 5 minutes
# In a Rails initializer, e.g. with the sidekiq-cron gem
# (ActiveJob's .set does not accept a :cron option):
# Sidekiq::Cron::Job.create(name: 'alert_evaluation', cron: '*/5 * * * *', class: 'AlertEvaluationJob')

Custom Jobs

Create custom background jobs that integrate seamlessly with Ragdoll's processing pipeline and follow established patterns.

Job Class Patterns

Standardized patterns for creating custom jobs:

module Ragdoll
  module Core
    module Jobs
      # Base class for all custom Ragdoll jobs
      class CustomBaseJob < ActiveJob::Base
        # Inherit standard retry and error handling
        include JobErrorHandling

        queue_as :custom_processing

        # Standard retry configuration
        retry_on StandardError, wait: :exponentially_longer, attempts: 3
        discard_on ActiveRecord::RecordNotFound

        private

        # Helper method for document status updates
        def update_document_status(document_id, status, metadata = {})
          document = Ragdoll::Document.find_by(id: document_id)
          return unless document

          update_data = { status: status }
          if metadata.any?
            update_data[:metadata] = document.metadata.merge(metadata)
          end

          document.update!(update_data)
        end

        # Helper for logging job progress
        def log_progress(message, document_id: nil)
          log_data = {
            job_class: self.class.name,
            message: message,
            job_id: job_id,
            timestamp: Time.current.iso8601
          }
          log_data[:document_id] = document_id if document_id

          Rails.logger.info log_data.to_json if defined?(Rails)
        end
      end
    end
  end
end

# Example custom job: Advanced content analysis
class AdvancedContentAnalysis < Ragdoll::CustomBaseJob
  def perform(document_id, analysis_options = {})
    log_progress("Starting advanced content analysis", document_id: document_id)

    document = Ragdoll::Document.find(document_id)
    return unless document.content.present?

    # Update status to show processing
    update_document_status(document_id, 'processing', {
      'analysis_stage' => 'advanced_analysis',
      'analysis_options' => analysis_options
    })

    analysis_results = {
      sentiment_analysis: perform_sentiment_analysis(document.content),
      topic_modeling: perform_topic_modeling(document.content),
      entity_extraction: extract_named_entities(document.content),
      readability_score: calculate_readability(document.content)
    }

    # Store analysis results in metadata
    update_document_status(document_id, 'processed', {
      'advanced_analysis' => analysis_results,
      'analysis_completed_at' => Time.current.iso8601
    })

    log_progress("Advanced content analysis completed", document_id: document_id)

    # Queue follow-up jobs if needed
    if analysis_results[:sentiment_analysis][:score] < -0.5
      ContentModerationJob.perform_later(document_id)
    end

    analysis_results
  end

  private

  def perform_sentiment_analysis(content)
    # Integration with sentiment analysis service
    # This could use various APIs or libraries
    {
      score: rand(-1.0..1.0), # Mock score
      label: %w[positive negative neutral].sample,
      confidence: rand(0.5..1.0)
    }
  end

  def perform_topic_modeling(content)
    # Topic modeling implementation
    {
      primary_topics: ['technology', 'business', 'science'],
      topic_distribution: { 'technology' => 0.6, 'business' => 0.3, 'science' => 0.1 }
    }
  end
end

# Example: Batch processing job
class BatchDocumentProcessor < Ragdoll::CustomBaseJob
  queue_as :batch_processing

  def perform(document_ids, processing_options = {})
    log_progress("Starting batch processing of #{document_ids.length} documents")

    results = {
      total_documents: document_ids.length,
      successful: 0,
      failed: 0,
      processing_times: [],
      errors: []
    }

    document_ids.each_with_index do |document_id, index|
      start_time = Time.current

      begin
        process_single_document(document_id, processing_options)
        results[:successful] += 1

        processing_time = Time.current - start_time
        results[:processing_times] << processing_time

        # Progress reporting every 10 documents
        if (index + 1) % 10 == 0
          log_progress("Processed #{index + 1}/#{document_ids.length} documents")
        end

      rescue => e
        results[:failed] += 1
        results[:errors] << {
          document_id: document_id,
          error: e.message,
          backtrace: e.backtrace&.first(3)
        }

        Rails.logger.error "Batch processing failed for document #{document_id}: #{e.message}" if defined?(Rails)
      end
    end

    log_progress("Batch processing completed: #{results[:successful]} successful, #{results[:failed]} failed")
    results
  end

  private

  def process_single_document(document_id, options)
    # Queue individual processing jobs
    ExtractText.perform_now(document_id) if options[:extract_text]
    GenerateEmbeddings.perform_now(document_id) if options[:generate_embeddings]
    GenerateSummary.perform_now(document_id) if options[:generate_summary]
  end
end

Integration with Core Services

Seamless integration with Ragdoll services:

# Custom job using core services
class CustomEmbeddingEnhancement < Ragdoll::CustomBaseJob
  def perform(document_id, enhancement_type)
    document = Ragdoll::Document.find(document_id)

    # Use core embedding service
    embedding_service = Ragdoll::EmbeddingService.new

    case enhancement_type
    when 'multi_provider'
      enhance_with_multiple_providers(document, embedding_service)
    when 'contextual_chunks'
      create_contextual_embeddings(document, embedding_service)
    when 'semantic_clustering'
      cluster_document_embeddings(document, embedding_service)
    end
  end

  private

  def enhance_with_multiple_providers(document, embedding_service)
    # Generate embeddings using different providers for comparison
    providers = ['openai', 'huggingface', 'cohere']

    providers.each do |provider|
      begin
        # Configure service for specific provider
        embedding_service.configure_provider(provider)

        # Generate embeddings
        embeddings = embedding_service.generate_embeddings_for_content(
          document.content,
          model: get_model_for_provider(provider)
        )

        # Store with provider-specific metadata
        store_provider_embeddings(document.id, provider, embeddings)

      rescue => e
        Rails.logger.warn "Failed to generate embeddings with #{provider}: #{e.message}" if defined?(Rails)
      end
    end
  end

  def create_contextual_embeddings(document, embedding_service)
    # Use text chunker for intelligent content splitting
    text_chunker = Ragdoll::TextChunker.new

    chunks = text_chunker.chunk_text(
      document.content,
      max_tokens: 512,
      overlap: 128,
      preserve_sentences: true
    )

    # Generate embeddings with surrounding context
    chunks.each_with_index do |chunk, index|
      # Add context from adjacent chunks
      contextual_content = build_contextual_content(chunks, index)

      embedding = embedding_service.generate_embedding(
        contextual_content,
        metadata: {
          chunk_index: index,
          has_context: true,
          context_size: contextual_content.length - chunk.length
        }
      )

      # Store contextual embedding
      store_contextual_embedding(document.id, chunk, embedding, index)
    end
  end
end
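
`build_contextual_content` is used above but not defined. A minimal sketch that widens each chunk with its immediate neighbours:

```ruby
# Sketch of build_contextual_content: join a chunk with its adjacent
# chunks so the embedding sees surrounding context.
def build_contextual_content(chunks, index)
  window = []
  window << chunks[index - 1] if index > 0
  window << chunks[index]
  window << chunks[index + 1] if index < chunks.length - 1
  window.join("\n")
end

puts build_contextual_content(%w[alpha beta gamma], 1) # alpha, beta, gamma joined by newlines
```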

# Custom job using search engine
class DocumentSimilarityAnalysis < Ragdoll::CustomBaseJob
  def perform(document_id, similarity_threshold = 0.8)
    document = Ragdoll::Document.find(document_id)

    # Use core search engine
    embedding_service = Ragdoll::EmbeddingService.new
    search_engine = Ragdoll::SearchEngine.new(embedding_service)

    # Find similar documents
    similar_documents = search_engine.search_documents(
      document.content[0, 1000], # use the first 1,000 characters as the query
      limit: 20,
      threshold: similarity_threshold,
      filters: {
        document_type: document.document_type
      }
    )

    # Analyze similarity patterns
    similarity_analysis = {
      total_similar: similar_documents.length,
      avg_similarity: calculate_average_similarity(similar_documents),
      similarity_clusters: group_by_similarity_ranges(similar_documents),
      related_keywords: extract_common_keywords(similar_documents)
    }

    # Update document metadata with similarity analysis
    update_document_status(document_id, document.status, {
      'similarity_analysis' => similarity_analysis,
      'similar_document_ids' => similar_documents.map { |d| d[:document_id] }
    })

    similarity_analysis
  end
end
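
`group_by_similarity_ranges` is referenced above but not shown. A sketch that buckets search results by similarity score (the range boundaries here are illustrative, not from Ragdoll):

```ruby
# Sketch of group_by_similarity_ranges: bucket results into labelled
# score bands. Boundaries are assumptions for illustration.
SIMILARITY_RANGES = { high: 0.9..1.0, medium: 0.8...0.9, low: 0.0...0.8 }.freeze

def group_by_similarity_ranges(results)
  results.group_by do |result|
    SIMILARITY_RANGES.find { |_label, range| range.cover?(result[:similarity]) }&.first
  end
end

grouped = group_by_similarity_ranges([{ similarity: 0.95 }, { similarity: 0.82 }])
puts grouped.keys.inspect # => [:high, :medium]
```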

Testing Approaches

Comprehensive testing strategies for custom jobs:

# Test helper for job testing
module JobTestHelpers
  def perform_job_now(job_class, *args)
    # Perform job synchronously for testing
    job_class.perform_now(*args)
  end

  def assert_job_enqueued(job_class, *args)
    assert_enqueued_with(job: job_class, args: args)
  end

  def create_test_document(attributes = {})
    default_attributes = {
      title: 'Test Document',
      location: '/tmp/test.txt',
      document_type: 'text',
      content: 'This is test content for processing.',
      status: 'pending',
      file_modified_at: Time.current
    }

    Ragdoll::Document.create!(default_attributes.merge(attributes))
  end
end

# Example test class
class AdvancedContentAnalysisTest < ActiveJob::TestCase
  include JobTestHelpers

  setup do
    @document = create_test_document(
      content: 'This is a comprehensive test document with substantial content for analysis.'
    )
  end

  test 'processes document successfully' do
    # Test synchronous execution
    result = perform_job_now(AdvancedContentAnalysis, @document.id)

    assert_not_nil result
    assert_includes result.keys, :sentiment_analysis
    assert_includes result.keys, :topic_modeling

    # Verify document status updated
    @document.reload
    assert_equal 'processed', @document.status
    assert_not_nil @document.metadata['advanced_analysis']
  end

  test 'handles missing document gracefully' do
    # Test with non-existent document ID
    assert_nothing_raised do
      perform_job_now(AdvancedContentAnalysis, 999999)
    end
  end

  test 'queues follow-up jobs for negative sentiment' do
    # Mock sentiment analysis to return negative score
    AdvancedContentAnalysis.any_instance.stubs(:perform_sentiment_analysis)
                           .returns({ score: -0.8, label: 'negative', confidence: 0.9 })

    assert_enqueued_with(job: ContentModerationJob) do
      perform_job_now(AdvancedContentAnalysis, @document.id)
    end
  end

  test 'handles service errors gracefully' do
    # Simulate service failure
    AdvancedContentAnalysis.any_instance.stubs(:perform_sentiment_analysis)
                           .raises(StandardError, 'Service unavailable')

    assert_raises(StandardError) do
      perform_job_now(AdvancedContentAnalysis, @document.id)
    end

    # Verify document marked as error
    @document.reload
    assert_equal 'error', @document.status
  end

  test 'processes with custom options' do
    options = { include_entities: true, deep_analysis: false }

    result = perform_job_now(AdvancedContentAnalysis, @document.id, options)

    @document.reload
    stored_options = @document.metadata['analysis_options']
    assert_equal options, stored_options
  end
end

# Integration testing with real services
class CustomJobIntegrationTest < ActiveJob::TestCase
  include JobTestHelpers

  setup do
    # Setup real service connections for integration testing
    @original_config = Ragdoll.config.dup

    Ragdoll::Core.configure do |config|
      config.ruby_llm_config[:openai][:api_key] = ENV['TEST_OPENAI_API_KEY']
      config.embedding_config[:text][:model] = 'text-embedding-3-small'
    end
  end

  teardown do
    # Restore original configuration
    Ragdoll.config = @original_config
  end

  test 'integration with real embedding service' do
    skip 'Requires API keys' unless ENV['TEST_OPENAI_API_KEY']

    document = create_test_document(
      content: 'Machine learning is transforming artificial intelligence applications.'
    )

    # This will make real API calls
    result = perform_job_now(CustomEmbeddingEnhancement, document.id, 'multi_provider')

    # Verify embeddings were created
    assert document.embeddings.count > 0

    # Verify embedding quality
    embedding = document.embeddings.first
    assert embedding.embedding_vector.present?
    assert_equal 1536, embedding.embedding_vector.length # OpenAI embedding size
  end
end

Performance Considerations

Optimization strategies for custom jobs:

# Performance-optimized job patterns
class PerformantCustomJob < Ragdoll::CustomBaseJob
  # Configuration for performance optimization
  queue_as :high_performance
  # Note: ActiveJob has no built-in timeout DSL; enforce time limits via
  # the queue adapter's job options or Timeout.timeout inside perform

  def perform(document_ids, options = {})
    # Batch processing for efficiency
    documents = Ragdoll::Document.includes(:contents, :embeddings)
                               .where(id: document_ids)

    # Pre-load services to avoid repeated initialization
    services = initialize_services

    # Process in batches to manage memory
    documents.find_in_batches(batch_size: 50) do |batch|
      process_batch(batch, services, options)

      # Periodic garbage collection for long-running jobs
      GC.start if should_run_gc?
    end
  end

  private

  def initialize_services
    # Build the embedding service first so the search engine can share it
    embedding_service = Ragdoll::EmbeddingService.new
    {
      embedding_service: embedding_service,
      text_chunker: Ragdoll::TextChunker.new,
      search_engine: Ragdoll::SearchEngine.new(embedding_service)
    }
  end

  def process_batch(documents, services, options)
    # Batch database operations
    updates = []

    documents.each do |document|
      result = process_single_document(document, services, options)

      if result
        updates << {
          id: document.id,
          metadata: document.metadata.merge(result)
        }
      end
    end

    # Bulk update to reduce database round trips
    bulk_update_documents(updates) if updates.any?
  end

  def bulk_update_documents(updates)
    # Use raw SQL for efficient bulk updates
    updates.each_slice(100) do |batch|
      update_sql = build_bulk_update_sql(batch)
      ActiveRecord::Base.connection.execute(update_sql)
    end
  end

  def should_run_gc?
    # Run GC every 100 processed documents or when memory usage is high
    @processed_count ||= 0
    @processed_count += 1

    (@processed_count % 100 == 0) || (get_memory_usage > 500) # 500MB threshold
  end

  def get_memory_usage
    `ps -o rss= -p #{Process.pid}`.to_i / 1024 # MB
  end
end
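The `build_bulk_update_sql` helper is referenced above but never shown. A minimal sketch follows, assuming a PostgreSQL `jsonb` metadata column and a `ragdoll_documents` table name (both assumptions, not confirmed by the source); it collapses a slice of updates into one `UPDATE ... CASE` statement. Real code should quote values through `ActiveRecord::Base.connection.quote` rather than the naive escaping used here:

```ruby
require 'json'

# Illustrative sketch: build one UPDATE statement that sets metadata for
# many rows via CASE, reducing N round trips to 1. Table/column names are
# assumptions; escaping is simplified for demonstration only.
def build_bulk_update_sql(batch)
  cases = batch.map do |row|
    json = row[:metadata].to_json.gsub("'", "''") # naive SQL escaping
    "WHEN #{row[:id].to_i} THEN '#{json}'::jsonb"
  end.join(' ')

  ids = batch.map { |row| row[:id].to_i }.join(', ')

  "UPDATE ragdoll_documents SET metadata = CASE id #{cases} END " \
    "WHERE id IN (#{ids})"
end
```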

# Memory-efficient streaming job
class StreamingProcessorJob < Ragdoll::CustomBaseJob
  def perform(large_document_id)
    document = Ragdoll::Document.find(large_document_id)

    # Process the content in fixed-size slices so per-chunk work stays
    # bounded (note: document.content is still loaded once as a string)
    process_in_chunks(document) do |chunk, chunk_index|
      # Process each chunk individually
      result = process_chunk(chunk, chunk_index)

      # Yield control to allow other jobs to run
      sleep(0.01) if chunk_index % 10 == 0

      result
    end
  end

  private

  def process_in_chunks(document)
    chunk_size = 1000 # characters
    content_length = document.content.length

    (0...content_length).step(chunk_size).each_with_index do |start, index|
      chunk = document.content[start, chunk_size]
      yield chunk, index
    end
  end
end
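The chunking loop above can be factored into a standalone enumerator, which makes its boundary behavior easy to test in isolation. This is a sketch only; fixed character offsets can split words mid-token, so production chunkers typically chunk on token or sentence boundaries and add overlap:

```ruby
# Yield [chunk, index] pairs over a string in fixed-size character slices.
# Without a block, returns a lazy Enumerator so callers can compose it.
def each_chunk(content, chunk_size)
  return enum_for(:each_chunk, content, chunk_size) unless block_given?

  (0...content.length).step(chunk_size).each_with_index do |start, index|
    yield content[start, chunk_size], index
  end
end
```

The final slice may be shorter than `chunk_size`, mirroring the behavior of `String#[]` with an out-of-range length.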

# Concurrent processing job (be careful with database connections)
class ConcurrentProcessingJob < Ragdoll::CustomBaseJob
  def perform(document_ids, max_concurrency: 5)
    require 'concurrent' # provided by the concurrent-ruby gem

    # Use thread pool for concurrent processing
    pool = Concurrent::FixedThreadPool.new(max_concurrency)
    futures = []

    document_ids.each do |document_id|
      future = Concurrent::Future.execute(executor: pool) do
        # Each thread needs its own database connection
        ActiveRecord::Base.connection_pool.with_connection do
          process_single_document(document_id)
        end
      end

      futures << future
    end

    # Wait for all to complete; Future#value returns nil for a failed
    # future, so inspect Future#reason when per-document errors matter
    futures.map(&:value)
  ensure
    # Always release the thread pool, even when an error is raised
    pool&.shutdown
    pool&.wait_for_termination(30) # 30 second timeout
  end
end
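The caution about database connections is worth making concrete: if `max_concurrency` exceeds the ActiveRecord connection pool size, worker threads block in `with_connection` waiting for a checkout. A small guard can clamp the requested concurrency (illustrative helper; in real code `pool_size` would come from `ActiveRecord::Base.connection_pool.size`):

```ruby
# Clamp a requested thread count to what the connection pool can serve,
# reserving one connection for the job's own thread. Always returns >= 1
# so the job degrades to serial processing rather than failing outright.
def safe_concurrency(requested, pool_size:, reserved: 1)
  available = pool_size - reserved
  [[requested, available].min, 1].max
end
```

`ConcurrentProcessingJob#perform` could call this before building the pool, e.g. `Concurrent::FixedThreadPool.new(safe_concurrency(max_concurrency, pool_size: ActiveRecord::Base.connection_pool.size))`.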

Summary

Ragdoll's background job system provides a robust, scalable foundation for asynchronous document processing. Built on ActiveJob, the system offers:

Core Jobs:
- ExtractText: File processing and content extraction
- GenerateEmbeddings: Vector embedding generation for all content types
- ExtractKeywords: AI-powered keyword extraction using LLMs
- GenerateSummary: Document summarization with configurable length

Key Features:
- Flexible Configuration: Support for all major queue adapters (Sidekiq, Resque, etc.)
- Intelligent Retry Logic: Exponential backoff with error-specific retry policies
- Comprehensive Monitoring: Job status tracking, performance metrics, and queue health
- Error Handling: Dead letter queues, failure notifications, and recovery tools
- Custom Job Support: Extensible framework for domain-specific processing

Production Ready:
- Horizontal scaling with load balancing
- Resource-aware job distribution
- Automated alerting and escalation
- Performance optimization patterns
- Comprehensive testing approaches

The job system seamlessly integrates with Ragdoll's document processing pipeline, ensuring reliable and efficient handling of AI operations at scale.

This document is part of the Ragdoll documentation suite. For immediate help, see the Quick Start Guide or API Reference.