Case Study · Data Engineering · ETL · Python
Automated Data Pipeline — ACE / IDI
Replacing a 20-hour manual data cleaning process with a reproducible, validated Python ETL pipeline — reducing processing time by 70% and eliminating human error from health research workflows.
The Problem
The ACE/IDI research team collected health survey data from multiple field sites — arriving as inconsistent Excel files with:
- Different column naming conventions per site
- Missing values with no standardised encoding
- Duplicate entries from data entry errors
- Incompatible date formats across sources
- No audit trail of transformations applied
Every data cycle, a team member spent ~20 hours manually merging and cleaning files in Excel. This was error-prone, not reproducible, and blocked downstream analysis.
Objectives
- Automate ingestion from multiple source files (Excel + CSV)
- Standardise column names, types, and encodings
- Detect and resolve duplicates with a configurable strategy
- Validate data against a schema before loading
- Produce an audit log of all transformations
- Load clean data to PostgreSQL for downstream analysis
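The "configurable strategy" objective for duplicates could be met with a small helper along these lines. This is a minimal sketch rather than the project's actual code: the function name `resolve_duplicates` and the strategy names `earliest`, `latest`, and `drop_all` are assumptions for illustration. Only `earliest` matches what the transformer in the Implementation section does.

```python
import pandas as pd


def resolve_duplicates(df, key="participant_id", order_by="date_enrolled",
                       strategy="earliest"):
    """Return (deduplicated frame, number of rows removed).

    Strategy names are hypothetical:
      earliest  keep the row with the smallest order_by value per key
      latest    keep the row with the largest order_by value per key
      drop_all  discard every row whose key appears more than once
    """
    if strategy in ("earliest", "latest"):
        keep = "first" if strategy == "earliest" else "last"
        # Stable sort so ties keep their original relative order
        out = (df.sort_values(order_by, kind="stable")
                 .drop_duplicates(subset=[key], keep=keep))
    elif strategy == "drop_all":
        out = df[~df.duplicated(subset=[key], keep=False)]
    else:
        raise ValueError(f"Unknown duplicate strategy: {strategy!r}")
    return out.reset_index(drop=True), len(df) - len(out)


sample = pd.DataFrame({
    "participant_id": ["a", "a", "b"],
    "date_enrolled": pd.to_datetime(["2021-02-01", "2021-01-01", "2021-03-01"]),
})
earliest, removed = resolve_duplicates(sample, strategy="earliest")
```

Returning the removed-row count alongside the frame lets the caller write it straight into the audit log instead of recomputing it.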
Pipeline Architecture
[Source Files: Excel / CSV]
↓
[Extractor — multi-source reader]
↓
[Transformer — cleaning + validation]
├── Column standardisation
├── Type coercion
├── Missing value handling
├── Duplicate resolution
└── Schema validation (Great Expectations)
↓
[Loader — PostgreSQL via SQLAlchemy]
↓
[Audit Log — JSON + summary report]
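The "multi-source reader" box in the diagram implies choosing a reader per file type. The extractor shown under Implementation globs only `.xlsx` files, so the following is one possible way to cover CSV as well. It is a sketch under assumptions: the names `READERS`, `find_sources`, and `read_source` are hypothetical and do not come from the project.

```python
from pathlib import Path

import pandas as pd

# Map file extensions to pandas readers (hypothetical registry)
READERS = {
    ".xlsx": pd.read_excel,
    ".csv": pd.read_csv,
}


def find_sources(data_dir):
    """Return every supported file under data_dir, sorted for stable run order."""
    return sorted(
        p for p in Path(data_dir).rglob("*")
        if p.is_file() and p.suffix.lower() in READERS
    )


def read_source(filepath):
    """Read one source file with the reader registered for its extension."""
    path = Path(filepath)
    reader = READERS.get(path.suffix.lower())
    if reader is None:
        raise ValueError(f"Unsupported file type: {path.name}")
    return reader(path)
```

Sorting the file list keeps the row order of the combined frame identical between runs, which matters when deduplication keeps the "first" record.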
Implementation
Extractor
import glob
import os
from pathlib import Path

import pandas as pd

# Per-site mapping from raw column names to the standard schema
SITE_COLUMN_MAP = {
    "site_a": {"Participant_ID": "participant_id", "AGE": "age", "SEX": "sex"},
    "site_b": {"ID": "participant_id", "Age": "age", "Gender": "sex"},
    "site_c": {"pid": "participant_id", "age_years": "age", "sex_at_birth": "sex"},
}

def extract_sources(data_dir: str) -> pd.DataFrame:
    """Read every Excel file under data_dir and tag each row with its origin."""
    frames = []
    for filepath in glob.glob(f"{data_dir}/**/*.xlsx", recursive=True):
        site = detect_site(filepath)  # helper defined elsewhere (not shown here)
        df = pd.read_excel(filepath)
        df = df.rename(columns=SITE_COLUMN_MAP[site])
        df["source_site"] = site
        df["source_file"] = Path(filepath).name
        frames.append(df)
        print(f"  Loaded {len(df)} rows from {Path(filepath).name}")
    # pd.concat raises an opaque error on an empty list, so fail clearly instead
    if not frames:
        raise FileNotFoundError(f"No .xlsx files found under {data_dir}")
    combined = pd.concat(frames, ignore_index=True)
    print(f"\nTotal extracted: {len(combined)} rows from {len(frames)} files")
    return combined
Transformer
import numpy as np
from datetime import datetime, timezone

def transform(df: pd.DataFrame, audit_log: list) -> pd.DataFrame:
    """Clean the combined frame and record what changed in audit_log."""
    df = df.copy()  # leave the caller's frame untouched
    original_count = len(df)

    # 1. Standardise column names
    df.columns = [c.lower().strip().replace(" ", "_") for c in df.columns]

    # 2. Type coercion (unparseable values become NaN / NaT)
    df["age"] = pd.to_numeric(df["age"], errors="coerce")
    df["date_enrolled"] = pd.to_datetime(df["date_enrolled"], errors="coerce", dayfirst=True)

    # 3. Missing value strategy: impute age with the per-site median.
    #    The site column added by the extractor is "source_site".
    missing_before = df.isnull().sum().to_dict()
    df["age"] = df["age"].fillna(df.groupby("source_site")["age"].transform("median"))
    df = df.dropna(subset=["participant_id", "date_enrolled"])

    # 4. Duplicate resolution (keep first by enrollment date)
    rows_before_dedup = len(df)
    df = df.sort_values("date_enrolled").drop_duplicates(subset=["participant_id"], keep="first")
    dupes = rows_before_dedup - len(df)  # rows actually removed

    # 5. Derived fields: left-closed bins, so 18 falls in "18–34", not "<18"
    df["age_group"] = pd.cut(df["age"],
                             bins=[0, 18, 35, 50, 65, np.inf],
                             right=False,
                             labels=["<18", "18–34", "35–49", "50–64", "65+"])

    audit_log.append({
        "step": "transform",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "rows_in": original_count,
        "rows_out": len(df),
        "duplicates_removed": dupes,
        "missing_before": missing_before,
    })
    print(f"Transform: {original_count} → {len(df)} rows ({dupes} duplicates removed)")
    return df
Schema Validation with Great Expectations
import great_expectations as ge

def validate_schema(df: pd.DataFrame) -> bool:
    """Run the schema checks and raise if any expectation fails."""
    # ge.from_pandas belongs to the legacy (pre-1.0) Great Expectations API
    gdf = ge.from_pandas(df)
    # Each expectation returns its own result object, so collect them in a list
    results = [
        gdf.expect_column_values_to_not_be_null("participant_id"),
        gdf.expect_column_values_to_be_between("age", min_value=0, max_value=120),
        gdf.expect_column_values_to_be_in_set("sex", ["male", "female", "other", "unknown"]),
        gdf.expect_column_values_to_be_unique("participant_id"),
    ]
    failures = [r for r in results if not r.success]
    if failures:
        raise ValueError(f"Schema validation failed: {failures}")
    print("✓ Schema validation passed")
    return True
Loader
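import json

from sqlalchemy import create_engine

# os, Path, and datetime are imported in the earlier blocks
def load_to_postgres(df: pd.DataFrame, audit_log: list, table: str = "participants"):
    # Fail fast with a KeyError if DATABASE_URL is not set
    engine = create_engine(os.environ["DATABASE_URL"])
    # if_exists="replace" drops and recreates the table on every run
    df.to_sql(table, engine, if_exists="replace", index=False, method="multi", chunksize=500)

    # Write audit log (create the logs directory on first run)
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)
    log_path = log_dir / f"audit_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
    with open(log_path, "w") as f:
        json.dump(audit_log, f, indent=2, default=str)

    print(f"✓ Loaded {len(df)} rows to '{table}'")
    print(f"✓ Audit log saved to {log_path}")
Running the Pipeline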
if __name__ == "__main__":
    audit_log = []
    raw = extract_sources("data/raw/")
    clean = transform(raw, audit_log)
    validate_schema(clean)
    load_to_postgres(clean, audit_log)
    print("\n Pipeline complete.")
Results
- 70%: time saved per cycle
- 0: manual errors post-deploy
- 6h: full cycle time (was 20h)
- 100%: reproducible runs
Lessons Learned
Site-specific column maps are worth the upfront effort. Hardcoding a mapping per data source felt tedious but prevented dozens of silent join errors that plagued the old Excel-based approach.
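The extractor calls a `detect_site` helper that the write-up does not show. The sketch below is one hypothetical way to implement it, assuming each file sits in a folder named after its site (for example `data/raw/site_b/wave1.xlsx`). The `REQUIRED` set and the `missing_after_rename` check are also assumptions, added to show how a column map can fail loudly when a site changes its headers.

```python
from pathlib import Path

SITE_COLUMN_MAP = {
    "site_a": {"Participant_ID": "participant_id", "AGE": "age", "SEX": "sex"},
    "site_b": {"ID": "participant_id", "Age": "age", "Gender": "sex"},
    "site_c": {"pid": "participant_id", "age_years": "age", "sex_at_birth": "sex"},
}

# Hypothetical: standard columns every site must provide after renaming
REQUIRED = {"participant_id", "age", "sex"}


def detect_site(filepath):
    """Infer the site from the folder names in the path (assumed layout)."""
    for part in Path(filepath).parts:
        if part.lower() in SITE_COLUMN_MAP:
            return part.lower()
    raise ValueError(f"Cannot determine site for {filepath}")


def missing_after_rename(columns, site):
    """List required columns that would still be absent after applying the map."""
    renamed = {SITE_COLUMN_MAP[site].get(c, c) for c in columns}
    return sorted(REQUIRED - renamed)
```

Calling `missing_after_rename(df.columns, site)` before the rename turns a silently unmapped column into an explicit error at extraction time.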
Audit logs are non-negotiable in research contexts. Researchers need to know exactly what transformations were applied and why. The JSON audit log made peer review possible.
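One way to keep audit entries consistent across steps is to record them in a decorator rather than by hand inside each function. This is a sketch, not the project's implementation: the `audited` decorator and the `drop_blank_ids` example step are hypothetical, and the entry fields mirror the ones the transformer logs.

```python
import functools
from datetime import datetime, timezone


def audited(step, audit_log):
    """Append an entry with row counts each time the wrapped step runs.

    Works with any input and output that support len(), including DataFrames.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data, *args, **kwargs):
            rows_in = len(data)
            result = func(data, *args, **kwargs)
            audit_log.append({
                "step": step,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "rows_in": rows_in,
                "rows_out": len(result),
            })
            return result
        return wrapper
    return decorator


# Usage with a plain list of records standing in for a DataFrame
example_log = []


@audited("drop_blank_ids", example_log)
def drop_blank_ids(rows):
    return [r for r in rows if r["participant_id"].strip()]


kept = drop_blank_ids([{"participant_id": "P001"}, {"participant_id": "  "}])
```

With this pattern a new cleaning step cannot be added without also appearing in the log, which is the property reviewers rely on.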
Great Expectations caught 3 real data issues in the first production run, including age values above 130 (data entry errors) and participant IDs with trailing whitespace, which made the same participant look like two distinct records and would have let duplicates slip past deduplication.
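Both issues can also be handled or surfaced earlier with plain pandas. The helpers below are a minimal sketch with hypothetical names (`normalise_ids`, `out_of_range_ages`); they are not part of the pipeline as described. Trailing whitespace makes `"P001 "` and `"P001"` compare as different keys, so stripping has to happen before deduplication.

```python
import pandas as pd


def normalise_ids(df, column="participant_id"):
    """Return a copy with surrounding whitespace stripped from the ID column."""
    out = df.copy()
    out[column] = out[column].astype("string").str.strip()
    return out


def out_of_range_ages(df, low=0, high=120):
    """Return the rows whose age falls outside [low, high]."""
    ages = pd.to_numeric(df["age"], errors="coerce")
    return df[(ages < low) | (ages > high)]


sample = pd.DataFrame({
    "participant_id": ["P001", "P001 ", "P002"],
    "age": [34, 34, 135],
})
hidden_before = int(sample["participant_id"].duplicated().sum())
hidden_after = int(normalise_ids(sample)["participant_id"].duplicated().sum())
suspect = out_of_range_ages(sample)
```

In this sample the duplicate of `P001` only becomes visible after stripping, and the row with age 135 is returned for review rather than silently imputed.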