Class: FactDb::Pipeline::ExtractionPipeline

Inherits:
Object
Defined in:
lib/fact_db/pipeline/extraction_pipeline.rb

Overview

Pipeline for extracting facts from sources using SimpleFlow. Supports parallel processing of multiple source items.

Examples:

Sequential extraction

pipeline = ExtractionPipeline.new(config)
results = pipeline.process([source1, source2], extractor: :llm)

Parallel extraction

pipeline = ExtractionPipeline.new(config)
results = pipeline.process_parallel([source1, source2, source3], extractor: :llm)


Constructor Details

#initialize(config = FactDb.config) ⇒ ExtractionPipeline

Returns a new instance of ExtractionPipeline.



# File 'lib/fact_db/pipeline/extraction_pipeline.rb', line 21

def initialize(config = FactDb.config)
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



# File 'lib/fact_db/pipeline/extraction_pipeline.rb', line 19

def config
  @config
end
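
The methods below use config.default_extractor as the default for their extractor: keyword. The following is a minimal sketch of that pattern. DemoConfig and DemoPipeline are stand-ins invented for illustration and are not part of FactDb. It relies only on Ruby evaluating keyword defaults at call time, in the context of the receiving instance.

```ruby
# Stand-ins for illustration only; neither name exists in FactDb.
DemoConfig = Struct.new(:default_extractor)

class DemoPipeline
  attr_reader :config

  def initialize(config)
    @config = config
  end

  # The keyword default is evaluated on each call, against this instance's config.
  def process(sources, extractor: config.default_extractor)
    sources.map { |source| { source: source, extractor: extractor } }
  end
end

pipeline = DemoPipeline.new(DemoConfig.new(:rule_based))
default_run  = pipeline.process(["a"])                  # falls back to the config value
override_run = pipeline.process(["a"], extractor: :llm) # explicit keyword wins
```

Under these assumptions, passing a custom config to the constructor changes the extractor used by calls that omit the keyword.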

Instance Method Details

#process(sources, extractor: config.default_extractor) ⇒ Array<Hash>

Processes the source items one at a time, in order, and returns one result hash per source.

Parameters:

  • sources (Array<Models::Source>)

    Source records to process

  • extractor (Symbol) (defaults to: config.default_extractor)

    Extractor type (:manual, :llm, :rule_based)

Returns:

  • (Array<Hash>)

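    One hash per source with the keys :source_id, :facts (an empty array unless extraction succeeded), and :error (nil unless the pipeline halted)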



# File 'lib/fact_db/pipeline/extraction_pipeline.rb', line 30

def process(sources, extractor: config.default_extractor)
  pipeline = build_extraction_pipeline(extractor)

  sources.map do |source|
    result = pipeline.call(SimpleFlow::Result.new(source))
    {
      source_id: source.id,
      facts: result.success? ? result.value : [],
      error: result.halted? ? result.error : nil
    }
  end
end
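
A caller will usually want to separate failed sources from successful ones. The sketch below works on hand-written sample data in the shape that #process returns. The fact and error values are placeholders, since the real types depend on the extractor and on SimpleFlow.

```ruby
# Hand-written sample data in the documented shape; real values come from #process.
results = [
  { source_id: 1, facts: ["fact A", "fact B"], error: nil },
  { source_id: 2, facts: [], error: "extractor timed out" },
  { source_id: 3, facts: [], error: nil }
]

# Treat a non-nil :error as failure; an empty :facts array on its own is not an error.
failed, succeeded = results.partition { |r| r[:error] }

all_facts  = succeeded.flat_map { |r| r[:facts] }
failed_ids = failed.map { |r| r[:source_id] }
```

Checking :error rather than the emptiness of :facts keeps a source that legitimately produced no facts out of the failure list.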

#process_parallel(sources, extractor: config.default_extractor) ⇒ Array<Hash>

Process multiple source items in parallel. Uses SimpleFlow’s parallel execution capabilities.

Parameters:

  • sources (Array<Models::Source>)

    Source records to process

  • extractor (Symbol) (defaults to: config.default_extractor)

    Extractor type (:manual, :llm, :rule_based)

Returns:

  • (Array<Hash>)

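    One hash per source, in input order, with the keys :source_id, :facts (an empty array when no facts were recorded for the source), and :error (nil when none was recorded)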



# File 'lib/fact_db/pipeline/extraction_pipeline.rb', line 49

def process_parallel(sources, extractor: config.default_extractor)
  pipeline = build_parallel_pipeline(sources, extractor)
  initial_result = SimpleFlow::Result.new(sources: sources, results: {})

  final_result = pipeline.call(initial_result)

  sources.map do |source|
    result = final_result.value[:results][source.id]
    {
      source_id: source.id,
      facts: result&.dig(:facts) || [],
      error: result&.dig(:error)
    }
  end
end
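
The final mapping step looks up each source in a results hash keyed by source id, so a source with no entry still gets a row. The sketch below reproduces only that step. DemoSource and the contents of results_by_id are hypothetical stand-ins for Models::Source and for final_result.value[:results].

```ruby
# Stand-in for Models::Source; only #id is needed here.
DemoSource = Struct.new(:id)
sources = [DemoSource.new(1), DemoSource.new(2), DemoSource.new(3)]

# Hypothetical per-source results; source 3 has no entry at all.
results_by_id = {
  1 => { facts: ["fact A"] },
  2 => { facts: [], error: "extraction failed" }
}

# Same mapping as in the listing: a missing entry yields [] for :facts and nil for :error.
summary = sources.map do |source|
  result = results_by_id[source.id]
  {
    source_id: source.id,
    facts: result&.dig(:facts) || [],
    error: result&.dig(:error)
  }
end
```

One consequence of this fallback is that a source missing from the results hash looks the same as a source that produced no facts and no error.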