Automated Data Pipeline — ACE / IDI

Replacing a 20-hour manual data cleaning process with a reproducible, validated Python ETL pipeline — reducing processing time by 70% and eliminating human error from health research workflows.

Domain: Health Research / Data Engineering
Stack: Python · Pandas · SQL · PostgreSQL · SQLAlchemy · Great Expectations
Timeline: 4 weeks
Status: ✅ In production

The Problem

The ACE/IDI research team collected health survey data from multiple field sites. The data arrived as inconsistent Excel files with:

  • Different column naming conventions per site
  • Missing values with no standardised encoding
  • Duplicate entries from data entry errors
  • Incompatible date formats across sources
  • No audit trail of transformations applied

Every data cycle, a team member spent ~20 hours manually merging and cleaning files in Excel. This was error-prone, not reproducible, and blocked downstream analysis.
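To make the date-format problem concrete: the string 03/04/2023 is 3 April under a day-first convention and 4 March under a month-first one. A minimal sketch, assuming each site uses a single known format (SITE_DATE_FORMATS and parse_site_dates are hypothetical; the real sites' formats are not documented here), parses each site with an explicit format rather than one global guess:

```python
import pandas as pd

# Hypothetical per-site date formats, for illustration only.
SITE_DATE_FORMATS = {
    "site_a": "%d/%m/%Y",
    "site_b": "%m/%d/%Y",
    "site_c": "%Y-%m-%d",
}

def parse_site_dates(df: pd.DataFrame, column: str, site_column: str = "source_site") -> pd.Series:
    """Parse `column` with each row's site-specific format; mismatches become NaT."""
    pieces = [
        pd.to_datetime(df.loc[df[site_column] == site, column], format=fmt, errors="coerce")
        for site, fmt in SITE_DATE_FORMATS.items()
    ]
    # Rows from sites without a declared format end up as NaT after reindexing
    return pd.concat(pieces).reindex(df.index)

df = pd.DataFrame({
    "source_site": ["site_a", "site_b", "site_c"],
    "date_enrolled": ["03/04/2023", "03/04/2023", "2023-04-03"],
})
df["date_enrolled"] = parse_site_dates(df, "date_enrolled")
print(df)  # site_a -> 2023-04-03, site_b -> 2023-03-04, site_c -> 2023-04-03
```

With formats pinned per site, values that do not match their site's declared format become NaT instead of being silently reinterpreted.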


Objectives

  1. Automate ingestion from multiple source files (Excel + CSV)
  2. Standardise column names, types, and encodings
  3. Detect and resolve duplicates with a configurable strategy
  4. Validate data against a schema before loading
  5. Produce an audit log of all transformations
  6. Load clean data to PostgreSQL for downstream analysis

Pipeline Architecture

[Source Files: Excel / CSV]
        ↓
[Extractor — multi-source reader]
        ↓
[Transformer — cleaning + validation]
   ├── Column standardisation
   ├── Type coercion
   ├── Missing value handling
   ├── Duplicate resolution
   └── Schema validation (Great Expectations)
        ↓
[Loader — PostgreSQL via SQLAlchemy]
        ↓
[Audit Log — JSON + summary report]
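The architecture reads as a chain of stages, each taking a DataFrame and recording what it did. A minimal sketch of that shape, using a hypothetical run_stages helper and two toy stages (none of these names appear in the original code), shows how per-stage row counts can be captured uniformly for the audit log:

```python
from datetime import datetime, timezone
from typing import Callable

import pandas as pd

Stage = Callable[[pd.DataFrame], pd.DataFrame]

def run_stages(df: pd.DataFrame, stages: list[tuple[str, Stage]], audit_log: list) -> pd.DataFrame:
    """Apply each named stage in order, logging row counts before and after it."""
    for name, stage in stages:
        rows_in = len(df)
        df = stage(df)
        audit_log.append({
            "step": name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "rows_in": rows_in,
            "rows_out": len(df),
        })
    return df

# Toy stages standing in for the real transformer steps
def drop_missing_ids(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna(subset=["participant_id"])

def drop_duplicate_ids(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop_duplicates(subset=["participant_id"], keep="first")

audit_log: list = []
raw = pd.DataFrame({"participant_id": ["P1", "P1", None, "P2"], "age": [30, 30, 41, 52]})
clean = run_stages(raw, [("drop_missing_ids", drop_missing_ids),
                         ("drop_duplicate_ids", drop_duplicate_ids)], audit_log)
for entry in audit_log:
    print(entry["step"], entry["rows_in"], "->", entry["rows_out"])
```

Keeping every stage to the same signature means a new cleaning step gets audit coverage without extra logging code.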

Implementation

Extractor

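import os  # used by the loader for DATABASE_URL
from pathlib import Path

import pandas as pd

SITE_COLUMN_MAP = {
    "site_a": {"Participant_ID": "participant_id", "AGE": "age", "SEX": "sex"},
    "site_b": {"ID": "participant_id", "Age": "age", "Gender": "sex"},
    "site_c": {"pid": "participant_id", "age_years": "age", "sex_at_birth": "sex"},
}
SOURCE_SUFFIXES = {".xlsx", ".csv"}

def detect_site(filepath: Path) -> str:
    # Assumption: the site key appears as a folder or file name,
    # e.g. data/raw/site_a/cycle_03.xlsx; adapt to the real layout.
    for part in (*filepath.parts, filepath.stem):
        if part.lower() in SITE_COLUMN_MAP:
            return part.lower()
    raise ValueError(f"Cannot determine site for {filepath}")

def read_source(filepath: Path) -> pd.DataFrame:
    # Objective 1 covers both Excel and CSV inputs
    if filepath.suffix.lower() == ".csv":
        return pd.read_csv(filepath)
    return pd.read_excel(filepath)

def extract_sources(data_dir: str) -> pd.DataFrame:
    frames = []
    paths = sorted(p for p in Path(data_dir).rglob("*") if p.suffix.lower() in SOURCE_SUFFIXES)
    for filepath in paths:
        site = detect_site(filepath)
        df = read_source(filepath).rename(columns=SITE_COLUMN_MAP[site])
        df["source_site"] = site
        df["source_file"] = filepath.name
        frames.append(df)
        print(f"  Loaded {len(df)} rows from {filepath.name}")

    # pd.concat raises on an empty list, so fail with a clearer message
    if not frames:
        raise FileNotFoundError(f"No Excel or CSV files found under {data_dir}")

    combined = pd.concat(frames, ignore_index=True)
    print(f"\nTotal extracted: {len(combined)} rows from {len(frames)} files")
    return combined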
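Because the column maps are maintained by hand, a site file whose headers change upstream would otherwise pass through with the old column left unrenamed. One guard, sketched here with a hypothetical check_columns helper called on each raw frame before rename, is to fail fast when a mapped column is absent:

```python
import pandas as pd

def check_columns(raw: pd.DataFrame, site: str, column_map: dict[str, str]) -> None:
    """Fail fast if a raw site file lacks any column its map expects to rename."""
    absent = [col for col in column_map if col not in raw.columns]
    if absent:
        raise ValueError(f"{site}: expected columns not found: {absent}")

# site_b's mapping from SITE_COLUMN_MAP
column_map = {"ID": "participant_id", "Age": "age", "Gender": "sex"}

good = pd.DataFrame(columns=["ID", "Age", "Gender", "date_enrolled"])
check_columns(good, "site_b", column_map)  # passes silently

# Same site after a hypothetical upstream rename of "Gender" to "Sex"
renamed = pd.DataFrame(columns=["ID", "Age", "Sex"])
try:
    check_columns(renamed, "site_b", column_map)
except ValueError as err:
    print(err)  # site_b: expected columns not found: ['Gender']
```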

Transformer

import numpy as np
from datetime import datetime, timezone

def transform(df: pd.DataFrame, audit_log: list, keep: str = "first") -> pd.DataFrame:
    # `keep` is the configurable duplicate strategy: after sorting by
    # enrollment date, "first" keeps the earliest record and "last" the latest.
    df = df.copy()  # avoid mutating the caller's frame
    original_count = len(df)

    # 1. Standardise column names
    df.columns = [str(c).lower().strip().replace(" ", "_") for c in df.columns]

    # 2. Type coercion (unparseable values become NaN / NaT)
    df["age"] = pd.to_numeric(df["age"], errors="coerce")
    df["date_enrolled"] = pd.to_datetime(df["date_enrolled"], errors="coerce", dayfirst=True)

    # 3. Missing value strategy: impute age with the per-site median,
    #    then drop rows without an ID or enrollment date
    missing_before = {col: int(n) for col, n in df.isnull().sum().items()}
    df["age"] = df["age"].fillna(df.groupby("source_site")["age"].transform("median"))
    df = df.dropna(subset=["participant_id", "date_enrolled"])

    # 4. Duplicate resolution (count the rows actually dropped)
    rows_before_dedup = len(df)
    df = df.sort_values("date_enrolled").drop_duplicates(subset=["participant_id"], keep=keep)
    dupes = rows_before_dedup - len(df)

    # 5. Derived fields; right=False makes bins left-closed, so 18 falls in "18–35"
    df["age_group"] = pd.cut(df["age"],
        bins=[0, 18, 35, 50, 65, np.inf],
        labels=["<18", "18–35", "35–50", "50–65", "65+"],
        right=False)

    audit_log.append({
        "step": "transform",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "rows_in": original_count,
        "rows_out": len(df),
        "duplicates_removed": dupes,
        "missing_before": missing_before,
    })

    print(f"Transform: {original_count} → {len(df)} rows ({dupes} duplicates removed)")
    return df
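The transformer coerces types but does not normalise how each site encodes missing answers or sex, while the validation step only accepts male, female, other and unknown. A sketch of that normalisation follows, to run after column renaming; MISSING_SENTINELS, SEX_CODE_MAP and normalise_encodings are hypothetical, and the codes are assumptions rather than the sites' actual values:

```python
import numpy as np
import pandas as pd

# Hypothetical placeholders that sites might use for "no answer".
MISSING_SENTINELS = ["", "NA", "N/A", "n/a", "-", "missing"]

# Hypothetical raw sex codes mapped onto the validation set.
SEX_CODE_MAP = {
    "m": "male", "male": "male",
    "f": "female", "female": "female",
    "o": "other", "other": "other",
}

def normalise_encodings(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # Turn sentinel strings into real missing values in every column
    df = df.replace(MISSING_SENTINELS, np.nan)
    # Trim and lower-case, map known codes; anything else (including NaN) becomes "unknown"
    sex = df["sex"].astype(str).str.strip().str.lower()
    df["sex"] = sex.map(SEX_CODE_MAP).fillna("unknown")
    return df

raw = pd.DataFrame({
    "participant_id": ["P1", "P2", "P3", "P4"],
    "sex": [" M", "female", "N/A", "X"],
    "age": ["34", "N/A", "51", "-"],
})
print(normalise_encodings(raw))
```

Mapping unrecognised codes to "unknown" keeps validation passing; a stricter variant could raise on them instead so new codes are noticed.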

Schema Validation with Great Expectations

import great_expectations as ge

def validate_schema(df: pd.DataFrame) -> bool:
    # ge.from_pandas is the legacy (0.x) dataset API; it was removed in
    # Great Expectations 1.0, so this code needs a 0.x release as written.
    gdf = ge.from_pandas(df)

    # Each expect_* call returns its own result object, and those objects do
    # not combine with &=, so collect them and check each success flag.
    results = [
        gdf.expect_column_values_to_not_be_null("participant_id"),
        gdf.expect_column_values_to_be_between("age", min_value=0, max_value=120),
        gdf.expect_column_values_to_be_in_set("sex", ["male", "female", "other", "unknown"]),
        gdf.expect_column_values_to_be_unique("participant_id"),
    ]
    failures = [r for r in results if not r.success]

    if failures:
        raise ValueError(f"Schema validation failed: {failures}")

    print("✓ Schema validation passed")
    return True
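The same four checks can also be written directly in pandas, which is handy for unit tests that should not depend on Great Expectations. This is a swapped-in, dependency-free sketch (validate_with_pandas is a hypothetical name), not how Great Expectations evaluates expectations internally; it skips nulls in the range, set and uniqueness checks, mirroring Great Expectations' default for column-map expectations:

```python
import pandas as pd

ALLOWED_SEX = {"male", "female", "other", "unknown"}

def validate_with_pandas(df: pd.DataFrame) -> list[str]:
    """Return the failed checks; an empty list means the frame passes."""
    failures = []
    if df["participant_id"].isna().any():
        failures.append("participant_id contains nulls")
    if not df["age"].dropna().between(0, 120).all():
        failures.append("age outside [0, 120]")
    if not df["sex"].dropna().isin(ALLOWED_SEX).all():
        failures.append("sex has values outside the allowed set")
    if df["participant_id"].dropna().duplicated().any():
        failures.append("participant_id is not unique")
    return failures

ok = pd.DataFrame({"participant_id": ["P1", "P2"], "age": [34, 51], "sex": ["male", "female"]})
bad = pd.DataFrame({"participant_id": ["P1", "P1"], "age": [34, 131], "sex": ["male", "M"]})
print(validate_with_pandas(ok))   # []
print(validate_with_pandas(bad))
```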

Loader

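from sqlalchemy import create_engine
from datetime import datetime, timezone
from pathlib import Path
import json
import os

def load_to_postgres(df: pd.DataFrame, audit_log: list, table: str = "participants"):
    # os.environ[...] fails fast with a KeyError if DATABASE_URL is unset
    engine = create_engine(os.environ["DATABASE_URL"])

    # if_exists="replace" drops and recreates the table on every run
    df.to_sql(table, engine, if_exists="replace", index=False, method="multi", chunksize=500)
    audit_log.append({"step": "load", "timestamp": datetime.now(timezone.utc).isoformat(),
                      "table": table, "rows_loaded": len(df)})

    # Write audit log, creating logs/ on the first run
    log_path = Path("logs") / f"audit_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
    log_path.parent.mkdir(exist_ok=True)
    with open(log_path, "w") as f:
        json.dump(audit_log, f, indent=2, default=str)

    print(f"✓ Loaded {len(df)} rows to '{table}'")
    print(f"✓ Audit log saved to {log_path}")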
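The architecture lists a summary report alongside the JSON audit log, but the loader only writes the JSON. A minimal sketch of that report, assuming a hypothetical summarise_audit helper that prints one line per step with its scalar fields (nested fields such as missing_before stay in the JSON only):

```python
def summarise_audit(audit_log: list[dict]) -> str:
    """One line per step: the step name plus its scalar fields, nested dicts skipped."""
    lines = []
    for entry in audit_log:
        details = ", ".join(
            f"{key}={value}"
            for key, value in entry.items()
            if key not in ("step", "timestamp") and not isinstance(value, dict)
        )
        lines.append(f"{entry['step']}: {details}")
    return "\n".join(lines)

# Toy entries: one shaped like the transform record, one hypothetical load record
audit_log = [
    {"step": "transform", "timestamp": "2024-01-01T00:00:00+00:00",
     "rows_in": 10, "rows_out": 8, "duplicates_removed": 1, "missing_before": {"age": 2}},
    {"step": "load", "timestamp": "2024-01-01T00:00:05+00:00",
     "table": "participants", "rows_loaded": 8},
]
report = summarise_audit(audit_log)
print(report)
```

The text report could be written next to the JSON file so reviewers get a quick overview without parsing it.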

Running the Pipeline

if __name__ == "__main__":
    audit_log = []

    raw = extract_sources("data/raw/")
    clean = transform(raw, audit_log)
    validate_schema(clean)
    load_to_postgres(clean, audit_log)

    print("\nPipeline complete.")

Results

  • 70% time saved per cycle
  • 0 manual errors post-deploy
  • 6h full cycle time (previously 20h)
  • 100% reproducible runs
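The 70% figure follows directly from the cycle times reported above:

```latex
\text{time saved} = \frac{20\,\text{h} - 6\,\text{h}}{20\,\text{h}} = \frac{14}{20} = 70\%
```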

Lessons Learned

Site-specific column maps are worth the upfront effort. Hardcoding a mapping per data source felt tedious but prevented dozens of silent join errors that plagued the old Excel-based approach.

Audit logs are non-negotiable in research contexts. Researchers need to know exactly what transformations were applied and why. The JSON audit log made peer review possible.

Great Expectations caught 3 real data issues in the first production run, including age values above 130 (data entry errors) and participant IDs with trailing whitespace that would have created false duplicates.
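The whitespace case is easy to reproduce: two IDs that differ only by a trailing space are distinct strings, so drop_duplicates keeps both rows. Stripping before deduplication collapses them. The sketch below uses made-up IDs and shows one possible fix, not necessarily what the production pipeline does:

```python
import pandas as pd

df = pd.DataFrame({
    "participant_id": ["P001", "P001 ", "P002"],
    "date_enrolled": pd.to_datetime(["2023-01-10", "2023-02-01", "2023-01-15"]),
})

# Without normalisation, "P001" and "P001 " survive as two participants
naive = df.drop_duplicates(subset=["participant_id"])

# Strip surrounding whitespace first, then deduplicate (earliest enrollment wins)
df["participant_id"] = df["participant_id"].str.strip()
deduped = (df.sort_values("date_enrolled")
             .drop_duplicates(subset=["participant_id"], keep="first"))

print(len(naive), len(deduped))  # 3 2
```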


Future Improvements

