Advanced Code Example — Files, APIs, and Data Ingestion

This example demonstrates reading and writing CSV and JSON files, simulating an API response pattern, and building a validated ingestion pipeline that handles real-world data quality issues.


Business Scenario

You are building a customer data ingestion pipeline that:

  1. Reads raw customer data from a CSV file
  2. Validates and enriches each record
  3. Saves processed data as JSON for downstream analysis
  4. Simulates pulling supplemental data from an API
  5. Merges and reports on the combined dataset

Code

import json
import csv
import io

# ── Simulated CSV Source Data ─────────────────────────────────────────
# (In real use, you'd do: with open('customers.csv', 'r') as f: reader = csv.DictReader(f))
# The first line is the header row that csv.DictReader uses as dict keys.
# Two rows are deliberately dirty so the validation path is exercised:
#   - Bob Martinez has 'N/A' in total_spent
#   - David Kim has an empty total_spent
raw_csv = """name,customer_id,region,total_spent,purchase_count,is_premium
Alice Johnson,1001,Northwest,1257.30,12,True
Bob Martinez,1002,Southwest,N/A,4,False
Carol Chen,1003,Northwest,890.75,9,True
David Kim,1004,Southeast,,2,False
Eve Torres,1005,Southwest,1450.00,15,True
"""

# ── Simulated API Response ────────────────────────────────────────────
# (In real use: import requests; response = requests.get(url); data = response.json())
# Shape: a top-level "status" string plus a "customers" list whose entries
# carry an integer customer_id, a loyalty_score and a churn_risk label.
# Customer 1004 has no entry here, so a lookup for that id would fall back
# to the defaults used during enrichment.
api_response = {
    "status": "success",
    "customers": [
        {"customer_id": 1001, "loyalty_score": 92, "churn_risk": "low"},
        {"customer_id": 1002, "loyalty_score": 41, "churn_risk": "high"},
        {"customer_id": 1003, "loyalty_score": 78, "churn_risk": "medium"},
        {"customer_id": 1005, "loyalty_score": 95, "churn_risk": "low"},
    ]
}

# ── Ingestion: Read + Validate CSV ────────────────────────────────────
def parse_customer_csv(raw_text: str) -> tuple[list, list]:
    """Parse CSV text into validated customer records.

    Every data row is converted to typed fields. A row that cannot be
    converted, or that breaks a validation rule, is skipped and reported in
    the error list instead of aborting the whole parse.

    Args:
        raw_text: Complete CSV document, including the header row.

    Returns:
        A ``(valid_records, errors)`` tuple. ``valid_records`` is a list of
        dicts with the keys ``name``, ``customer_id``, ``region``,
        ``total_spent``, ``purchase_count`` and ``is_premium``. ``errors``
        is a list of ``{'name': ..., 'error': ...}`` dicts, one per
        rejected row.
    """
    import math  # local import: only needed for the finiteness check

    required_fields = (
        'customer_id', 'region', 'total_spent', 'purchase_count', 'is_premium',
    )
    reader = csv.DictReader(io.StringIO(raw_text))
    valid_records = []
    errors = []

    for row in reader:
        # DictReader fills columns missing from a short row with None, so
        # guard before calling .strip() on any cell.
        raw_name = row.get('name', 'UNKNOWN')
        name = raw_name.strip() if raw_name is not None else 'UNKNOWN'
        try:
            # Report truncated rows as data errors rather than letting
            # int(None) / None.strip() escape as TypeError / AttributeError.
            absent = [
                field for field in required_fields
                if field in row and row[field] is None
            ]
            if absent:
                raise ValueError(f"Missing value(s) for: {', '.join(absent)}")

            customer_id = int(row['customer_id'])
            total_spent = float(row['total_spent'])  # raises ValueError if empty or 'N/A'
            purchase_count = int(row['purchase_count'])
            is_premium = row['is_premium'].strip().lower() == 'true'

            # float() accepts 'nan' and 'inf'; both would slip past the
            # sign check below and are not representable in strict JSON.
            if not math.isfinite(total_spent):
                raise ValueError(f"Non-finite total_spent: {total_spent}")
            if total_spent < 0:
                raise ValueError(f"Negative total_spent: {total_spent}")

            valid_records.append({
                'name': name,
                'customer_id': customer_id,
                'region': row['region'].strip(),
                'total_spent': round(total_spent, 2),
                'purchase_count': purchase_count,
                'is_premium': is_premium
            })
        except (ValueError, KeyError) as e:
            # KeyError covers a column that is absent from the header.
            errors.append({'name': name, 'error': str(e)})

    return valid_records, errors


