Task 1 (Data Ingestion & Transformation) — Assumptions & Edge Cases¶
Important Note on Scope¶
⚠️ Scope Disclaimer: The assumptions and edge case handling described in this document are based on the case study requirements and my interpretation of the problem. I have made assumptions that may underestimate the current scope of operations at Ohpen. These assumptions would need validation with the team to ensure they align with:
- Actual data volumes and transaction patterns
- Real-world error rates and data quality issues
- Production compliance and regulatory requirements
- Existing operational processes and constraints
This document demonstrates analytical thinking and edge case consideration, but would require refinement based on production realities.
This document captures the explicit assumptions and edge cases considered for the Task 1 ETL: CSV from S3 → validate → Parquet partitioned by time, plus quarantine for invalid rows.
ETL Flow Overview¶
Flow (diagram): CSV input (Bronze layer) → Validation (required fields, currency, timestamp). Valid rows go to Parquet output (Silver layer, partitioned by year/month); invalid rows go to the quarantine layer with error details. After the Parquet write, a `_SUCCESS` marker with run metrics is produced.
What the ETL does (scope)¶
- Reads CSV from Bronze layer (raw, immutable S3 storage).
- Metadata Enrichment: Adds tracking metadata to all rows (`row_hash`, `source_file_id`, `attempt_count`, `ingestion_timestamp`).
- Loop Prevention: Checks quarantine history for duplicates, enforces attempt limits (max 3), and implements a circuit breaker.
- Validates rows:
- required fields present
- currency allowlist (ISO-4217)
- timestamp parseable
- duplicate account/date combinations flagged
- Writes valid rows to the Silver layer (Parquet), partitioned by `year`/`month` derived from `TransactionTimestamp`.
- Writes invalid rows to quarantine with a `validation_error`, plus metadata (`row_hash`, `attempt_count`, `retry_history`, `ingest_date`, `run_id`).
- Writes condemned rows (max attempts or exact duplicates) to the condemned layer (no automatic retries).
- Writes a `_SUCCESS` marker containing run metrics after a successful write.
Implementation: `src/etl/ingest_transactions.py`
Data Flow: Bronze (raw CSV) → Metadata Enrichment → Loop Prevention → Validation → Silver (validated Parquet) + Quarantine (invalid rows) + Condemned (max attempts/duplicates)
Design Approach / Architecture Decision¶
Procedural/Functional Style (Current Implementation)¶
Decision: The ETL is implemented using a procedural/functional approach rather than object-oriented programming (OOP).
Rationale:
- Simplicity & clarity: For the case study scope, a single-file procedural approach provides clear, linear data flow that's easy to understand end-to-end
- Performance: Functional transformations with pandas are efficient for data processing tasks
- Stateless operations: ETL transformations are inherently stateless; procedural style avoids unnecessary state management
- Rapid development: Faster time-to-market without class hierarchies and abstraction layers
- Testing: Current approach is straightforward to test with fixtures and integration tests
When to consider OOP¶
- Multiple data sources requiring different ingestion strategies
- Complex validation rules that benefit from strategy patterns
- Plugin architectures for extensible validation
- Multi-tenant support requiring encapsulation
- Large teams where class-based interfaces improve collaboration
Recommended evolution path: If extending to production, introduce light OOP (e.g., ETLPipeline orchestrator class, TransactionValidator class) while maintaining functional pandas transformations. This balances maintainability, testability, and extensibility without over-engineering.
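A minimal sketch of what that light OOP layer could look like. The class names follow the examples above, but the method names, the allowlist contents, and the single currency check are illustrative assumptions and do not exist in the current implementation:

```python
from dataclasses import dataclass, field

import pandas as pd


@dataclass
class TransactionValidator:
    """Illustrative validator: wraps the existing functional checks behind one interface."""

    # Assumption: the real allowlist is the ISO-4217 subset used by the case study.
    allowed_currencies: set[str] = field(default_factory=lambda: {"EUR", "USD", "GBP"})

    def split(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Return (valid, invalid) partitions; invalid rows get a validation_error column."""
        invalid_mask = ~df["Currency"].isin(self.allowed_currencies)
        invalid = df[invalid_mask].assign(validation_error="CURRENCY_ERROR")
        return df[~invalid_mask], invalid


@dataclass
class ETLPipeline:
    """Illustrative orchestrator: reads Bronze, validates, writes Silver and quarantine."""

    validator: TransactionValidator

    def run(self, bronze_df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        # The pandas transformations themselves stay functional, as recommended above.
        return self.validator.split(bronze_df)
```

The orchestrator only coordinates steps; the row-level logic remains in plain functions, which keeps the testing story from the current implementation intact.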
Input assumptions (explicit)¶
Schema / columns¶
- Required columns are exactly: `TransactionID`, `CustomerID`, `TransactionAmount`, `Currency`, `TransactionTimestamp`.
- If any required column is missing, the job fails fast with a clear error.
Currency handling¶
- Currency codes are validated against an ISO-4217 allowlist subset for the case study.
- Assumption: codes are provided as 3-letter uppercase strings in the input.
Timestamp handling¶
- `TransactionTimestamp` is parsed using pandas with `utc=True`.
- Assumption: timestamps are either ISO-like (e.g., `2024-01-01T10:00:00Z`) or otherwise parseable by pandas.
- Partitioning uses the parsed timestamp's `year` and `month` (business-time partition); see the sketch below.
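A minimal sketch of the parsing and partition-column derivation described above, assuming pandas; the function name is illustrative and the real logic lives in `src/etl/ingest_transactions.py`:

```python
import pandas as pd


def add_partition_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Parse TransactionTimestamp as UTC and derive year/month partition columns."""
    # errors="coerce" turns unparseable timestamps into NaT so they can be quarantined later
    parsed = pd.to_datetime(df["TransactionTimestamp"], utc=True, errors="coerce")
    df = df.copy()
    df["year"] = parsed.dt.year
    df["month"] = parsed.dt.month
    return df
```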
Metadata Enrichment¶
All rows receive tracking metadata before validation:
- `row_hash` (SHA-256): computed from all column values concatenated; used for exact-duplicate detection against quarantine history (see the sketch below).
- `source_file_id`: extracted from the S3 path or generated from the `source_file` identifier.
- `attempt_count`: starts at 0 for new rows and is incremented on each retry; loaded from quarantine history for reprocessed rows.
- `ingestion_timestamp`: first ingestion time (preserved across retries for the audit trail).
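A minimal sketch of how such a row hash could be computed with pandas and `hashlib`; the concatenation separator is an assumption, and the real computation is in `src/etl/ingest_transactions.py`:

```python
import hashlib

import pandas as pd


def compute_row_hash(df: pd.DataFrame) -> pd.Series:
    """SHA-256 over all column values of each row, concatenated in column order."""
    # "|" separator is an assumption; the actual implementation may concatenate differently
    concatenated = df.astype(str).agg("|".join, axis=1)
    return concatenated.map(lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest())
```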
Loop Prevention¶
TransactionID Deduplication (Silver Layer)¶
- Pre-validation check: Before validation, checks existing TransactionIDs in Silver layer Parquet files.
- Auto-condemnation: If a `TransactionID` + `event_date` combination is found in the Silver layer, the row is auto-condemned with `validation_error = "DUPLICATE_TRANSACTION_ID"`.
- Purpose: Prevents duplicate transactions from being processed across multiple ETL runs (transaction-level idempotency).
- Implementation: Optional feature enabled by providing the `--silver-bucket` and `--silver-prefix` arguments.
- Performance: Uses broadcast joins (Spark) or a set-based lookup (pandas) for efficient duplicate detection; a pandas sketch follows this list.
- Partition Pruning: Automatically extracts event date range from current batch to minimize Silver layer scan.
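A minimal sketch of the set-based pandas lookup described above, assuming the existing Silver-layer `TransactionID`/`event_date` pairs have already been loaded into a DataFrame; the `event_date` column name and the function name are assumptions:

```python
import pandas as pd


def flag_existing_transactions(batch: pd.DataFrame, silver_keys: pd.DataFrame) -> pd.Series:
    """Return a boolean mask of rows whose TransactionID + event_date already exist in Silver."""
    existing = set(zip(silver_keys["TransactionID"], silver_keys["event_date"]))
    batch_keys = zip(batch["TransactionID"], batch["event_date"])
    return pd.Series([key in existing for key in batch_keys], index=batch.index)
```

Rows where the mask is True would be auto-condemned with `DUPLICATE_TRANSACTION_ID` as described above.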
Duplicate Detection in Quarantine History¶
- Pre-validation check: Before validation, checks existing quarantine Parquet files for exact `row_hash` matches.
- Auto-condemnation: If an exact duplicate is found in quarantine history, the row is auto-condemned with `validation_error = "DUPLICATE_FAILURE"`.
- Purpose: Prevents infinite retry loops on identical bad data.
Attempt Limit Enforcement¶
- Maximum attempts: 3 retries per row (`attempt_count` limit).
- Pre-validation check: If `attempt_count >= 3`, the row is auto-condemned with `validation_error = "MAX_ATTEMPTS"`.
- Purpose: Prevents infinite retry loops on persistently failing rows.
Circuit Breaker¶
- Threshold: >100 rows with the same error type within 1 hour window.
- Behavior: Pipeline halts automatically, requires human intervention to restart.
- Purpose: Prevents processing storms from systemic data quality issues.
- Implementation: Tracks error counts by type within the time window and raises a `RuntimeError` if the threshold is exceeded (see the sketch below).
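A minimal sketch of such a circuit breaker, assuming the caller passes in the `validation_error` values for rows that failed within the current 1-hour window; the constant and function name are illustrative:

```python
import pandas as pd

CIRCUIT_BREAKER_THRESHOLD = 100  # same-error rows allowed within the 1-hour window


def check_circuit_breaker(errors: pd.Series) -> None:
    """Raise if any single error type exceeds the threshold for the current window.

    `errors` is the validation_error column of rows that failed in this window.
    """
    counts = errors.value_counts()
    breached = counts[counts > CIRCUIT_BREAKER_THRESHOLD]
    if not breached.empty:
        raise RuntimeError(
            f"Circuit breaker triggered: {breached.to_dict()} exceeded "
            f"{CIRCUIT_BREAKER_THRESHOLD} occurrences in the 1-hour window"
        )
```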
Edge cases handled (by design)¶
Schema Validation Errors¶
- Missing required columns or type mismatches are quarantined with `validation_error = "SCHEMA_ERROR"`.
Missing required fields¶
- Any row missing one or more required fields is quarantined with `validation_error = "NULL_VALUE_ERROR"`.
- `attempt_count` is incremented.
Invalid currency codes¶
- Rows with currencies outside the allowlist are quarantined with `validation_error = "CURRENCY_ERROR"`.
- `attempt_count` is incremented.
Invalid amount types¶
- Rows where `TransactionAmount` is not parseable as a number are quarantined with `validation_error = "TYPE_ERROR"` (see the sketch below).
- `attempt_count` is incremented.
- Note: negative amounts are allowed (e.g., withdrawals/refunds). Detecting suspicious values is a separate outlier/fraud control, not a schema validation rule.
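A minimal sketch of the amount-parsing check, assuming pandas; the function name is illustrative and the real check lives in `src/etl/ingest_transactions.py`:

```python
import pandas as pd


def flag_invalid_amounts(df: pd.DataFrame) -> pd.Series:
    """True for rows whose TransactionAmount cannot be parsed as a number."""
    # Assumption: genuinely missing amounts were already flagged upstream as NULL_VALUE_ERROR
    return pd.to_numeric(df["TransactionAmount"], errors="coerce").isna()
```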
Malformed/unparseable timestamps¶
- Rows where `TransactionTimestamp` cannot be parsed are quarantined with `validation_error = "TIMESTAMP_ERROR"`.
- `attempt_count` is incremented.
Duplicate account/date combinations¶
- Rows with duplicate `CustomerID` + `TransactionTimestamp` (date part) combinations are flagged and quarantined (see the sketch after this list).
- `validation_error = "Duplicate account/date combination"` (or appended to existing errors).
- `attempt_count` is incremented.
- Note: This is business-logic duplicate detection (separate from exact hash duplicate detection).
- Note: Duplicates are preserved (not dropped) for audit and business review.
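A minimal sketch of flagging duplicate account/date combinations with pandas; the function name is illustrative, and whether the first occurrence of a duplicate group is also quarantined is an implementation detail of the real pipeline:

```python
import pandas as pd


def flag_duplicate_account_dates(df: pd.DataFrame) -> pd.Series:
    """True for every row in a CustomerID + transaction-date group that appears more than once."""
    txn_date = pd.to_datetime(df["TransactionTimestamp"], utc=True, errors="coerce").dt.date
    # keep=False marks all members of a duplicate group, so nothing is silently dropped
    return df.assign(_txn_date=txn_date).duplicated(subset=["CustomerID", "_txn_date"], keep=False)
```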
Exact Duplicate Detection (Loop Prevention)¶
- Rows with an exact `row_hash` match in quarantine history are auto-condemned with `validation_error = "DUPLICATE_FAILURE"`.
- The row is moved to the condemned layer (no automatic retries).
Max Attempts Exceeded (Loop Prevention)¶
- Rows with `attempt_count >= 3` are auto-condemned with `validation_error = "MAX_ATTEMPTS"`.
- The row is moved to the condemned layer (no automatic retries).
Circuit Breaker Triggered¶
- When more than 100 rows with the same error type occur within 1 hour:
  - The pipeline halts with a `RuntimeError`
  - No data is written (valid or quarantine)
  - Human intervention is required to investigate and restart
Multiple issues in one row¶
- If multiple validation issues occur, the error message is preserved/extended so it remains explainable.
Empty input¶
- If the DataFrame is empty, the job produces no output files and logs a warning (no crash).
All rows invalid¶
- If every row is invalid, processed output is empty and all rows land in quarantine (still a valid outcome).
Operational assumptions / decisions¶
Idempotency & reruns¶
- Each run writes under a unique `run_id` (run isolation).
- This means reruns/backfills do not overwrite a previous run's outputs directly.
Quarantine as audit trail¶
- We never silently drop data. Invalid rows remain accessible for audit/debug.
- Quarantine includes metadata:
  - `ingest_date`: when the pipeline saw the row
  - `run_id`: which execution produced the quarantine output
  - `ingest_time`: the run's ingestion timestamp (UTC)
  - `source_file`: the input object identifier (e.g., `s3://bucket/key`)
  - `row_hash`: SHA-256 hash for duplicate detection
  - `source_file_id`: source file identifier
  - `attempt_count`: number of processing attempts
  - `retry_history`: JSON array of previous retry attempts
  - `ingestion_timestamp`: first ingestion time (preserved across retries)
  - `validation_error`: error type (`SCHEMA_ERROR`, `NULL_VALUE_ERROR`, `TYPE_ERROR`, `CURRENCY_ERROR`, `TIMESTAMP_ERROR`)
Condemned Layer¶
- Rows exceeding max attempts (≥3) or exact duplicates are moved to condemned layer.
- Path: `quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/condemned_rows.parquet`
- No automatic retries are performed on condemned items.
- Retention Policy:
  - 7-year retention: Condemned data is retained for 7 years (2555 days) for compliance/audit
  - Deletion requires human approval: All condemned data deletions must pass human approval before execution (see `HUMAN_VALIDATION_POLICY.md`)
  - S3 lifecycle: Transitions to Glacier after 5 years (automatic); deletion after 7 years requires approval (see the sketch after this list)
  - Cost optimization: Data transitions to Glacier after 5 years, then is deleted after 7 years (with approval)
  - Rationale: Mortgages can span decades, but condemned data (duplicates, max attempts) that is more than 7 years old and no longer relevant can be safely deleted after human review
- Corrupt file cleanup: A periodic cleanup script (`cleanup_condemned_corrupt.py`) identifies and deletes condemned data from corrupt source files where:
  - All condemned data from the same `source_file_id` is corrupt (`SCHEMA_ERROR`, `TYPE_ERROR`)
  - Data is older than 7 years
  - The source file is known to be corrupt and irrelevant
  - Human approval is required before deletion (see `HUMAN_VALIDATION_POLICY.md`)
- Condemnation reasons: `DUPLICATE_FAILURE`, `MAX_ATTEMPTS`, `DUPLICATE_TRANSACTION_ID`
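A minimal sketch of the Glacier transition rule described above, using boto3; the bucket name, prefix, and rule ID are assumptions, the rule could equally be expressed in Terraform as part of Task 4, and the expiration action is deliberately omitted because deletion requires human approval:

```python
import boto3

s3 = boto3.client("s3")

# Transition condemned objects to Glacier after 5 years (~1825 days).
# No Expiration action: deletion after 7 years goes through the human-approval workflow.
s3.put_bucket_lifecycle_configuration(
    Bucket="example-quarantine-bucket",  # assumption: actual bucket name differs
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "condemned-glacier-transition",
                "Filter": {"Prefix": "quarantine/"},  # assumption: condemned path prefix
                "Status": "Enabled",
                "Transitions": [{"Days": 1825, "StorageClass": "GLACIER"}],
            }
        ]
    },
)
```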
S3 compatibility¶
- Reads use `boto3`.
- Partitioned Parquet writes use `s3fs` to support dataset writes to AWS S3.
Monitoring & Observability¶
- CloudWatch Metrics: The ETL publishes custom metrics to CloudWatch (namespace: `Ohpen/ETL`); see the sketch after this list:
  - Volume Metrics:
    - `InputRows`: Total rows read from the Bronze layer
    - `ValidRows`: Rows successfully validated and written to the Silver layer
    - `QuarantinedRows`: Rows quarantined due to validation failures
    - `CondemnedRows`: Rows auto-condemned (duplicates or max attempts)
  - Quality Metrics:
    - `QuarantineRate`: Percentage of rows quarantined
    - Error type distribution (via `quarantine_by_reason` in logs)
  - Loop Prevention Metrics:
    - `AvgAttemptCount`: Average `attempt_count` across all processed rows
    - `DuplicateDetectionRate`: Percentage of rows flagged as exact duplicates
    - `AutoCondemnationRate`: Percentage of rows auto-condemned
  - Performance Metrics:
    - `DurationSeconds`: ETL execution time
- CloudWatch Logs: Structured JSON logging for CloudWatch Logs Insights:
  - All log entries include `run_id`, `timestamp`, `level`, `message`
  - Metric events include `metric_name` and `metric_value` fields
  - Enables querying with CloudWatch Logs Insights (e.g., `fields @timestamp, run_id, metric_value.quarantine_rate`)
- Local Testing: CloudWatch metrics are automatically disabled when:
  - `S3_ENDPOINT_URL` is set (indicating local MinIO)
  - the `DISABLE_CLOUDWATCH=true` environment variable is set
  - the `--disable-cloudwatch` CLI flag is used
- Backward Compatibility: The `_SUCCESS` marker file is still written for manual inspection and compatibility with downstream systems
- Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings)
- Integration with Task 4: CloudWatch alarms defined in Task 4 (Terraform) consume these metrics for alerting
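A minimal sketch of how the volume metrics above could be published with boto3's `put_metric_data`; the function name and the `RunId` dimension are assumptions, while the namespace, metric names, and "log, don't fail" behaviour follow the description above:

```python
import logging

import boto3


def publish_run_metrics(run_id: str, input_rows: int, valid_rows: int, quarantined_rows: int) -> None:
    """Publish volume metrics to the Ohpen/ETL namespace; failures are logged, not raised."""
    try:
        cloudwatch = boto3.client("cloudwatch")
        cloudwatch.put_metric_data(
            Namespace="Ohpen/ETL",
            MetricData=[
                {
                    "MetricName": name,
                    "Value": float(value),
                    "Unit": "Count",
                    "Dimensions": [{"Name": "RunId", "Value": run_id}],  # dimension name is an assumption
                }
                for name, value in [
                    ("InputRows", input_rows),
                    ("ValidRows", valid_rows),
                    ("QuarantinedRows", quarantined_rows),
                ]
            ],
        )
    except Exception:  # CloudWatch failures must not fail the ETL job
        logging.warning("CloudWatch metric publish failed", exc_info=True)
```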
Configuration artifact (template)¶
For Task 4 (CI/CD artifacts), we also ship a runtime configuration template:
- `config.yaml` (template only; the ETL currently uses CLI args/env vars)
Ownership & Governance (Task 1)¶
| Aspect | Owner | Steward | Execution | Responsibility |
|---|---|---|---|---|
| Bronze Layer | Data Platform Team | Ingestion Lead | Data Platform Team | Ingestion reliability, immutability of raw data, raw data preservation |
| Silver Layer | Domain Teams | Domain Analyst | Data Platform Team | Validation logic definition (Domain), transformation correctness (Platform), schema evolution (Domain + Platform) |
| Quarantine | Data Quality Team | Data Quality Lead | Data Platform Team | Audit trail maintenance (Platform), quarantine review and resolution (Data Quality), data quality monitoring (Data Quality) |
| Pipeline Code | Data Platform Team | Platform Lead | Data Platform Team | ETL reliability, idempotency, run isolation, technical implementation |
| Validation Rules | Domain Teams | Domain Analyst | Data Platform Team | Business rule specification (Domain), technical implementation (Platform) |
Rules¶
- Bronze Layer: Immutable and restricted. Only platform team can write; no direct business user access. Bronze data is not directly consumed by business users (raw, immutable audit trail).
- Silver Layer: Domain Teams own validation logic and schema, but Platform Team executes the transformations. Silver data is the validated analytics-ready layer for downstream consumption.
- Quarantine: Data Quality Team owns review and resolution, but Platform Team maintains the audit trail. Quarantine data requires review and resolution workflow (see Task 4 governance workflows).
- Pipeline Code: Platform Team owns all technical implementation, reliability, and operational concerns.
- Validation Rules: Domain Teams define business rules; Platform Team implements them technically.
Alignment with Task 2 & Task 4¶
- Ownership model aligns with Task 2 architecture (Bronze: Platform, Silver: Domain, Gold: Business).
- Governance workflows for schema changes and data quality issues are documented in Task 4.
- Alert ownership and escalation paths are defined in Task 4 Section 5.2.
Not handled / intentionally out of scope (case study scope)¶
These are realistic production requirements but were intentionally not implemented to keep the solution aligned with the case study scope:
- Cross-run deduplication beyond the optional Silver-layer `TransactionID` check (e.g., repeated `TransactionID` across different ingestion runs/files when that check is not enabled); within-run duplicates are flagged.
- Outlier detection (e.g., unusually large values, abnormal patterns) and fraud/risk rules.
- Currency conversion (multi-currency reporting requires FX rates, valuation time, and consistent rounding policy).
- Late-arriving corrections beyond rerunning a period (in production you’d define correction events + reconciliation rules).
Where this is validated (tests)¶
The edge cases above are validated in:
- `tests/test_etl.py` (unit tests): nulls, invalid currency, malformed timestamps, empty input, all-invalid, determinism, partition columns.
- `tests/test_integration.py` (integration tests): end-to-end flow, partitioned output structure, `_SUCCESS` marker, quarantine output.
High-level test documentation: `/home/stephen/projects/ohpen-case-2026/docs/technical/TESTING.md`
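As an illustration of the unit-test style, a minimal sketch of a quarantine test; the `validate_transactions` helper is hypothetical, and the real tests in `tests/test_etl.py` target the actual functions exposed by `src/etl/ingest_transactions.py`:

```python
import pandas as pd

# Hypothetical import: the real module exposes its own validation entry point.
from etl.ingest_transactions import validate_transactions


def test_invalid_currency_is_quarantined():
    df = pd.DataFrame([{
        "TransactionID": "t-1",
        "CustomerID": "c-1",
        "TransactionAmount": 100.0,
        "Currency": "XXX",  # not in the ISO-4217 allowlist subset
        "TransactionTimestamp": "2024-01-01T10:00:00Z",
    }])
    valid, quarantined = validate_transactions(df)
    assert valid.empty
    assert quarantined["validation_error"].iloc[0] == "CURRENCY_ERROR"
```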