Task 1 (Data Ingestion & Transformation) — Assumptions & Edge Cases

Important Note on Scope

⚠️ Scope Disclaimer: The assumptions and edge case handling described in this document are based on the case study requirements and my interpretation of the problem. I have made assumptions that may underestimate the current scope of operations at Ohpen. These assumptions would need validation with the team to ensure they align with:

  • Actual data volumes and transaction patterns
  • Real-world error rates and data quality issues
  • Production compliance and regulatory requirements
  • Existing operational processes and constraints

This document demonstrates analytical thinking and edge case consideration, but would require refinement based on production realities.


This document captures the explicit assumptions and edge cases considered for the Task 1 ETL: CSV from S3 → validate → Parquet partitioned by time, plus quarantine for invalid rows.

ETL Flow Overview

flowchart LR
    CSV["CSV from S3<br/>Bronze Layer"] --> Validate["Validation<br/>Required Fields<br/>Currency<br/>Timestamp"]
    Validate -->|Valid| Parquet["Parquet Output<br/>Silver Layer<br/>Partitioned by Year/Month"]
    Validate -->|Invalid| Quarantine["Quarantine Layer<br/>With Error Details"]
    Parquet --> Success["_SUCCESS Marker<br/>Run Metrics"]
    style CSV fill:#f57c00,color:#fff
    style Validate fill:#7b1fa2,color:#fff
    style Parquet fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style Quarantine fill:#d32f2f,color:#fff
    style Success fill:#757575,color:#fff

What the ETL does (scope)

  • Reads CSV from Bronze layer (raw, immutable S3 storage).
  • Metadata Enrichment: Adds tracking metadata to all rows (row_hash, source_file_id, attempt_count, ingestion_timestamp).
  • Loop Prevention: Checks quarantine history for duplicates, enforces attempt limits (max 3), implements circuit breaker.
  • Validates rows:
    • required fields present
    • currency allowlist (ISO-4217)
    • timestamp parseable
    • duplicate account/date combinations flagged
  • Writes valid rows to Silver layer (Parquet), partitioned by year/month derived from TransactionTimestamp.
  • Writes invalid rows to quarantine with a validation_error, plus metadata (row_hash, attempt_count, retry_history, ingest_date, run_id).
  • Writes condemned rows (max attempts or exact duplicates) to condemned layer (no automatic retries).
  • Writes a _SUCCESS marker containing run metrics after successful write.

Implementation: src/etl/ingest_transactions.py

Data Flow: Bronze (raw CSV) → Metadata Enrichment → Loop Prevention → Validation → Silver (validated Parquet) + Quarantine (invalid rows) + Condemned (max attempts/duplicates)

Design Approach / Architecture Decision

Procedural/Functional Style (Current Implementation)

Decision: The ETL is implemented using a procedural/functional approach rather than object-oriented programming (OOP).

Rationale:

  • Simplicity & clarity: For the case study scope, a single-file procedural approach provides clear, linear data flow that's easy to understand end-to-end
  • Performance: Functional transformations with pandas are efficient for data processing tasks
  • Stateless operations: ETL transformations are inherently stateless; procedural style avoids unnecessary state management
  • Rapid development: Faster time-to-market without class hierarchies and abstraction layers
  • Testing: Current approach is straightforward to test with fixtures and integration tests

When to consider OOP

  • Multiple data sources requiring different ingestion strategies
  • Complex validation rules that benefit from strategy patterns
  • Plugin architectures for extensible validation
  • Multi-tenant support requiring encapsulation
  • Large teams where class-based interfaces improve collaboration

Recommended evolution path: If extending to production, introduce light OOP (e.g., ETLPipeline orchestrator class, TransactionValidator class) while maintaining functional pandas transformations. This balances maintainability, testability, and extensibility without over-engineering.

Input assumptions (explicit)

Schema / columns

  • Required columns are exactly: TransactionID, CustomerID, TransactionAmount, Currency, TransactionTimestamp.
  • If any required column is missing, the job fails fast with a clear error.
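The fail-fast check can be sketched as below. This is a minimal illustration, not the code in src/etl/ingest_transactions.py: the function name check_required_columns and the choice of ValueError are assumptions, and the sketch tolerates extra columns, which the document does not specify.

```python
# Required columns as listed in this document.
REQUIRED_COLUMNS = (
    "TransactionID",
    "CustomerID",
    "TransactionAmount",
    "Currency",
    "TransactionTimestamp",
)


def check_required_columns(columns):
    """Raise ValueError naming every required column absent from the header.

    Hypothetical helper: extra columns are tolerated here (an assumption).
    """
    present = set(columns)
    missing = [name for name in REQUIRED_COLUMNS if name not in present]
    if missing:
        raise ValueError("Missing required columns: " + ", ".join(missing))
```

Calling it on the CSV header before any row-level work keeps the failure cheap and the error message specific.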

Currency handling

  • Currency codes are validated against an ISO-4217 allowlist subset for the case study.
  • Assumption: codes are provided as 3-letter uppercase strings in the input.
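A minimal sketch of the allowlist check follows. The three codes shown are an illustrative subset chosen for this example only; the actual allowlist used by the ETL is not reproduced here, and is_allowed_currency is a hypothetical name.

```python
# Illustrative subset only (assumption); the real allowlist may differ.
ALLOWED_CURRENCIES = frozenset({"EUR", "USD", "GBP"})


def is_allowed_currency(code):
    """Return True only for an exact, uppercase match in the allowlist.

    No case-folding or trimming is applied, matching the stated assumption
    that codes arrive as 3-letter uppercase strings.
    """
    return isinstance(code, str) and code in ALLOWED_CURRENCIES
```

Because no normalization is applied, a lowercase "eur" is rejected and would land in quarantine with CURRENCY_ERROR.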

Timestamp handling

  • TransactionTimestamp is parsed using pandas with utc=True.
  • Assumption: timestamp is either ISO-like (e.g., 2024-01-01T10:00:00Z) or parseable by pandas.
  • Partitioning uses the parsed timestamp’s year and month (business-time partition).
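The parsing and partition derivation above might look like the following sketch. The function name add_partition_columns and the output column names (parsed_timestamp, year, month) are assumptions; errors="coerce" is used here so unparseable values become NaT and can be routed to quarantine rather than raising.

```python
import pandas as pd


def add_partition_columns(df):
    """Parse TransactionTimestamp as UTC and derive year/month columns.

    Unparseable values become NaT; their year/month are <NA>.
    """
    out = df.copy()
    parsed = pd.to_datetime(out["TransactionTimestamp"], utc=True, errors="coerce")
    out["parsed_timestamp"] = parsed
    # Nullable integers keep the columns integral even when some rows are NaT.
    out["year"] = parsed.dt.year.astype("Int64")
    out["month"] = parsed.dt.month.astype("Int64")
    return out
```

Rows where parsed_timestamp is NaT are the candidates for TIMESTAMP_ERROR.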

Metadata Enrichment

All rows receive tracking metadata before validation:

  • row_hash (SHA256): Computed from all column values concatenated. Used for exact duplicate detection in quarantine history.
  • source_file_id: Extracted from S3 path or generated from source_file identifier.
  • attempt_count: Starts at 0 for new rows, incremented on each retry. Loaded from quarantine history for reprocessed rows.
  • ingestion_timestamp: First ingestion time (preserved across retries for audit trail).
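As an illustration of the row_hash computation, here is a minimal sketch. The field separator and the treatment of missing values as empty strings are assumptions made for this example (the document only says the values are concatenated), and compute_row_hash is a hypothetical name.

```python
import hashlib

# Assumption: a separator that is unlikely to appear in the data, so that
# ("ab", "c") and ("a", "bc") do not hash to the same value.
FIELD_SEPARATOR = "\x1f"


def compute_row_hash(row, columns):
    """Return the SHA256 hex digest of the row's values in column order."""
    parts = ["" if row.get(name) is None else str(row[name]) for name in columns]
    return hashlib.sha256(FIELD_SEPARATOR.join(parts).encode("utf-8")).hexdigest()
```

Passing the column order explicitly keeps the hash stable even if the input file reorders its columns.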

Loop Prevention

TransactionID Deduplication (Silver Layer)

  • Pre-validation check: Before validation, checks existing TransactionIDs in Silver layer Parquet files.
  • Auto-condemnation: If TransactionID + event_date found in Silver layer, row is auto-condemned with validation_error = "DUPLICATE_TRANSACTION_ID".
  • Purpose: Prevents duplicate transactions from being processed across multiple ETL runs (transaction-level idempotency).
  • Implementation: Optional feature enabled by providing --silver-bucket and --silver-prefix arguments.
  • Performance: Uses broadcast joins (Spark) or set-based lookup (Pandas) for efficient duplicate detection.
  • Partition Pruning: Automatically extracts event date range from current batch to minimize Silver layer scan.
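The set-based lookup mentioned above can be sketched in plain Python. This assumes the (TransactionID, event_date) pairs already present in Silver have been loaded into a set for the relevant date range; split_known_transactions and the dict-per-row representation are hypothetical and chosen for brevity.

```python
def split_known_transactions(rows, silver_keys):
    """Split rows into (fresh, condemned) using a set-membership test.

    silver_keys is a set of (TransactionID, event_date) pairs already in Silver.
    """
    fresh, condemned = [], []
    for row in rows:
        key = (row["TransactionID"], row["event_date"])
        if key in silver_keys:
            # Already processed in an earlier run: condemn instead of re-validating.
            condemned.append({**row, "validation_error": "DUPLICATE_TRANSACTION_ID"})
        else:
            fresh.append(row)
    return fresh, condemned
```

Set membership is O(1) on average, so the cost is dominated by loading the keys, which is why limiting the Silver scan to the batch's date range matters.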

Duplicate Detection in Quarantine History

  • Pre-validation check: Before validation, checks existing quarantine Parquet files for exact row_hash matches.
  • Auto-condemnation: If exact duplicate found in quarantine history, row is auto-condemned with validation_error = "DUPLICATE_FAILURE".
  • Purpose: Prevents infinite retry loops on identical bad data.

Attempt Limit Enforcement

  • Maximum attempts: 3 per row (enforced via attempt_count).
  • Pre-validation check: If attempt_count >= 3, row is auto-condemned with validation_error = "MAX_ATTEMPTS".
  • Purpose: Prevents infinite retry loops on persistently failing rows.
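The two pre-validation checks (exact duplicate in quarantine history, then attempt limit) can be expressed as one small decision function. The order of the checks and the name pre_validation_verdict are assumptions for this sketch; the error codes and the limit of 3 come from this document.

```python
MAX_ATTEMPTS = 3


def pre_validation_verdict(row_hash, attempt_count, quarantined_hashes):
    """Return a condemnation reason, or None if the row may proceed to validation."""
    if row_hash in quarantined_hashes:
        # Identical bad data was seen before; retrying cannot change the outcome.
        return "DUPLICATE_FAILURE"
    if attempt_count >= MAX_ATTEMPTS:
        return "MAX_ATTEMPTS"
    return None
```

A row that returns None continues to validation; any other result sends it to the condemned layer.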

Circuit Breaker

  • Threshold: more than 100 rows with the same error type within a 1-hour window.
  • Behavior: Pipeline halts automatically, requires human intervention to restart.
  • Purpose: Prevents processing storms from systemic data quality issues.
  • Implementation: Tracks error counts by type in time window, raises RuntimeError if threshold exceeded.
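A minimal sketch of the sliding-window counter behind the circuit breaker is shown below. It assumes errors are recorded in time order and keeps state in a plain dict; record_error and its parameters are hypothetical names, while the threshold, window, and RuntimeError come from this document.

```python
from collections import deque
from datetime import datetime, timedelta, timezone

THRESHOLD = 100
WINDOW = timedelta(hours=1)


def record_error(events, error_type, when, threshold=THRESHOLD, window=WINDOW):
    """Record one error and raise RuntimeError if the window count exceeds the threshold.

    events maps error type -> deque of timestamps, oldest first.
    Returns the number of errors of this type currently inside the window.
    """
    recent = events.setdefault(error_type, deque())
    recent.append(when)
    # Evict timestamps that are a full window (or more) older than this event.
    while recent and recent[0] <= when - window:
        recent.popleft()
    if len(recent) > threshold:
        raise RuntimeError(
            f"Circuit breaker: {len(recent)} {error_type} errors within {window}"
        )
    return len(recent)
```

With the defaults, the 100th error of a type inside the window is still accepted and the 101st raises.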

Edge cases handled (by design)

Schema Validation Errors

  • Missing required columns or type mismatches are quarantined with:
    • validation_error = "SCHEMA_ERROR"

Missing required fields

  • Any row missing one or more required fields is quarantined with:
    • validation_error = "NULL_VALUE_ERROR"
    • attempt_count is incremented

Invalid currency codes

  • Rows with currencies outside the allowlist are quarantined with:
    • validation_error = "CURRENCY_ERROR"
    • attempt_count is incremented

Invalid amount types

  • Rows where TransactionAmount is not parseable as a number are quarantined with:
    • validation_error = "TYPE_ERROR"
    • attempt_count is incremented
  • Note: negative amounts are allowed (e.g., withdrawals/refunds). Detecting suspicious values is a separate outlier/fraud control, not a schema validation rule.
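The amount rule can be illustrated with the standard library. Note that this sketch uses decimal.Decimal as a stand-in and is not the pandas-based check in the ETL; parse_amount is a hypothetical name, and rejecting NaN/Infinity is an assumption about what "parseable as a number" should mean.

```python
from decimal import Decimal, InvalidOperation


def parse_amount(raw):
    """Return the amount as a Decimal, or None if it is not a finite number.

    Negative values are accepted on purpose (withdrawals/refunds).
    """
    try:
        value = Decimal(str(raw).strip())
    except InvalidOperation:
        return None
    # "NaN" and "Infinity" parse as Decimals but are not usable amounts.
    return value if value.is_finite() else None
```

A None result corresponds to a row that would be quarantined with TYPE_ERROR.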

Malformed/unparseable timestamps

  • Rows where TransactionTimestamp cannot be parsed are quarantined with:
    • validation_error = "TIMESTAMP_ERROR"
    • attempt_count is incremented

Duplicate account/date combinations

  • Rows with duplicate CustomerID + TransactionTimestamp (date part) combinations are flagged and quarantined with:
    • validation_error = "Duplicate account/date combination" (or appended to existing errors)
    • attempt_count is incremented
  • Note: This is business logic duplicate detection (separate from exact hash duplicate detection).
  • Note: Duplicates are preserved (not dropped) for audit and business review.
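A sketch of the account/date check follows. It assumes timestamps have already been parsed into datetime objects and that every member of a duplicate group is flagged (not only the second and later occurrences); both points, and the name flag_account_date_duplicates, are assumptions for illustration.

```python
from collections import Counter
from datetime import datetime, timezone


def flag_account_date_duplicates(rows):
    """Return one boolean per row: True if its (CustomerID, date) pair repeats.

    Rows are only flagged, never dropped, so they stay available for review.
    """
    keys = [(row["CustomerID"], row["TransactionTimestamp"].date()) for row in rows]
    counts = Counter(keys)
    return [counts[key] > 1 for key in keys]
```

Flagging all members of a group leaves the decision about which row is the "real" one to business review.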

Exact Duplicate Detection (Loop Prevention)

  • Rows with exact row_hash match in quarantine history are auto-condemned with:
    • validation_error = "DUPLICATE_FAILURE"
    • Row is moved to condemned layer (no automatic retries)

Max Attempts Exceeded (Loop Prevention)

  • Rows with attempt_count >= 3 are auto-condemned with:
    • validation_error = "MAX_ATTEMPTS"
    • Row is moved to condemned layer (no automatic retries)

Circuit Breaker Triggered

  • When more than 100 rows with the same error type occur within a 1-hour window:
    • Pipeline halts with RuntimeError
    • No data is written (valid or quarantine)
    • Requires human intervention to investigate and restart

Multiple issues in one row

  • If multiple validation issues occur, the error message is preserved/extended so it remains explainable.
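One way to keep multi-error rows explainable is to append each new error to the existing message. The separator and the de-duplication of repeated errors are assumptions for this sketch, and append_error is a hypothetical helper.

```python
def append_error(existing, new_error, separator="; "):
    """Return the combined error message, keeping earlier errors intact."""
    if not existing:
        return new_error
    if new_error in existing.split(separator):
        # Do not record the same error twice for one row.
        return existing
    return f"{existing}{separator}{new_error}"
```

For example, a row with a bad currency and a bad timestamp would end up with both codes in validation_error, in the order the checks ran.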

Empty input

  • If the DataFrame is empty, the job produces no output files and logs a warning (no crash).

All rows invalid

  • If every row is invalid, processed output is empty and all rows land in quarantine (still a valid outcome).

Operational assumptions / decisions

Idempotency & reruns

  • Each run writes under a unique run_id (run isolation).
  • This means reruns/backfills do not overwrite a previous run’s outputs directly.
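Run isolation can be sketched as generating a fresh run_id and embedding it in the output prefix. The run_id format (UTC timestamp plus a random suffix) and both helper names are assumptions; only the run_id={...} path segment is taken from this document.

```python
import uuid
from datetime import datetime, timezone


def new_run_id(now=None):
    """Return a sortable, practically unique run identifier (hypothetical format)."""
    now = now or datetime.now(timezone.utc)
    return f"{now.strftime('%Y%m%dT%H%M%SZ')}-{uuid.uuid4().hex[:8]}"


def run_output_prefix(base_prefix, run_id):
    """Append a run_id=<id> segment so each run writes to its own location."""
    return f"{base_prefix.rstrip('/')}/run_id={run_id}"
```

Because the prefix differs per run, a rerun writes next to the earlier output instead of replacing it; choosing which run is authoritative is a separate decision.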

Quarantine as audit trail

  • We never silently drop data. Invalid rows remain accessible for audit/debug.
  • Quarantine includes metadata:
    • ingest_date: when the pipeline saw the row
    • run_id: which execution produced the quarantine output
    • ingest_time: the run’s ingestion timestamp (UTC)
    • source_file: the input object identifier (e.g., s3://bucket/key)
    • row_hash: SHA256 hash for duplicate detection
    • source_file_id: Source file identifier
    • attempt_count: Number of processing attempts
    • retry_history: JSON array of previous retry attempts
    • ingestion_timestamp: first ingestion time (preserved across retries)
    • validation_error: error type (SCHEMA_ERROR, NULL_VALUE_ERROR, TYPE_ERROR, CURRENCY_ERROR, TIMESTAMP_ERROR)

Condemned Layer

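  • Rows that reach the attempt limit (attempt_count ≥ 3), exact row_hash duplicates from quarantine history, and rows whose TransactionID already exists in the Silver layer are moved to the condemned layer.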
  • Path: quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/condemned_rows.parquet
  • No automatic retries are performed on condemned items.
  • Retention Policy:
    • 7-year retention: Condemned data is retained for 7 years (2555 days) for compliance/audit.
    • S3 lifecycle / cost optimization: Data transitions to Glacier automatically after 5 years; deletion after 7 years is not automatic.
    • Deletion requires human approval: All condemned data deletions must pass human approval before execution (see HUMAN_VALIDATION_POLICY.md).
    • Rationale: Mortgages can span decades, but condemned data (duplicates, max attempts) that is more than 7 years old and no longer relevant can be safely deleted after human review.
  • Corrupt file cleanup: A periodic cleanup script (cleanup_condemned_corrupt.py) identifies and deletes condemned data from corrupt source files where:
    • All condemned data from the same source_file_id is corrupt (SCHEMA_ERROR, TYPE_ERROR)
    • Data is older than 7 years
    • Source file is known to be corrupt and irrelevant
    • Human approval required before deletion (see HUMAN_VALIDATION_POLICY.md)
  • Condemnation reasons: DUPLICATE_FAILURE, MAX_ATTEMPTS, DUPLICATE_TRANSACTION_ID
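The condemned path template and the retention thresholds can be sketched as follows. The 1825-day figure assumes 5 × 365 days, consistent with the 2555 days given for 7 years; the function names and stage labels are hypothetical, and the sketch only classifies data by age (it never deletes anything).

```python
from datetime import date, timedelta

GLACIER_AFTER_DAYS = 5 * 365   # assumption: 5 years counted as 1825 days
RETENTION_DAYS = 7 * 365       # 2555 days, as stated in this document


def condemned_key(domain, dataset, ingest_date, run_id):
    """Build the object key for a run's condemned rows from the path template."""
    return (
        f"quarantine/{domain}/{dataset}/condemned/"
        f"ingest_date={ingest_date.isoformat()}/run_id={run_id}/condemned_rows.parquet"
    )


def lifecycle_stage(ingest_date, today):
    """Classify condemned data by age; deletion itself still needs human approval."""
    age_days = (today - ingest_date).days
    if age_days < GLACIER_AFTER_DAYS:
        return "standard"
    if age_days < RETENTION_DAYS:
        return "glacier"
    return "eligible_for_deletion_review"
```

Returning a review label rather than performing a delete mirrors the policy that deletion is never automatic.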

S3 compatibility

  • Reads use boto3.
  • Partitioned Parquet writes use s3fs to support dataset writes to AWS S3.

Monitoring & Observability

  • CloudWatch Metrics: The ETL publishes custom metrics to CloudWatch (namespace: Ohpen/ETL):
    • Volume Metrics:
      • InputRows: Total rows read from Bronze layer
      • ValidRows: Rows successfully validated and written to Silver layer
      • QuarantinedRows: Rows quarantined due to validation failures
      • CondemnedRows: Rows auto-condemned (duplicates or max attempts)
    • Quality Metrics:
      • QuarantineRate: Percentage of rows quarantined
      • Error type distribution (via quarantine_by_reason in logs)
    • Loop Prevention Metrics:
      • AvgAttemptCount: Average attempt_count across all processed rows
      • DuplicateDetectionRate: Percentage of rows flagged as exact duplicates
      • AutoCondemnationRate: Percentage of rows auto-condemned
    • Performance Metrics:
      • DurationSeconds: ETL execution time
  • CloudWatch Logs: Structured JSON logging for CloudWatch Logs Insights:
    • All log entries include run_id, timestamp, level, message
    • Metric events include metric_name and metric_value fields
    • Enables querying with CloudWatch Logs Insights (e.g., fields @timestamp, run_id, metric_value.quarantine_rate)
  • Local Testing: CloudWatch metrics are automatically disabled when:
    • S3_ENDPOINT_URL is set (indicating local MinIO)
    • DISABLE_CLOUDWATCH=true environment variable is set
    • --disable-cloudwatch CLI flag is used
  • Backward Compatibility: _SUCCESS marker file is still written for manual inspection and compatibility with downstream systems
  • Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings)
  • Integration with Task 4: CloudWatch alarms defined in Task 4 (Terraform) consume these metrics for alerting
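The local-testing switch and the structured log format can be sketched with the standard library alone. The function names, the "INFO" level, the message text, and the case-insensitive reading of DISABLE_CLOUDWATCH are assumptions; the environment variable names and log fields are taken from this document. No CloudWatch API is called here.

```python
import json
from datetime import datetime, timezone


def cloudwatch_enabled(env, cli_disable_flag=False):
    """Return False when any of the three documented disable conditions holds."""
    if cli_disable_flag:
        return False
    if env.get("S3_ENDPOINT_URL"):
        # A custom endpoint is taken to mean local MinIO.
        return False
    if env.get("DISABLE_CLOUDWATCH", "").lower() == "true":
        return False
    return True


def quarantine_rate(quarantined_rows, input_rows):
    """Percentage of input rows quarantined; 0.0 for empty input."""
    return 0.0 if input_rows == 0 else 100.0 * quarantined_rows / input_rows


def metric_log_line(run_id, metric_name, metric_value, now=None):
    """Serialize one metric event as a single JSON log line."""
    now = now or datetime.now(timezone.utc)
    return json.dumps({
        "run_id": run_id,
        "timestamp": now.isoformat(),
        "level": "INFO",
        "message": f"metric {metric_name}",
        "metric_name": metric_name,
        "metric_value": metric_value,
    })
```

In use, cloudwatch_enabled would be called with os.environ and the parsed CLI flag, and the log line is emitted regardless so metrics remain visible in local runs.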

Configuration artifact (template)

For Task 4 (CI/CD artifacts), we also ship a runtime configuration template:

  • config.yaml (template only; the ETL currently uses CLI args/env vars)

Ownership & Governance (Task 1)

| Aspect | Owner | Steward | Execution | Responsibility |
|---|---|---|---|---|
| Bronze Layer | Data Platform Team | Ingestion Lead | Data Platform Team | Ingestion reliability, immutability of raw data, raw data preservation |
| Silver Layer | Domain Teams | Domain Analyst | Data Platform Team | Validation logic definition (Domain), transformation correctness (Platform), schema evolution (Domain + Platform) |
| Quarantine | Data Quality Team | Data Quality Lead | Data Platform Team | Audit trail maintenance (Platform), quarantine review and resolution (Data Quality), data quality monitoring (Data Quality) |
| Pipeline Code | Data Platform Team | Platform Lead | Data Platform Team | ETL reliability, idempotency, run isolation, technical implementation |
| Validation Rules | Domain Teams | Domain Analyst | Data Platform Team | Business rule specification (Domain), technical implementation (Platform) |

Rules

  • Bronze Layer: Immutable and restricted. Only platform team can write; no direct business user access. Bronze data is not directly consumed by business users (raw, immutable audit trail).
  • Silver Layer: Domain Teams own validation logic and schema, but Platform Team executes the transformations. Silver data is the validated analytics-ready layer for downstream consumption.
  • Quarantine: Data Quality Team owns review and resolution, but Platform Team maintains the audit trail. Quarantine data requires review and resolution workflow (see Task 4 governance workflows).
  • Pipeline Code: Platform Team owns all technical implementation, reliability, and operational concerns.
  • Validation Rules: Domain Teams define business rules; Platform Team implements them technically.

Alignment with Task 2 & Task 4

  • Ownership model aligns with Task 2 architecture (Bronze: Platform, Silver: Domain, Gold: Business).
  • Governance workflows for schema changes and data quality issues are documented in Task 4.
  • Alert ownership and escalation paths are defined in Task 4 Section 5.2.

Not handled / intentionally out of scope (case study scope)

These are realistic production requirements but were intentionally not implemented to keep the solution aligned with the case study scope:

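  • Cross-run deduplication by default (e.g., repeated TransactionID across different ingestion runs/files). It is available only through the optional Silver-layer TransactionID check (enabled with --silver-bucket and --silver-prefix); without those arguments, only within-run duplicates are flagged.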
  • Outlier detection (e.g., unusually large values, abnormal patterns) and fraud/risk rules.
  • Currency conversion (multi-currency reporting requires FX rates, valuation time, and consistent rounding policy).
  • Late-arriving corrections beyond rerunning a period (in production you’d define correction events + reconciliation rules).

Where this is validated (tests)

The edge cases above are validated in:

  • tests/test_etl.py (unit tests): nulls, invalid currency, malformed timestamps, empty input, all-invalid, determinism, partition columns.
  • tests/test_integration.py (integration tests): end-to-end flow, partitioned output structure, _SUCCESS marker, quarantine output.

High-level test documentation: docs/technical/TESTING.md