# ── Enrich with API Data ──────────────────────────────────────────────
def enrich_with_api(customers: list, api_data: dict) -> list:
    """Return copies of the customer records with API fields attached.

    Each output record contains every field of the input record plus
    ``loyalty_score`` (``None`` when the API has no entry for that
    customer) and ``churn_risk`` (``'unknown'`` when absent). The input
    records are not modified.
    """
    # Index the API entries by customer_id for direct lookup.
    by_id = {}
    for entry in api_data.get('customers', []):
        by_id[entry['customer_id']] = entry

    merged = []
    for record in customers:
        extra = by_id.get(record['customer_id'], {})
        combined = dict(record)  # copy so the caller's record stays untouched
        combined['loyalty_score'] = extra.get('loyalty_score')
        combined['churn_risk'] = extra.get('churn_risk', 'unknown')
        merged.append(combined)
    return merged


# ── Save to JSON ──────────────────────────────────────────────────────
def save_to_json(data: list, filepath: str) -> None:
    """Save processed records to a JSON file.

    The records are serialized to a string before the file is opened, so a
    record that cannot be serialized raises ``TypeError`` without
    truncating or partially overwriting an existing file at ``filepath``.

    Args:
        data: JSON-serializable records to write.
        filepath: Destination path; the file is created or overwritten.

    Raises:
        TypeError: If ``data`` contains a value ``json`` cannot serialize.
        OSError: If the file cannot be opened or written.
    """
    # Same text json.dump(data, f, indent=2) would produce, built up front.
    payload = json.dumps(data, indent=2)
    # Explicit encoding: do not depend on the platform's locale default.
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(payload)
    print(f"  Saved {len(data)} records to {filepath}")


# ── Load from JSON ────────────────────────────────────────────────────
def load_from_json(filepath: str) -> list:
    """Load records from a JSON file.

    Args:
        filepath: Path of the JSON document to read.

    Returns:
        The decoded JSON content (a list of records when the file was
        written by this pipeline).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # JSON text is UTF-8; decoding with the locale default would break on
    # non-ASCII content on platforms whose default is not UTF-8.
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)


# ══════════════════════════════════════════════════════════
#  PIPELINE EXECUTION
# ══════════════════════════════════════════════════════════
# Runs the whole flow on the in-memory sample data: parse the CSV, merge
# the API fields, write the result to disk, read it back and print a report.

# NOTE: '/tmp' is a POSIX location; choose another path on Windows.
output_path = '/tmp/customers_processed.json'

print("=" * 58)
print("  DATA INGESTION PIPELINE")
print("=" * 58)

# Step 1: Parse CSV
customers, errors = parse_customer_csv(raw_csv)
print("\n  Step 1 — CSV Parsing:")
print(f"    Valid records  : {len(customers)}")
print(f"    Errors skipped : {len(errors)}")
for err in errors:
    print(f"    ✗ {err['name']}: {err['error']}")

# Step 2: Enrich with API
enriched = enrich_with_api(customers, api_response)
print("\n  Step 2 — API Enrichment:")
print(f"    Records enriched: {len(enriched)}")

# Step 3: Save to JSON
save_to_json(enriched, output_path)

# Step 4: Load and display
loaded = load_from_json(output_path)
print(f"\n  Step 4 — Loaded from JSON ({len(loaded)} records):")
print()
print(f"  {'Name':<20} {'Region':<12} {'Spent':>10}  {'Churn':>8}  {'Loyalty':>8}")
print(f"  {'-'*20} {'-'*12} {'-'*10}  {'-'*8}  {'-'*8}")
for c in loaded:
    # Compare against None explicitly: a loyalty score of 0 is a real
    # value and must not be reported as 'N/A'.
    loyalty = f"{c['loyalty_score']}/100" if c['loyalty_score'] is not None else 'N/A'
    print(f"  {c['name']:<20} {c['region']:<12} ${c['total_spent']:>9,.2f}  {c['churn_risk']:>8}  {loyalty:>8}")

print("=" * 58)

Expected Output

==========================================================
  DATA INGESTION PIPELINE
==========================================================

  Step 1 — CSV Parsing:
    Valid records  : 3
    Errors skipped : 2
    ✗ Bob Martinez: could not convert string to float: 'N/A'
    ✗ David Kim: could not convert string to float: ''

  Step 2 — API Enrichment:
    Records enriched: 3
  Saved 3 records to /tmp/customers_processed.json

  Step 4 — Loaded from JSON (3 records):

  Name                 Region            Spent     Churn   Loyalty
  -------------------- ------------ ----------  --------  --------
  Alice Johnson        Northwest    $ 1,257.30       low    92/100
  Carol Chen           Northwest    $   890.75    medium    78/100
  Eve Torres           Southwest    $ 1,450.00       low    95/100
==========================================================

Key Concepts Demonstrated

| Concept | Where in Code |
| --- | --- |
| `csv.DictReader` | Row-as-dictionary CSV reading |
| `io.StringIO` | Reading string as file object |
| `with open(...)` context manager | File read and write |
| `json.dump` / `json.load` | Serializing/deserializing JSON |
| Dict comprehension for API lookup | `{c['customer_id']: c for c in ...}` |
| `**customer` dict unpacking | Merging records in `enrich_with_api()` |
| `.get()` with fallback | `api_info.get('loyalty_score', None)` |

Next: Jupyter Notebook →