# Handout: Ohpen Data Engineer Case Study (2026)

## Important Note on Scope
⚠️ Scope Disclaimer: This solution is based on the case study requirements and my interpretation of the problem. I have made assumptions that may underestimate the current scope of operations at Ohpen. The implementation would need refinement based on actual production requirements, volumes, compliance needs, and operational constraints.
## Core narrative (use this line everywhere)
I assume an append-only raw layer and deterministic transformations, so curated outputs can be reproduced or recomputed for historical periods when necessary, while keeping raw inputs immutable.
## High-level pipeline diagram

```mermaid
flowchart LR
    Src[Source CSV Files] --> Raw[Raw<br/>S3 Bronze]
    Raw --> Glue[AWS Glue Job<br/>PySpark ETL]
    Glue --> Proc[Processed_Parquet<br/>S3 Silver]
    Glue --> Quar[Quarantine_InvalidRows<br/>S3 Quarantine]
    Glue --> Cond[Condemned_Layer<br/>Max Attempts/Duplicates<br/>7-Year Retention]
    Proc --> Cat[Glue Data Catalog<br/>Table Metadata]
    Cat --> Athena[Amazon Athena<br/>SQL Queries]
    Athena --> Cons[Consumers<br/>BI/Analytics]
    style Src fill:#f57c00,color:#fff
    style Raw fill:#f57c00,color:#fff
    style Glue fill:#7b1fa2,color:#fff
    style Proc fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style Quar fill:#d32f2f,color:#fff
    style Cond fill:#8b0000,color:#fff
    style Cat fill:#424242,color:#fff
    style Athena fill:#388e3c,color:#fff
    style Cons fill:#1976d2,color:#fff
```
## Orchestration architecture

```mermaid
flowchart LR
    EventBridge[EventBridge<br/>Daily 2 AM UTC] -->|Triggers| StepFunc[Step Functions<br/>Orchestration]
    StepFunc -->|Invokes with Retry<br/>3 attempts, exponential backoff| GlueJob[AWS Glue Job<br/>PySpark ETL]
    GlueJob -->|Writes| S3[S3 Layers<br/>Bronze/Silver/Quarantine/Condemned]
    StepFunc -->|Publishes| CloudWatch[CloudWatch<br/>Metrics & Logs]
    CloudWatch -->|Alerts| Alerts[Alerts<br/>Job Failure, Quarantine Spike]
    style EventBridge fill:#5c6bc0,color:#fff
    style StepFunc fill:#7b1fa2,color:#fff
    style GlueJob fill:#388e3c,color:#fff
    style S3 fill:#1976d2,color:#fff
    style CloudWatch fill:#ffa000,color:#111
    style Alerts fill:#d32f2f,color:#fff
```
Orchestration Flow: EventBridge schedules daily runs (2 AM UTC), Step Functions orchestrates with automatic retry (3 attempts, exponential backoff), and Glue executes the PySpark ETL job. Step Functions handles error recovery and publishes metrics to CloudWatch.
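The retry policy fits in a few lines of the Amazon States Language definition. A minimal sketch, rendered as the Python dict Terraform or boto3 would serialize (the job name `ohpen-transactions-etl` is illustrative, not the repo's actual name):

```python
# Fragment of the Step Functions state machine (Amazon States Language)
# expressed as a Python dict. The Glue job name is an assumption.
import json

etl_state = {
    "RunGlueETL": {
        "Type": "Task",
        "Resource": "arn:aws:states:::glue:startJobRun.sync",  # wait for the Glue job to finish
        "Parameters": {"JobName": "ohpen-transactions-etl"},
        "Retry": [
            {
                "ErrorEquals": ["States.ALL"],  # retry any failure
                "IntervalSeconds": 60,          # first retry after one minute
                "MaxAttempts": 3,               # matches the 3-attempt policy above
                "BackoffRate": 2.0,             # exponential backoff: 60s, 120s, 240s
            }
        ],
        "End": True,
    }
}

print(json.dumps(etl_state, indent=2))
```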
## What’s implemented (Tasks 1–5)

- Task 1 (ETL):
  - Pandas implementation: `tasks/01_data_ingestion_transformation/src/etl/ingest_transactions.py` (original)
  - PySpark implementation: `tasks/01_data_ingestion_transformation/src/etl/ingest_transactions_spark.py` (recommended for production)
  - Executes on AWS Glue (provisioned via Terraform, orchestrated by Step Functions)
  - Reads CSV from the S3 Bronze layer
  - Validates required fields + currency allowlist + timestamp parsing (see the sketch after this list)
  - Optional TransactionID deduplication via Silver-layer scanning
  - Writes Parquet partitioned by year/month, and isolates each run under `run_id=...`
  - Writes invalid records to quarantine with a `validation_error`
  - Writes condemned records (max attempts ≥ 3 or exact duplicates) to the condemned layer with 7-year retention
  - Writes `_SUCCESS` with run metrics
  - Registers tables in the Glue Data Catalog for Athena queries
- Task 2 (Architecture): `tasks/02_data_lake_architecture_design/architecture.md` + `diagram.mmd`
  - Raw / Processed / Aggregates + Quarantine layout.
  - Schema evolution strategy: Iceberg-first, Parquet fallback.
- Task 3 (SQL): `tasks/03_sql/balance_history_2024_q1.sql`
  - Month-end "as-of" balances for Jan–Mar 2024.
  - Generates an account×month spine and left-joins, so missing months stay `NULL` (as in the example).
- Task 4 (CI/CD + IaC): `tasks/04_devops_cicd/`
  - CI workflow: `.github/workflows/ci.yml`
  - CI/CD write-up: `cicd_workflow.md`
  - Terraform: `infra/terraform/main.tf` (includes EventBridge, Step Functions, Glue, S3 lifecycle policies)
- Task 5 (Comms + Docs): `tasks/05_communication_documentation/`
  - Stakeholder email template + one-page technical summary.
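The Task 1 validation rules map onto a small amount of code. A minimal Pandas sketch, assuming illustrative column names and an illustrative currency allowlist (the real logic lives in `ingest_transactions*.py`):

```python
import pandas as pd

REQUIRED = ["TransactionID", "AccountID", "Amount", "Currency", "Timestamp"]  # assumed schema
ALLOWED_CURRENCIES = {"EUR", "USD", "GBP"}  # illustrative allowlist

def split_valid_invalid(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return (valid, quarantined); quarantined rows carry a validation_error reason."""
    df = df.copy()
    df["validation_error"] = None

    # Rule 1: all required fields must be present.
    missing = df[REQUIRED].isna().any(axis=1)
    df.loc[missing, "validation_error"] = "Missing required fields"

    # Rule 2: currency must be on the allowlist.
    bad_ccy = ~df["Currency"].isin(ALLOWED_CURRENCIES) & df["validation_error"].isna()
    df.loc[bad_ccy, "validation_error"] = "Invalid Currency"

    # Rule 3: timestamp must parse; unparsable values become NaT.
    ts = pd.to_datetime(df["Timestamp"], errors="coerce", utc=True)
    bad_ts = ts.isna() & df["validation_error"].isna()
    df.loc[bad_ts, "validation_error"] = "Invalid Timestamp"
    df["Timestamp"] = ts

    quarantined = df[df["validation_error"].notna()]
    valid = df[df["validation_error"].isna()].drop(columns=["validation_error"])
    return valid, quarantined
```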
## How to demo quickly (local)

From `projects/ohpen-case-2026/`:

1) Set up AWS credentials and run the ETL:

```bash
source .venv/bin/activate
export AWS_ACCESS_KEY_ID=<your-aws-access-key>
export AWS_SECRET_ACCESS_KEY=<your-aws-secret-key>
export AWS_REGION=us-east-1
python3 tasks/01_data_ingestion_transformation/src/etl/ingest_transactions.py \
  --input-bucket ohpen-raw \
  --input-key transactions/transactions.csv \
  --output-bucket ohpen-processed \
  --output-prefix transactions \
  --quarantine-bucket ohpen-quarantine \
  --quarantine-prefix transactions
```

Expected: valid rows written under `processed/transactions/run_id=.../year=YYYY/month=MM/` and invalid rows under `quarantine/.../run_id=.../`.
## Monitoring & Observability (per task)

### Task 1 (ETL job)

- Metrics to emit (structured logs + CloudWatch/Datadog):
  - `run_id`, `input_rows`, `valid_rows`, `quarantined_rows`, `condemned_rows`
  - `duration_seconds`, `bytes_read`, `bytes_written`
  - `quarantine_by_reason` (e.g., `Invalid Currency`, `Missing required fields`, `Invalid Timestamp`)
  - `duplicate_detection_rate`, `auto_condemnation_rate`, `circuit_breaker_triggers`
- Alerts:
  - Job failure (non-zero exit, no `_SUCCESS`) - handled by Step Functions with automatic retry
  - Quarantine rate spike (e.g., `quarantined_rows / input_rows > 1%`)
  - Volume anomaly (too few or too many rows vs. baseline)
  - Circuit breaker triggered (>100 identical errors/hour → automatic pipeline halt)
- Run completeness:
  - The `_SUCCESS` marker with metrics JSON is the "commit" signal for consumers.
- Optional (describe-only):
  - `manifest.json` listing files + counts + schema hash.
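A sketch of how the job might emit these signals at the end of a run, writing the `_SUCCESS` payload and mirroring the core counters to CloudWatch (the `OhpenETL` namespace is an assumption):

```python
import json
import boto3

def publish_run_metrics(metrics: dict, bucket: str, run_prefix: str) -> None:
    """Write the _SUCCESS commit marker and mirror counters to CloudWatch."""
    s3 = boto3.client("s3")
    # _SUCCESS carrying the metrics JSON is the "commit" signal for consumers.
    s3.put_object(
        Bucket=bucket,
        Key=f"{run_prefix}/_SUCCESS",
        Body=json.dumps(metrics).encode("utf-8"),
    )

    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="OhpenETL",  # illustrative namespace
        MetricData=[
            {
                "MetricName": name,
                "Value": float(metrics[name]),
                "Unit": "Count",
                "Dimensions": [{"Name": "run_id", "Value": metrics["run_id"]}],
            }
            for name in ("input_rows", "valid_rows", "quarantined_rows", "condemned_rows")
        ],
    )
```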
### Task 2 (Data Lake health)

- Storage & access:
  - Bucket versioning, lifecycle policies (raw retention vs. processed retention).
  - S3 4xx/5xx error rates, request latency, throttling.
- Data governance checks (automated):
  - "No public access" enforcement.
  - Tagging compliance (env, owner, cost-center).
  - Glue Catalog drift (expected partitions present, table schema matches expectations).
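The first two governance checks are straightforward to automate with boto3; a minimal sketch (the required tag keys follow the list above):

```python
import boto3

REQUIRED_TAGS = {"env", "owner", "cost-center"}  # from the tagging policy above

def audit_bucket(bucket: str) -> list[str]:
    """Return a list of governance violations for one bucket."""
    s3 = boto3.client("s3")
    findings = []

    # "No public access" enforcement: all four public-access blocks must be on.
    pab = s3.get_public_access_block(Bucket=bucket)["PublicAccessBlockConfiguration"]
    if not all(pab.values()):
        findings.append("public access not fully blocked")

    # Tagging compliance.
    try:
        tags = {t["Key"] for t in s3.get_bucket_tagging(Bucket=bucket)["TagSet"]}
    except s3.exceptions.ClientError:
        tags = set()  # bucket has no tags at all
    missing = REQUIRED_TAGS - tags
    if missing:
        findings.append(f"missing tags: {sorted(missing)}")

    return findings
```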
### Task 3 (SQL / Athena usage)

- Cost & performance:
  - Track Athena "bytes scanned" per query (FinOps).
  - Track query runtime and failure rate (operational reliability).
- Quality signals:
  - Monitor for a sudden increase in `NULL` month-ends (may indicate missing ingestion for a period).
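Both cost and reliability signals are available per query from the Athena API; a minimal sketch:

```python
import boto3

def query_cost_stats(query_execution_id: str) -> dict:
    """Fetch FinOps and reliability signals for one Athena query."""
    athena = boto3.client("athena")
    execution = athena.get_query_execution(QueryExecutionId=query_execution_id)["QueryExecution"]
    stats = execution.get("Statistics", {})
    return {
        "state": execution["Status"]["State"],                    # SUCCEEDED / FAILED / CANCELLED
        "data_scanned_bytes": stats.get("DataScannedInBytes", 0),  # drives Athena cost
        "runtime_ms": stats.get("TotalExecutionTimeInMillis", 0),
    }
```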
### Task 4 (CI/CD + deployments)

- CI health:
  - Test pass rate, time-to-green, lint failures.
  - PR lead time / deployment frequency (DORA-style).
- Release safety:
  - Artifact versioning (git SHA tag) so backfills can run with a known version.
  - Terraform plan/apply drift detection (fail the pipeline on unexpected infra drift).
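Drift detection can hinge on `terraform plan -detailed-exitcode`, which exits with code 2 when changes are pending; a sketch of a pipeline step that fails on drift (the working directory matches the repo layout above):

```python
import subprocess
import sys

def check_drift(workdir: str = "infra/terraform") -> None:
    """Fail the pipeline when terraform plan detects unexpected infra drift."""
    # -detailed-exitcode: 0 = no changes, 1 = error, 2 = changes pending (drift).
    result = subprocess.run(
        ["terraform", "plan", "-detailed-exitcode", "-input=false"],
        cwd=workdir,
    )
    if result.returncode == 2:
        sys.exit("Drift detected: terraform plan is not empty")
    if result.returncode != 0:
        sys.exit("terraform plan failed")
```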
### Task 5 (Communication & documentation)

- Operational cadence:
  - Scheduled stakeholder updates for key pipelines (monthly/weekly).
  - Post-incident comms template for failures or data corrections.
- Doc freshness:
  - Treat the one-pager as living documentation; update it on schema changes or new consumers.
## Interview Q&A (short, practical answers)

### "Why `run_id` partitions?"
To make runs reproducible and rerunnable (including historical periods) without overwriting previous outputs; it also makes debugging and comparisons between runs straightforward.
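A minimal sketch of how a per-run output prefix might be built (the `run_id` format here is illustrative):

```python
from datetime import datetime, timezone
import uuid

def run_output_prefix(base: str = "processed/transactions") -> str:
    """Each run writes under its own run_id partition, never overwriting prior runs."""
    run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + "-" + uuid.uuid4().hex[:8]
    return f"{base}/run_id={run_id}"

# e.g. processed/transactions/run_id=20260101T020000Z-a1b2c3d4/year=2024/month=01/
```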
### "How do you do backfills safely?"

Reprocess the relevant raw partition(s), write a new `run_id`, validate counts, then publish the corrected dataset (catalog pointer/view, or Iceberg snapshot).
### "How do you prevent breaking schema changes?"
Treat schema as a contract: default to additive + optional changes, deprecate before removal, and avoid semantic drift (a field keeping the same name but changing meaning). For analytics, Iceberg + Glue Catalog gives controlled evolution; for integrations/events, enforce compatibility rules at the boundary (validation + versioning discipline).
### "How do you do zero-downtime schema evolution with long retention/replay?"
Assume old data will be replayed years later, so new fields must remain optional and consumers must tolerate missing fields. For breaking changes, use either:
- Dual-read/dual-logic via a version flag (rolling migration), or
- Versioned datasets/streams (v2) + backfill/migrate + retire v1 when semantics/security require it.
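A self-contained sketch of the first option, dual-read on a version flag (the field names `schema_version`, `amount`, and `amount_cents` are illustrative):

```python
# Dual-read during a rolling migration: consumers tolerate both schemas
# while producers cut over to v2. Old records lack the version flag.

def read_amount(record: dict) -> int:
    if record.get("schema_version", 1) >= 2:
        return record["amount_cents"]             # v2: integer cents
    return round(float(record["amount"]) * 100)   # v1: decimal value, converted

assert read_amount({"amount": "12.34"}) == 1234                          # old record, no flag
assert read_amount({"schema_version": 2, "amount_cents": 1234}) == 1234  # new record
```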
### "What do you do for security-driven breaking changes (e.g., encryption)?"

If old data must not remain, do a controlled migration: re-encrypt/reprocess into a v2 dataset/topic, cut consumers over, then producers, then delete/retire v1. If dual-support is acceptable short-term, use an `encryption_version` flag so consumers can decrypt both during the rollout.
### "When do you move from Python to Spark/Glue?"

Both implementations are provided: Pandas for development/testing, PySpark (recommended for production) for distributed processing. PySpark offers vectorized operations and broadcast joins, and is optimized for AWS Glue execution. The interface stays the same; only the execution engine changes.
"What is the condemned layer?"¶
Rows exceeding max attempts (≥3) or exact duplicates are moved to the condemned layer with 7-year retention for compliance. Condemned data transitions to Glacier after 5 years and is deleted after 7 years (with human approval required). This prevents infinite retry loops and maintains audit trail for compliance.
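The retention policy maps directly onto an S3 lifecycle configuration. A sketch via boto3, assuming an `ohpen-condemned` bucket (in the repo this lives in Terraform, and the human-approval gate would sit outside the lifecycle rule):

```python
import boto3

def apply_condemned_lifecycle(bucket: str = "ohpen-condemned") -> None:
    """Transition to Glacier after ~5 years, expire after ~7 years."""
    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={
            "Rules": [
                {
                    "ID": "condemned-retention",
                    "Status": "Enabled",
                    "Filter": {"Prefix": ""},  # whole bucket
                    "Transitions": [{"Days": 5 * 365, "StorageClass": "GLACIER"}],
                    "Expiration": {"Days": 7 * 365},
                }
            ]
        },
    )
```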
"How does TransactionID deduplication work?"¶
Optional feature: Before processing, scan existing Silver layer Parquet files to extract TransactionID + event_date combinations. If incoming transaction matches, auto-condemn to prevent duplicate processing across runs. First run has no Silver layer, so feature is disabled; subsequent runs enable it for duplicate prevention.
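A minimal Pandas sketch of that scan, assuming `TransactionID` and `event_date` columns (the PySpark version would use an anti-join against the Silver layer instead):

```python
import pandas as pd

def filter_cross_run_duplicates(
    incoming: pd.DataFrame, silver_path: str | None
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split incoming rows into (new, condemned-as-duplicate) using prior Silver output."""
    if silver_path is None:  # first run: no Silver layer yet, feature disabled
        return incoming, incoming.iloc[0:0]

    # Only the dedup key columns are needed from the existing Parquet files.
    seen = pd.read_parquet(silver_path, columns=["TransactionID", "event_date"])
    merged = incoming.merge(
        seen.drop_duplicates(),
        on=["TransactionID", "event_date"],
        how="left",
        indicator=True,
    )
    duplicates = merged[merged["_merge"] == "both"].drop(columns="_merge")
    fresh = merged[merged["_merge"] == "left_only"].drop(columns="_merge")
    return fresh, duplicates
```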
"Why DuckDB for SQL tests instead of a real database?"¶
For the case study, DuckDB validates syntax and query logic in milliseconds without external dependencies. For production, we'd add integration tests against Aurora/RDS replicas to catch engine-specific behavior.
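A sketch of such a test; the table and column names registered here are assumptions about what the SQL file expects:

```python
import duckdb
import pandas as pd

def test_balance_query_runs():
    """Validate syntax and basic logic of the Q1 balance SQL against an in-memory table."""
    con = duckdb.connect()  # in-memory database, no external dependencies
    transactions = pd.DataFrame(
        {
            "account_id": ["A1", "A1"],
            "amount": [100.0, -25.0],
            "timestamp": pd.to_datetime(["2024-01-15", "2024-02-03"]),
        }
    )
    con.register("transactions", transactions)  # expose the frame as a SQL table

    sql = open("tasks/03_sql/balance_history_2024_q1.sql").read()
    rows = con.execute(sql).fetchall()
    assert rows, "query should return month-end rows for Q1 2024"
```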
"Why not Testcontainers for integration tests?"¶
Current tests use temp files + mocking (fast, simple, validates core logic). Testcontainers would be the next layer for production: testing S3 API edge cases, multipart uploads, real Glue workflows. Trade-off: speed vs realism—case study prioritizes the former, production needs both.