ExtractionPipeline¶
Sequential and concurrent fact extraction from multiple content items.
Class: FactDb::Pipeline::ExtractionPipeline¶
Methods¶
process¶
Process source items sequentially.
Parameters:
- sources (Array) - Source records
- extractor (Symbol) - Extraction method
Returns: Array<Hash>
Example:
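A minimal sketch mirroring the process_parallel example below; the pipeline object, sources array, and :llm extractor are illustrative:
results = pipeline.process(sources, extractor: :llm)
results.each do |result|
  puts "Source #{result[:source_id]}: #{result[:facts].count} facts"
end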
process_parallel¶
Process source items concurrently.
Parameters:
- sources (Array) - Source records
- extractor (Symbol) - Extraction method
Returns: Array<Hash>
Example:
results = pipeline.process_parallel(sources, extractor: :llm)
results.each do |result|
  puts "Source #{result[:source_id]}:"
  puts "  Facts: #{result[:facts].count}"
  puts "  Error: #{result[:error]}" if result[:error]
end
Pipeline Steps¶
Sequential Pipeline¶
graph LR
A[Source] --> B[Validate]
B --> C[Extract]
C --> D[Validate Facts]
D --> E[Results]
style A fill:#1E40AF,stroke:#1E3A8A,color:#FFFFFF
style B fill:#B45309,stroke:#92400E,color:#FFFFFF
style C fill:#047857,stroke:#065F46,color:#FFFFFF
style D fill:#B45309,stroke:#92400E,color:#FFFFFF
style E fill:#B91C1C,stroke:#991B1B,color:#FFFFFF
- Validate - Check source is not empty
- Extract - Run extractor
- Validate Facts - Filter valid facts
- Results - Return extracted facts
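Put together, a per-source pass roughly follows this shape. This is a sketch rather than the library's implementation; extract_facts and valid_fact? are hypothetical helpers standing in for the Extract and Validate Facts stages, and source[:content] / source[:id] are assumed field names:
def process(sources, extractor:)
  sources.map do |source|
    # Validate - reject empty sources
    raise ArgumentError, "source is empty" if source[:content].to_s.strip.empty?

    # Extract - run the requested extractor (hypothetical helper)
    facts = extract_facts(source, extractor)

    # Validate Facts - keep only facts that pass validation (hypothetical helper)
    facts = facts.select { |fact| valid_fact?(fact) }

    # Results - one hash per source, matching the Result Structure below
    { source_id: source[:id], facts: facts, error: nil }
  rescue StandardError => e
    { source_id: source[:id], facts: [], error: e.message }
  end
end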
Parallel Pipeline¶
graph TB
subgraph Parallel
A1[Source 1] --> E1[Extract 1]
A2[Source 2] --> E2[Extract 2]
A3[Source 3] --> E3[Extract 3]
end
E1 --> Aggregate
E2 --> Aggregate
E3 --> Aggregate
style A1 fill:#1E40AF,stroke:#1E3A8A,color:#FFFFFF
style A2 fill:#1E40AF,stroke:#1E3A8A,color:#FFFFFF
style A3 fill:#1E40AF,stroke:#1E3A8A,color:#FFFFFF
style E1 fill:#047857,stroke:#065F46,color:#FFFFFF
style E2 fill:#047857,stroke:#065F46,color:#FFFFFF
style E3 fill:#047857,stroke:#065F46,color:#FFFFFF
style Aggregate fill:#B91C1C,stroke:#991B1B,color:#FFFFFF
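One way to realize the fan-out and aggregate shape above is with one thread per source; this is a sketch assuming plain thread-based concurrency, not necessarily the library's actual mechanism:
def process_parallel(sources, extractor:)
  # Fan out: one thread per source (Extract 1..3 in the diagram)
  threads = sources.map do |source|
    Thread.new { process([source], extractor: extractor).first }
  end

  # Aggregate: wait for each thread and collect its result hash
  threads.map(&:value)
end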
Result Structure¶
{
  source_id: 123,
  facts: [<Fact>, <Fact>, ...],  # Extracted facts
  error: nil                     # Error message if failed
}
Usage via FactDb¶
facts = FactDb.new
# Sequential
results = facts.batch_extract(source_ids, parallel: false)
# Parallel (default)
results = facts.batch_extract(source_ids, parallel: true)
Error Handling¶
The pipeline catches errors per-item:
results = pipeline.process_parallel(sources)
results.each do |result|
  if result[:error]
    logger.error "Source #{result[:source_id]}: #{result[:error]}"
  else
    logger.info "Source #{result[:source_id]}: #{result[:facts].count} facts"
  end
end
Performance¶
Batch Size¶
Optimal batch size depends on:
- Extractor type (LLM has rate limits)
- Source length
- System resources
# Process in optimal batches
sources.each_slice(25) do |batch|
  results = pipeline.process_parallel(batch)
  process_results(results)
end
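When the extractor is an LLM subject to rate limits, pausing between batches is one simple safeguard; the one-second sleep, 25-item slice, and :llm extractor here are illustrative values, not library behavior:
sources.each_slice(25) do |batch|
  results = pipeline.process_parallel(batch, extractor: :llm)
  process_results(results)
  sleep 1  # back off between batches to stay under the rate limit
end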
Memory¶
For large batches, process each slice and discard its results before moving on, rather than holding everything in memory at once.
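A sketch using the batch_extract API shown above; persist_results is a hypothetical sink for the extracted facts, and the slice size of 100 is illustrative:
source_ids.each_slice(100) do |batch_ids|
  results = facts.batch_extract(batch_ids, parallel: true)
  persist_results(results)  # hypothetical: write the facts out immediately
  # results goes out of scope after each slice, so memory stays roughly constant
end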