File Processing Guide

This guide demonstrates how to process files efficiently using SimpleFlow, including reading, writing, transforming, and validating file content.

Reading Files

Basic File Reading

step :read_file, ->(result) {
  begin
    filepath = result.value
    content = File.read(filepath)
    result.with_context(:content, content).continue(filepath)
  rescue Errno::ENOENT
    result.halt.with_error(:file, "File not found: #{filepath}")
  rescue Errno::EACCES
    result.halt.with_error(:file, "Permission denied: #{filepath}")
  end
}
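
Steps like this are registered inside a pipeline and invoked with an initial Result wrapping the file path. A minimal usage sketch, assuming the pipeline and result API used throughout this guide (data/input.txt is a hypothetical path):

pipeline = SimpleFlow::Pipeline.new do
  step :read_file, ->(result) {
    begin
      content = File.read(result.value)
      result.with_context(:content, content).continue(result.value)
    rescue Errno::ENOENT
      result.halt.with_error(:file, "File not found: #{result.value}")
    end
  }
end

result = pipeline.call(SimpleFlow::Result.new('data/input.txt'))
puts result.context[:content] if result.continue?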

Reading JSON Files

require 'json'

step :read_json, ->(result) {
  begin
    content = File.read(result.value)
    data = JSON.parse(content)
    result.continue(data)
  rescue JSON::ParserError => e
    result.halt.with_error(:parse, "Invalid JSON: #{e.message}")
  end
}
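
JSON.parse also accepts symbolize_names: true, which yields symbol-keyed hashes and saves string-key lookups in later steps. A variant sketch (the step name is illustrative):

step :read_json_symbolized, ->(result) {
  begin
    data = JSON.parse(File.read(result.value), symbolize_names: true)
    result.continue(data)
  rescue JSON::ParserError => e
    result.halt.with_error(:parse, "Invalid JSON: #{e.message}")
  end
}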

Reading CSV Files

require 'csv'

step :read_csv, ->(result) {
  begin
    rows = CSV.read(result.value, headers: true)
    data = rows.map(&:to_h)
    result.continue(data)
  rescue CSV::MalformedCSVError => e
    result.halt.with_error(:parse, "Malformed CSV: #{e.message}")
  end
}
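
CSV can coerce values while it reads. A sketch using the built-in :numeric converter, which turns integer- and float-shaped fields into Ruby numbers (the step name is illustrative):

step :read_csv_typed, ->(result) {
  begin
    rows = CSV.read(result.value, headers: true, converters: :numeric)
    result.continue(rows.map(&:to_h))
  rescue CSV::MalformedCSVError => e
    result.halt.with_error(:parse, "Malformed CSV: #{e.message}")
  end
}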

Reading YAML Files

require 'yaml'

step :read_yaml, ->(result) {
  begin
    data = YAML.load_file(result.value)
    result.continue(data)
  rescue Psych::SyntaxError => e
    result.halt.with_error(:parse, "Invalid YAML: #{e.message}")
  end
}
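
YAML.load_file can instantiate arbitrary Ruby objects on older Psych versions, so prefer safe loading for untrusted input. A sketch with an explicit allowlist, assuming only Date values need to round-trip (permitted_classes requires Psych 3.1 or newer):

require 'date'

step :read_yaml_safely, ->(result) {
  begin
    data = YAML.safe_load(File.read(result.value), permitted_classes: [Date])
    result.continue(data)
  rescue Psych::DisallowedClass, Psych::SyntaxError => e
    result.halt.with_error(:parse, "Unsafe or invalid YAML: #{e.message}")
  end
}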

Writing Files

Writing Text Files

step :write_file, ->(result) {
  begin
    filepath = result.value[:path]
    content = result.value[:content]

    File.write(filepath, content)
    result.with_context(:bytes_written, content.bytesize).continue(filepath)
  rescue Errno::EACCES
    result.halt.with_error(:file, "Permission denied: #{filepath}")
  rescue Errno::ENOSPC
    result.halt.with_error(:file, "No space left on device")
  end
}
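
File.write can leave a partially written file behind if the process dies mid-write. A minimal sketch of the usual remedy: write to a temporary sibling, then rename it into place (rename is atomic on POSIX filesystems when both paths share a volume; the step name is illustrative):

step :write_file_atomically, ->(result) {
  filepath = result.value[:path]
  content = result.value[:content]
  temp_path = "#{filepath}.tmp"

  File.write(temp_path, content)
  File.rename(temp_path, filepath)  # atomically replaces any existing file

  result.with_context(:bytes_written, content.bytesize).continue(filepath)
}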

Writing JSON Files

step :write_json, ->(result) {
  filepath = result.value[:path]
  data = result.value[:data]

  json_content = JSON.pretty_generate(data)
  File.write(filepath, json_content)

  result.with_context(:path, filepath).continue(data)
}

Writing CSV Files

step :write_csv, ->(result) {
  filepath = result.value[:path]
  rows = result.value[:rows]

  CSV.open(filepath, 'w', write_headers: true, headers: rows.first.keys) do |csv|
    rows.each { |row| csv << row.values }
  end

  result.with_context(:rows_written, rows.size).continue(filepath)
}

Processing Large Files

Line-by-Line Processing

step :process_large_file, ->(result) {
  filepath = result.value
  processed = 0

  # process_line stands in for your own per-line handler
  File.foreach(filepath) do |line|
    process_line(line.strip)
    processed += 1
  end

  result.with_context(:lines_processed, processed).continue(filepath)
}
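
File.foreach without a block returns an Enumerator, so it composes with lazy to scan huge files without reading past what you need. A sketch that collects the first ten lines containing ERROR (the marker is illustrative):

step :first_errors, ->(result) {
  errors = File.foreach(result.value)
             .lazy
             .select { |line| line.include?('ERROR') }
             .first(10)

  result.with_context(:first_errors, errors).continue(result.value)
}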

Batch Processing

step :process_in_batches, ->(result) {
  filepath = result.value
  batch_size = 1000
  batches_processed = 0

  File.foreach(filepath).each_slice(batch_size) do |batch|
    # transform and save_batch stand in for your own batch logic
    transformed = batch.map { |line| transform(line) }
    save_batch(transformed)
    batches_processed += 1
  end

  result.with_context(:batches_processed, batches_processed).continue(filepath)
}

Streaming Large Files

step :stream_process, ->(result) {
  input_path = result.value[:input]
  output_path = result.value[:output]

  File.open(output_path, 'w') do |output|
    File.foreach(input_path) do |line|
      transformed = transform_line(line)
      output.write(transformed)
    end
  end

  result.continue(output_path)
}
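
The same streaming pattern extends to compressed files via the zlib standard library, so a gzipped log never needs to be decompressed to disk first. A sketch, with transform_line the same placeholder as above:

require 'zlib'

step :stream_gzip, ->(result) {
  Zlib::GzipReader.open(result.value[:input]) do |gz_in|
    Zlib::GzipWriter.open(result.value[:output]) do |gz_out|
      gz_in.each_line { |line| gz_out.write(transform_line(line)) }
    end
  end

  result.continue(result.value[:output])
}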

Multi-File Processing

Processing Multiple Files in Parallel

pipeline = SimpleFlow::Pipeline.new do
  step :process_config, ->(result) {
    config = JSON.parse(File.read('config/app.json'))
    result.with_context(:config, config).continue(result.value)
  }, depends_on: []

  step :process_users, ->(result) {
    users = CSV.read('data/users.csv', headers: true).map(&:to_h)
    result.with_context(:users, users).continue(result.value)
  }, depends_on: []

  step :process_logs, ->(result) {
    logs = File.readlines('logs/app.log').map(&:strip)
    result.with_context(:logs, logs).continue(result.value)
  }, depends_on: []

  step :combine_results, ->(result) {
    result.continue({
      config: result.context[:config],
      user_count: result.context[:users].size,
      log_count: result.context[:logs].size
    })
  }, depends_on: [:process_config, :process_users, :process_logs]
end

result = pipeline.call_parallel(SimpleFlow::Result.new(nil))

Directory Processing

step :process_directory, ->(result) {
  dir_path = result.value
  processed_files = []

  Dir.glob(File.join(dir_path, '*.json')).each do |filepath|
    data = JSON.parse(File.read(filepath))
    transformed = transform_data(data)
    processed_files << { file: filepath, records: transformed.size }
  end

  result.with_context(:processed_files, processed_files).continue(dir_path)
}
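
As written, one malformed file raises and aborts the whole directory. A more forgiving sketch records per-file failures in context instead of halting (transform_data remains a placeholder; the step name is illustrative):

step :process_directory_tolerant, ->(result) {
  processed, failed = [], []

  Dir.glob(File.join(result.value, '*.json')).each do |filepath|
    begin
      data = JSON.parse(File.read(filepath))
      processed << { file: filepath, records: transform_data(data).size }
    rescue JSON::ParserError => e
      failed << { file: filepath, error: e.message }
    end
  end

  result
    .with_context(:processed_files, processed)
    .with_context(:failed_files, failed)
    .continue(result.value)
}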

Data Transformation

CSV to JSON Conversion

pipeline = SimpleFlow::Pipeline.new do
  step :read_csv, ->(result) {
    rows = CSV.read(result.value, headers: true)
    result.with_context(:source_path, result.value).continue(rows.map(&:to_h))
  }

  step :transform_data, ->(result) {
    transformed = result.value.map do |row|
      {
        id: row['id'].to_i,
        name: row['name'].strip,
        email: row['email'].downcase,
        active: row['active'] == 'true'
      }
    end
    result.continue(transformed)
  }

  step :write_json, ->(result) {
    output_path = result.context[:source_path].sub(/\.csv\z/, '.json')
    File.write(output_path, JSON.pretty_generate(result.value))
    result.continue(output_path)
  }
end
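
Hypothetical usage, assuming data/users.csv exists; the pipeline writes data/users.json next to it:

result = pipeline.call(SimpleFlow::Result.new('data/users.csv'))
puts "Wrote #{result.value}" if result.continue?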

File Format Conversion Pipeline

class FileConverter
  def self.build(input_format:, output_format:)
    SimpleFlow::Pipeline.new do
      step :read_input, reader_for(input_format), depends_on: []
      step :transform, ->(result) {
        # Normalize to common format
        result.continue(normalize_data(result.value))
      }, depends_on: [:read_input]
      step :write_output, writer_for(output_format), depends_on: [:transform]
    end
  end

  # Readers expect result.value to be { input: ..., output: ... }; they stash
  # the output path in context and continue with the parsed data.
  def self.reader_for(format)
    parse = case format
            when :json then ->(path) { JSON.parse(File.read(path)) }
            when :csv  then ->(path) { CSV.read(path, headers: true).map(&:to_h) }
            when :yaml then ->(path) { YAML.load_file(path) }
            end
    ->(result) {
      result
        .with_context(:output, result.value[:output])
        .continue(parse.call(result.value[:input]))
    }
  end

  # Writers read the output path back out of context and return a Result,
  # like every other step. write_csv is a small helper (see Writing CSV Files).
  def self.writer_for(format)
    ->(result) {
      output = result.context[:output]
      case format
      when :json then File.write(output, JSON.pretty_generate(result.value))
      when :csv  then write_csv(output, result.value)
      when :yaml then File.write(output, result.value.to_yaml)
      end
      result.continue(output)
    }
  end
end
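
Hypothetical usage, converting CSV to JSON (paths illustrative; the initial value carries both paths, matching the reader contract above):

converter = FileConverter.build(input_format: :csv, output_format: :json)
result = converter.call(
  SimpleFlow::Result.new({ input: 'data/users.csv', output: 'data/users.json' })
)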

File Validation

Validating File Existence

step :validate_file_exists, ->(result) {
  filepath = result.value

  unless File.exist?(filepath)
    return result.halt.with_error(:file, "File does not exist: #{filepath}")
  end

  unless File.readable?(filepath)
    return result.halt.with_error(:file, "File is not readable: #{filepath}")
  end

  result.continue(filepath)
}

Validating File Format

step :validate_json_format, ->(result) {
  begin
    content = File.read(result.value)
    JSON.parse(content)  # Just validate, don't use result yet
    result.continue(result.value)
  rescue JSON::ParserError => e
    result.halt.with_error(:format, "Invalid JSON file: #{e.message}")
  end
}

Validating File Size

step :validate_file_size, ->(result) {
  filepath = result.value
  max_size = 10 * 1024 * 1024  # 10 MB

  file_size = File.size(filepath)

  if file_size > max_size
    result.halt.with_error(:size, "File too large: #{file_size} bytes (max #{max_size})")
  else
    result.with_context(:file_size, file_size).continue(filepath)
  end
}

Complete File Processing Example

require 'csv'
require 'json'
require 'fileutils'

class CSVProcessor
  def self.build
    SimpleFlow::Pipeline.new do
      # Validate file
      step :validate_exists, ->(result) {
        filepath = result.value
        unless File.exist?(filepath)
          return result.halt.with_error(:file, "File not found")
        end
        result.continue(filepath)
      }, depends_on: []

      step :validate_size, ->(result) {
        size = File.size(result.value)
        max_size = 50 * 1024 * 1024  # 50 MB

        if size > max_size
          return result.halt.with_error(:size, "File too large")
        end

        result.with_context(:file_size, size).continue(result.value)
      }, depends_on: [:validate_exists]

      # Read and parse
      step :read_csv, ->(result) {
        rows = CSV.read(result.value, headers: true)
        result.continue(rows.map(&:to_h))
      }, depends_on: [:validate_size]

      # Validate data
      step :validate_headers, ->(result) {
        required = ['id', 'name', 'email']

        if result.value.empty?
          return result.halt.with_error(:headers, "CSV contains no data rows")
        end

        missing = required - result.value.first.keys
        if missing.any?
          return result.halt.with_error(:headers, "Missing columns: #{missing.join(', ')}")
        end

        result.continue(result.value)
      }, depends_on: [:read_csv]

      # Transform data
      step :clean_data, ->(result) {
        cleaned = result.value.map do |row|
          {
            id: row['id'].to_i,
            name: row['name'].strip.capitalize,
            email: row['email'].downcase.strip
          }
        end
        result.continue(cleaned)
      }, depends_on: [:validate_headers]

      step :filter_invalid, ->(result) {
        valid = result.value.select do |row|
          row[:email] =~ /\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i
        end

        invalid_count = result.value.size - valid.size
        if invalid_count > 0
          result = result.with_context(:invalid_count, invalid_count)
        end

        result.continue(valid)
      }, depends_on: [:clean_data]

      # Save results
      step :write_output, ->(result) {
        output = 'output/cleaned.json'
        FileUtils.mkdir_p(File.dirname(output))  # ensure the output directory exists
        File.write(output, JSON.pretty_generate(result.value))

        result
          .with_context(:output_file, output)
          .with_context(:records_written, result.value.size)
          .continue(output)
      }, depends_on: [:filter_invalid]
    end
  end
end

# Usage
result = CSVProcessor.build.call(
  SimpleFlow::Result.new('data/users.csv')
)

if result.continue?
  puts "Processed successfully:"
  puts "  File size: #{result.context[:file_size]} bytes"
  puts "  Records written: #{result.context[:records_written]}"
  puts "  Invalid records skipped: #{result.context[:invalid_count] || 0}"
  puts "  Output: #{result.context[:output_file]}"
else
  puts "Processing failed:"
  result.errors.each do |category, messages|
    puts "  #{category}: #{messages.join(', ')}"
  end
end

Binary File Processing

Reading Binary Files

step :read_binary, ->(result) {
  filepath = result.value
  content = File.binread(filepath)  # returns a BINARY (ASCII-8BIT) string

  result
    .with_context(:file_size, content.bytesize)
    .with_context(:encoding, content.encoding.name)
    .continue(content)
}
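
Binary reads pair naturally with signature checks. A sketch that validates a PNG by its magic bytes (the eight-byte signature below is defined by the PNG specification; the step name is illustrative):

PNG_SIGNATURE = "\x89PNG\r\n\x1a\n".b

step :validate_png, ->(result) {
  header = File.binread(result.value, 8)

  if header == PNG_SIGNATURE
    result.continue(result.value)
  else
    result.halt.with_error(:format, "Not a PNG file: #{result.value}")
  end
}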

Processing Images

require 'mini_magick'

step :process_image, ->(result) {
  filepath = result.value

  image = MiniMagick::Image.open(filepath)
  original_size = [image.width, image.height]

  # Resize and persist if larger than full HD
  if image.width > 1920 || image.height > 1080
    image.resize '1920x1080'
    image.write filepath
  end

  # Generate and persist a thumbnail alongside the original
  thumbnail_path = filepath.sub(/\.jpg\z/, '_thumb.jpg')
  thumbnail = MiniMagick::Image.open(filepath)
  thumbnail.resize '200x200'
  thumbnail.write thumbnail_path

  result
    .with_context(:original_size, original_size)
    .with_context(:thumbnail_path, thumbnail_path)
    .continue(filepath)
}

Temporary Files

Using Temporary Files

require 'tempfile'

step :use_temp_file, ->(result) {
  Tempfile.create(['process', '.json']) do |temp|
    # Write intermediate data
    temp.write(JSON.generate(result.value))
    temp.flush  # make buffered writes visible before reading by path

    # Process temp file
    processed = process_file(temp.path)

    # Temp file automatically deleted when block exits
    result.continue(processed)
  end
}