Data Fetching Guide

This guide demonstrates how to fetch data with SimpleFlow from APIs, databases, the file system, and external services such as Redis, with patterns for caching, batching, retries, and error handling.

API Data Fetching

Basic API Call

step :fetch_from_api, ->(result) {
  begin
    response = HTTP.get("https://api.example.com/users/#{result.value}")
    data = JSON.parse(response.body.to_s)
    result.with_context(:user_data, data).continue(result.value)
  rescue HTTP::Error => e
    result.halt.with_error(:api, "API request failed: #{e.message}")
  rescue JSON::ParserError => e
    result.halt.with_error(:parse, "Invalid JSON: #{e.message}")
  end
}
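
To run the step, register it in a pipeline and seed the Result with a user ID. A minimal usage sketch; the pipeline wiring and the ID 42 are illustrative:

pipeline = SimpleFlow::Pipeline.new do
  step :fetch_from_api, ->(result) {
    # ... body as above ...
  }
end

result = pipeline.call(SimpleFlow::Result.new(42))
result.context[:user_data]  # parsed user payload on success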

Parallel API Calls

pipeline = SimpleFlow::Pipeline.new do
  step :fetch_weather, ->(result) {
    location = result.value[:location]
    # Pass query values via params: so the http gem URL-encodes them
    weather = HTTP.get("https://api.weather.com/current", params: { location: location }).parse
    result.with_context(:weather, weather).continue(result.value)
  }, depends_on: []

  step :fetch_news, ->(result) {
    topic = result.value[:topic]
    news = HTTP.get("https://api.news.com/articles", params: { topic: topic }).parse
    result.with_context(:news, news).continue(result.value)
  }, depends_on: []

  step :fetch_stocks, ->(result) {
    symbols = result.value[:symbols]
    stocks = HTTP.get("https://api.stocks.com/quotes", params: { symbols: symbols }).parse
    result.with_context(:stocks, stocks).continue(result.value)
  }, depends_on: []

  step :combine_results, ->(result) {
    combined = {
      weather: result.context[:weather],
      news: result.context[:news],
      stocks: result.context[:stocks]
    }
    result.continue(combined)
  }, depends_on: [:fetch_weather, :fetch_news, :fetch_stocks]
end

# The three independent fetch steps execute in parallel;
# :combine_results runs once they all finish
result = pipeline.call_parallel(
  SimpleFlow::Result.new({ location: "NYC", topic: "tech", symbols: "AAPL,GOOGL" })
)

API with Authentication

class AuthenticatedAPI
  def initialize(api_key)
    @api_key = api_key
  end

  def call(result)
    endpoint = result.value[:endpoint]

    response = HTTP
      .auth("Bearer #{@api_key}")
      .get("https://api.example.com/#{endpoint}")

    if response.status.success?
      data = JSON.parse(response.body.to_s)
      result.with_context(:api_response, data).continue(result.value)
    else
      result.halt.with_error(:api, "Request failed with status #{response.status}")
    end
  rescue StandardError => e
    result.halt.with_error(:api, "API error: #{e.message}")
  end
end

pipeline = SimpleFlow::Pipeline.new do
  step :fetch_data, AuthenticatedAPI.new(ENV['API_KEY']), depends_on: []
end
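
A usage sketch; the endpoint name is illustrative:

result = pipeline.call(SimpleFlow::Result.new({ endpoint: "reports" }))
result.context[:api_response]  # parsed payload on success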

Rate-Limited API Calls

class RateLimitedFetcher
  def initialize(max_requests_per_second: 10)
    @max_requests = max_requests_per_second
    @request_times = []
  end

  def call(result)
    wait_if_rate_limited

    begin
      @request_times << Time.now
      response = HTTP.get(result.value[:url])
      data = response.parse

      result.with_context(:data, data).continue(result.value)
    rescue HTTP::Error => e
      result.halt.with_error(:http, e.message)
    end
  end

  private

  def wait_if_rate_limited
    loop do
      # Drop requests that fell outside the one-second window
      one_second_ago = Time.now - 1
      @request_times.reject! { |time| time < one_second_ago }

      break if @request_times.size < @max_requests

      # At the limit: pause briefly, then re-check
      sleep(0.1)
    end
  end
end
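
A single instance holds the throttle state, so reuse one fetcher across calls. Note that @request_times is not synchronized; wrap access in a Mutex before sharing the instance across call_parallel steps. A usage sketch with an illustrative URL list:

fetcher = RateLimitedFetcher.new(max_requests_per_second: 5)

pipeline = SimpleFlow::Pipeline.new do
  step :fetch, fetcher, depends_on: []
end

urls = ["https://api.example.com/a", "https://api.example.com/b"]
urls.each do |url|
  pipeline.call(SimpleFlow::Result.new({ url: url }))
end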

Database Queries

Basic Database Query

step :fetch_users, ->(result) {
  users = DB[:users]
    .where(active: true)
    .where { created_at > Date.today - 30 }
    .all

  result.with_context(:users, users).continue(result.value)
}

Parallel Database Queries

pipeline = SimpleFlow::Pipeline.new do
  step :fetch_users, ->(result) {
    users = DB[:users].where(active: true).all
    result.with_context(:users, users).continue(result.value)
  }, depends_on: []

  step :fetch_orders, ->(result) {
    orders = DB[:orders].where(status: 'completed').all
    result.with_context(:orders, orders).continue(result.value)
  }, depends_on: []

  step :fetch_products, ->(result) {
    products = DB[:products].where(in_stock: true).all
    result.with_context(:products, products).continue(result.value)
  }, depends_on: []

  step :aggregate, ->(result) {
    stats = {
      total_users: result.context[:users].size,
      total_orders: result.context[:orders].size,
      total_products: result.context[:products].size
    }
    result.continue(stats)
  }, depends_on: [:fetch_users, :fetch_orders, :fetch_products]
end

# Ensure your database connection pool supports concurrent queries
DB = Sequel.connect(
  'postgres://localhost/mydb',
  max_connections: 10  # Allow concurrent connections
)

result = pipeline.call_parallel(SimpleFlow::Result.new(nil))

Complex Joins and Aggregations

step :fetch_user_analytics, ->(result) {
  user_id = result.value

  # Sequel[:table][:column] qualifies columns (the :table__column symbol
  # syntax was removed in Sequel 5)
  analytics = DB[:users]
    .select(Sequel[:users][:id], Sequel[:users][:name])
    .select_append { count(Sequel[:orders][:id]).as(:order_count) }
    .select_append { sum(Sequel[:orders][:total]).as(:total_spent) }
    .left_join(:orders, user_id: :id)
    .where(Sequel[:users][:id] => user_id)
    .group(Sequel[:users][:id], Sequel[:users][:name])
    .first

  result.with_context(:analytics, analytics).continue(result.value)
}

File System Operations

Reading Files

step :read_config, ->(result) {
  begin
    config_path = result.value[:config_path]
    content = File.read(config_path)
    config = JSON.parse(content)

    result.with_context(:config, config).continue(result.value)
  rescue Errno::ENOENT
    result.halt.with_error(:file, "Config file not found: #{config_path}")
  rescue JSON::ParserError => e
    result.halt.with_error(:parse, "Invalid JSON in config: #{e.message}")
  end
}

Reading Multiple Files in Parallel

pipeline = SimpleFlow::Pipeline.new do
  step :read_users_csv, ->(result) {
    users = CSV.read('data/users.csv', headers: true).map(&:to_h)
    result.with_context(:users, users).continue(result.value)
  }, depends_on: []

  step :read_products_json, ->(result) {
    products = JSON.parse(File.read('data/products.json'))
    result.with_context(:products, products).continue(result.value)
  }, depends_on: []

  step :read_config_yaml, ->(result) {
    config = YAML.load_file('config/settings.yml')
    result.with_context(:config, config).continue(result.value)
  }, depends_on: []

  step :combine_data, ->(result) {
    combined = {
      users: result.context[:users],
      products: result.context[:products],
      config: result.context[:config]
    }
    result.continue(combined)
  }, depends_on: [:read_users_csv, :read_products_json, :read_config_yaml]
end
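
As with the API example, the three reads run concurrently and :combine_data waits for all of them:

result = pipeline.call_parallel(SimpleFlow::Result.new(nil))
result.value[:users]  # rows from data/users.csv as hashes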

Processing Large Files

step :process_large_file, ->(result) {
  file_path = result.value
  processed_count = 0

  # File.foreach streams line by line instead of loading the whole file
  File.foreach(file_path).each_slice(1000) do |batch|
    # Process in batches of 1,000 lines
    batch.each do |line|
      process_line(line)
      processed_count += 1
    end
  end

  result.with_context(:lines_processed, processed_count).continue(result.value)
}
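
process_line is left undefined above. A minimal hypothetical sketch, assuming each line holds one JSON record bound for an events table:

def process_line(line)
  record = JSON.parse(line)   # hypothetical: one JSON object per line
  DB[:events].insert(record)  # hypothetical destination table
end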

Caching Strategies

Simple Cache with Fallback

step :fetch_with_cache, ->(result) {
  cache_key = "user_#{result.value}"

  # Try cache first
  cached = REDIS.get(cache_key)
  if cached
    data = JSON.parse(cached)
    return result.with_context(:source, :cache).continue(data)
  end

  # Cache miss - fetch from API
  begin
    response = HTTP.get("https://api.example.com/users/#{result.value}")
    data = response.parse

    # Store in cache for 1 hour
    REDIS.setex(cache_key, 3600, data.to_json)

    result.with_context(:source, :api).continue(data)
  rescue HTTP::Error => e
    result.halt.with_error(:fetch, "Failed to fetch data: #{e.message}")
  end
}
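
The REDIS constant is assumed to be a shared client from the redis gem; a minimal setup sketch with an illustrative default URL:

require 'redis'

REDIS = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379'))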

Multi-Level Caching

class MultiLevelCache
  def self.call(result)
    key = result.value[:cache_key]

    # Level 1: Memory cache
    if data = MEMORY_CACHE[key]
      return result.with_context(:cache_level, :memory).continue(data)
    end

    # Level 2: Redis cache
    if cached = REDIS.get(key)
      data = JSON.parse(cached)
      MEMORY_CACHE[key] = data
      return result.with_context(:cache_level, :redis).continue(data)
    end

    # Level 3: Database
    if record = DB[:cache].where(key: key).first
      data = JSON.parse(record[:value])
      REDIS.setex(key, 3600, data.to_json)
      MEMORY_CACHE[key] = data
      return result.with_context(:cache_level, :database).continue(data)
    end

    # No cache hit - need to fetch
    result.with_context(:cache_level, :none).continue(nil)
  end
end
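
A usage sketch. MEMORY_CACHE is assumed to be a plain Hash here; in production, prefer a bounded LRU cache so memory cannot grow without limit:

MEMORY_CACHE = {}

pipeline = SimpleFlow::Pipeline.new do
  step :check_cache, MultiLevelCache
end

result = pipeline.call(SimpleFlow::Result.new({ cache_key: "user_42" }))
result.context[:cache_level]  # => :memory, :redis, :database, or :none
result.value                  # cached data, or nil when every level missed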

Batch Processing

Fetching Data in Batches

pipeline = SimpleFlow::Pipeline.new do
  step :fetch_batch, ->(result) {
    batch_ids = result.value
    records = DB[:records].where(id: batch_ids).all

    result.with_context(:records, records).continue(result.value)
  }

  step :process_records, ->(result) {
    records = result.context[:records]
    processed = records.map { |r| transform_record(r) }

    result.continue(processed)
  }
end

# Process in batches
all_ids = (1..10000).to_a
all_ids.each_slice(100) do |batch|
  result = pipeline.call(SimpleFlow::Result.new(batch))
  save_processed_batch(result.value)
end
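
transform_record and save_processed_batch are placeholders. Hypothetical sketches, consistent with the Sequel usage elsewhere in this guide:

def transform_record(record)
  record.merge(processed_at: Time.now)  # hypothetical transformation
end

def save_processed_batch(records)
  DB[:processed_records].multi_insert(records)  # hypothetical destination table
end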

Real-World ETL Example

class ETLPipeline
  def self.build
    SimpleFlow::Pipeline.new do
      # Extract phase - parallel data loading
      step :extract_users, ->(result) {
        users = CSV.read('data/users.csv', headers: true).map(&:to_h)
        result.with_context(:raw_users, users).continue(result.value)
      }, depends_on: []

      step :extract_orders, ->(result) {
        orders = JSON.parse(File.read('data/orders.json'))
        result.with_context(:raw_orders, orders).continue(result.value)
      }, depends_on: []

      step :extract_products, ->(result) {
        products = DB[:products].all
        result.with_context(:raw_products, products).continue(result.value)
      }, depends_on: []

      # Transform phase - parallel transformations
      step :transform_users, ->(result) {
        users = result.context[:raw_users].map do |user|
          {
            id: user['id'].to_i,
            name: user['name'].strip.downcase,
            email: user['email'].downcase,
            created_at: Date.parse(user['signup_date'])
          }
        end
        result.with_context(:users, users).continue(result.value)
      }, depends_on: [:extract_users]

      step :transform_orders, ->(result) {
        orders = result.context[:raw_orders]
          .reject { |o| o['status'] == 'cancelled' }
          .map do |order|
            {
              id: order['order_id'],
              user_id: order['user_id'],
              total: order['amount'].to_f,
              items: order['items'].size
            }
          end
        result.with_context(:orders, orders).continue(result.value)
      }, depends_on: [:extract_orders]

      # Load phase - aggregate and save
      step :aggregate_stats, ->(result) {
        users = result.context[:users]
        orders = result.context[:orders]

        stats = users.map do |user|
          user_orders = orders.select { |o| o[:user_id] == user[:id] }
          {
            user_id: user[:id],
            total_orders: user_orders.size,
            total_spent: user_orders.sum { |o| o[:total] },
            avg_order: user_orders.empty? ? 0 : user_orders.sum { |o| o[:total] } / user_orders.size
          }
        end

        result.continue(stats)
      }, depends_on: [:transform_users, :transform_orders]

      step :save_results, ->(result) {
        DB[:user_stats].multi_insert(result.value)
        result.continue("Saved #{result.value.size} records")
      }, depends_on: [:aggregate_stats]
    end
  end
end

# Execute ETL pipeline
result = ETLPipeline.build.call_parallel(SimpleFlow::Result.new(nil))
puts result.value  # "Saved 150 records"

Error Handling for Data Fetching

step :fetch_with_retries, ->(result) {
  max_retries = 3
  attempt = 0

  begin
    attempt += 1
    response = HTTP.timeout(10).get(result.value[:url])
    data = response.parse

    result
      .with_context(:attempts, attempt)
      .with_context(:data, data)
      .continue(result.value)
  rescue HTTP::TimeoutError
    if attempt < max_retries
      sleep(2 ** attempt)  # Exponential backoff
      retry
    else
      result.halt.with_error(:timeout, "Request timed out after #{max_retries} attempts")
    end
  rescue HTTP::Error => e
    result.halt.with_error(:http, "HTTP error: #{e.message}")
  end
}
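
The same pattern can be extracted into a wrapper so any fetch step gets retries without repeating the begin/rescue. A minimal sketch; WithRetries is not part of SimpleFlow:

class WithRetries
  def initialize(step, max_retries: 3)
    @step = step
    @max_retries = max_retries
  end

  def call(result)
    attempt = 0
    begin
      attempt += 1
      @step.call(result)
    rescue HTTP::TimeoutError, HTTP::ConnectionError
      if attempt < @max_retries
        sleep(2 ** attempt)  # exponential backoff between attempts
        retry
      end
      result.halt.with_error(:retry, "Failed after #{@max_retries} attempts")
    end
  end
end

# Wrap any callable step:
step :fetch, WithRetries.new(->(result) {
  data = HTTP.timeout(10).get(result.value[:url]).parse
  result.with_context(:data, data).continue(result.value)
}), depends_on: []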