Technical One-Pager: Financial Transaction Pipeline

Important Note on Scope

⚠️ Scope Disclaimer: This technical summary is based on the case study requirements and my interpretation of the problem. I have made assumptions that may underestimate the current scope of operations at Ohpen. The solution presented here should be viewed as a starting point that would require refinement based on actual production requirements, volumes, compliance needs, and operational constraints.


1. Overview

This pipeline ingests raw transaction CSVs, validates financial integrity, and publishes optimized Parquet datasets for analytics. It is designed for auditability, reproducibility, and safe schema evolution.

2. Architecture & Data Flow

Raw (S3) -> Metadata Enrichment -> Loop Prevention -> ETL (Python/Glue) -> Processed/Quarantine/Condemned (Parquet) -> Athena (SQL)

flowchart TB
    subgraph Ingestion["📥 Ingestion Layer"]
        Raw[("Raw CSV Files<br/>S3: raw/transactions/<br/>ingest_date=YYYY-MM-DD/")]
    end
    subgraph Enrichment["🏷️ Metadata Enrichment"]
        MetaAdd[Add Tracking Metadata:<br/>• row_hash SHA256<br/>• source_file_id<br/>• attempt_count<br/>• ingestion_timestamp]
    end
    subgraph LoopPrevention["🛡️ Loop Prevention"]
        DupCheck[Duplicate Detection<br/>Check quarantine history]
        AttemptLimit[Attempt Limit Check<br/>Max 3 retries per row]
        CircuitBreaker[Circuit Breaker<br/>> 100 same errors/hour<br/>→ halt pipeline]
    end
    subgraph Processing["⚙️ ETL Processing (AWS Glue)"]
        Validate[Schema Validation<br/>• Required columns<br/>• Data types<br/>• ISO-4217 currency]
        Transform[Transform & Partition<br/>• Parse timestamps<br/>• Partition by year/month<br/>• Add metadata]
    end
    subgraph Storage["💾 Storage Layer"]
        Processed[("Processed Parquet<br/>Snappy compressed<br/>run_id isolated")]
        Quarantine[("Quarantine<br/>Invalid rows<br/>+ error details<br/>+ retry tracking")]
        Condemned[("Condemned<br/>Max attempts exceeded<br/>No more retries")]
    end
    subgraph Catalog["📚 Data Catalog"]
        Glue[(AWS Glue<br/>Data Catalog)]
    end
    subgraph Query["🔍 Query Layer"]
        Athena[Amazon Athena<br/>SQL Interface]
    end
    subgraph Consumers["👥 Consumers"]
        Users[Analysts & Applications]
    end
    Raw --> MetaAdd
    MetaAdd --> DupCheck
    DupCheck -->|Not duplicate| AttemptLimit
    DupCheck -->|Duplicate| Condemned
    AttemptLimit -->|< 3 attempts| Validate
    AttemptLimit -->|≥ 3 attempts| Condemned
    Validate --> CircuitBreaker
    CircuitBreaker -->|Normal| Transform
    CircuitBreaker -->|Threshold exceeded| HaltPipeline[🚨 Halt Pipeline]
    Transform -->|Valid| Processed
    Transform -->|Invalid| Quarantine
    Quarantine -.->|After review & fix| MetaAdd
    Processed --> Glue
    Glue --> Athena
    Athena --> Users
    style Ingestion fill:#f57c00,color:#fff
    style Enrichment fill:#7b1fa2,color:#fff
    style LoopPrevention fill:#fbc02d,color:#111
    style Processing fill:#7b1fa2,color:#fff
    style Storage fill:#388e3c,color:#fff
    style Catalog fill:#424242,color:#fff
    style Query fill:#1976d2,color:#fff
    style Consumers fill:#1976d2,color:#fff
    style Condemned fill:#d32f2f,color:#fff
    style HaltPipeline fill:#d32f2f,color:#fff

  1. Ingestion: Immutable CSVs land in raw/transactions/ingest_date=YYYY-MM-DD/.
  2. Metadata Enrichment: Each row is enriched with tracking metadata:
    • row_hash (SHA256): Unique identifier for deduplication
    • source_file_id: Source file identifier
    • attempt_count: Number of processing attempts (starts at 0)
    • ingestion_timestamp: When the row was first ingested
  3. Loop Prevention:
    • TransactionID Deduplication: Checks Silver layer for existing TransactionID + event_date combinations; auto-condemns if found (optional, enabled via --silver-bucket and --silver-prefix)
    • Duplicate Detection: Checks quarantine history for exact duplicates (same row_hash); auto-condemns if found
    • Attempt Limit: Maximum 3 attempts per row; auto-condemns once attempt_count reaches 3 (attempt_count ≥ 3)
    • Circuit Breaker: Halts pipeline if >100 same errors occur within 1 hour (requires human intervention)
  4. Transformation:
    • Validation: Schema/required-column checks, null checks, numeric amount type check, ISO-4217 currency validation, timestamp parsing.
    • Quarantine: Invalid rows are isolated in quarantine/ with error details, attempt_count, and retry history; never dropped silently.
    • Partitioning: Output partitioned by year/month for query performance.
  5. Storage:
    • Format: Snappy-compressed Parquet (currently Parquet-only; Iceberg is a future enhancement).
    • Idempotency: Each run writes to a unique run_id path to prevent data corruption during retries.
    • Metadata: run_id and ingest_time are emitted in _SUCCESS; row-level quarantine includes validation_error, row_hash, attempt_count, and source context.
    • Human Approval: For financial data compliance, human approval is required before promoting Silver layer data to production consumption (see HUMAN_VALIDATION_POLICY.md).
    • Condemned Layer: Rows that reach the attempt limit or are exact duplicates are moved to quarantine/condemned/ for permanent retention (no automatic retries).

2.1. Data Lake Folder Structure

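The data lake follows a Bronze/Silver/Gold medallion architecture. The raw and processed layers referred to elsewhere in this document correspond to Bronze and Silver, respectively. Folders are organized as follows: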

s3://data-lake-bucket/
├── bronze/                                    # Bronze Layer (Raw, Immutable)
│   └── mortgages/
│       └── transactions/
│           └── ingest_date=2026-01-21/        # Partition by arrival time
│               └── run_id=20260121T120000Z/    # Run isolation for idempotency
│                   └── file.csv.gz            # Original source file (verbatim copy)
│
├── silver/                                    # Silver Layer (Validated & Enriched)
│   └── mortgages/
│       └── transactions/
│           └── year=2026/               # Business-time partition (optimized for queries)
│               └── month=01/
│                   └── schema_v=v1/          # Schema version for evolution
│                       └── run_id=20260121T120000Z/  # Write-once run isolation
│                           ├── _SUCCESS      # Atomic completion marker
│                           └── part-00000.parquet  # Optimized storage format
│
├── quarantine/                                # Invalid Data (Audit Trail)
│   └── mortgages/
│       └── transactions/
│           ├── ingest_date=2026-01-21/
│           │   └── run_id=20260121T120000Z/
│           │       └── invalid_rows.parquet  # Contains original data + validation_error column
│           └── condemned/                    # Condemned Data (No More Retries)
│               └── ingest_date=2026-01-21/
│                   └── run_id=20260121T120000Z/
│                       └── condemned_rows.parquet  # Max attempts exceeded or exact duplicates
│
└── gold/                                      # Gold Layer (Business Views)
    └── finance/                               # Task 2: Structure Design (folder organization, governance)
        └── account_balances/                  # Task 3: SQL Aggregation Pattern (shows how to create Gold data)
            └── schema_v=v1/                   # Task 1: Does not write Gold files (only Bronze → Silver)
                ├── _LATEST.json              # Authority pointer to current run
                ├── run_id=20260121T120000Z/   # Historical run
                │   └── as_of_month=2024-03/
                │       └── part-00000.parquet
                └── current/                  # Stable prefix for SQL access
                    └── as_of_month=2024-03/
                        └── part-00000.parquet

Key Path Patterns:

  • Bronze: bronze/{domain}/{dataset}/ingest_date={YYYY-MM-DD}/run_id={...}/file.csv.gz
  • Silver: silver/{domain}/{dataset}/year={YYYY}/month={MM}/schema_v={vN}/run_id={...}/part-*.parquet
  • Quarantine: quarantine/{domain}/{dataset}/ingest_date={YYYY-MM-DD}/run_id={...}/invalid_rows.parquet
  • Condemned: quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/condemned_rows.parquet
  • Gold: gold/{domain}/{dataset}/schema_v={vN}/run_id={...}/as_of_month={YYYY-MM}/part-*.parquet (Task 2: Structure Design, Task 3: SQL Aggregation Pattern)
  • Gold (current): gold/{domain}/{dataset}/schema_v={vN}/current/{partition}/part-*.parquet (Task 2: Structure Design, Task 3: SQL Aggregation Pattern)
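
The path patterns above can be sketched as small string builders. This is a minimal illustration, not the pipeline's actual code: the function names are hypothetical, and the run_id format is assumed from the examples in the folder tree (UTC, YYYYMMDDTHHMMSSZ).

```python
from datetime import datetime, timezone


def make_run_id(now: datetime) -> str:
    """Format a timestamp like the run_id values in the folder tree (assumed UTC)."""
    return now.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def bronze_prefix(domain: str, dataset: str, ingest_date: str, run_id: str) -> str:
    return f"bronze/{domain}/{dataset}/ingest_date={ingest_date}/run_id={run_id}/"


def silver_prefix(domain: str, dataset: str, year: int, month: int,
                  schema_v: str, run_id: str) -> str:
    # Month is zero-padded to match month=01 in the tree.
    return (f"silver/{domain}/{dataset}/year={year:04d}/month={month:02d}/"
            f"schema_v={schema_v}/run_id={run_id}/")


def quarantine_prefix(domain: str, dataset: str, ingest_date: str,
                      run_id: str, condemned: bool = False) -> str:
    # Condemned rows live under an extra condemned/ segment.
    sub = "condemned/" if condemned else ""
    return f"quarantine/{domain}/{dataset}/{sub}ingest_date={ingest_date}/run_id={run_id}/"
```

Because every prefix ends in a run-specific run_id segment, a retried job writes to a fresh location instead of overwriting earlier output.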

Note: The Gold layer structure (folder organization, governance) is designed in Task 2. Task 3 (SQL) demonstrates how to aggregate Silver into Gold with SQL queries. Task 1 (ETL Pipeline) implements only the Bronze → Silver transformation and does not write Gold layer files.

Condemned Retention Policy:

  • Condemned rows are retained for compliance/audit purposes (7 years minimum)
  • No automatic retries are performed on condemned items
  • Manual review and intervention required for any reprocessing
  • Human approval required before deletion (see HUMAN_VALIDATION_POLICY.md for approval workflow)

2.2. Data Contract

Required Fields:

  • TransactionID (string): Unique transaction identifier
  • CustomerID (string): Customer identifier
  • TransactionAmount (numeric): Transaction amount (negative values allowed for withdrawals/refunds)
  • Currency (string): ISO-4217 currency code (e.g., EUR, USD, GBP)
  • TransactionTimestamp (timestamp): Transaction timestamp (ISO-8601 format)

Partition Keys (derived from TransactionTimestamp):

  • year: Year extracted from TransactionTimestamp
  • month: Month extracted from TransactionTimestamp
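
As a rough sketch of how the partition keys might be derived, assuming the timestamp is an ISO-8601 string and that year and month are taken from the timestamp as written (no time zone conversion). The function name is hypothetical.

```python
from datetime import datetime


def partition_keys(transaction_timestamp: str) -> dict:
    """Derive year/month partition values from an ISO-8601 timestamp string."""
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11,
    # so normalise it to an explicit UTC offset first.
    ts = datetime.fromisoformat(transaction_timestamp.replace("Z", "+00:00"))
    return {"year": f"{ts.year:04d}", "month": f"{ts.month:02d}"}
```

For example, `partition_keys("2026-01-21T12:00:00Z")` yields the values used in a `year=2026/month=01` path.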

Output Schema:

  • All input fields preserved
  • Partition columns added: year, month
  • Metadata columns: row_hash (SHA256), source_file_id, attempt_count, ingestion_timestamp, ingest_time, run_id
  • Quarantine columns: All above + validation_error, retry_history

2.3. Validation & Quality Controls

flowchart TD Start([Incoming CSV Row]) --> Enrich[Enrich Metadata:
+ row_hash SHA256
+ source_file_id
+ attempt_count] Enrich --> TxIDCheck{TransactionID
in Silver
Layer?} TxIDCheck -->|Yes| Condemned TxIDCheck -->|No| DupCheck{Exact duplicate
in quarantine
history?} DupCheck -->|Yes| AutoCondemn1[Auto-Condemn:
DUPLICATE_FAILURE] DupCheck -->|No| AttemptCheck{attempt_count
≥ 3?} AttemptCheck -->|Yes| AutoCondemn2[Auto-Condemn:
MAX_ATTEMPTS] AttemptCheck -->|No| SchemaCheck{Schema Valid?
Required columns} SchemaCheck -->|No| Q1[Quarantine:
SCHEMA_ERROR
attempt_count++] SchemaCheck -->|Yes| NullCheck{Null Checks
Critical fields} NullCheck -->|Failed| Q2[Quarantine:
NULL_VALUE_ERROR
attempt_count++] NullCheck -->|Passed| TypeCheck{Numeric Amount?
Valid data type} TypeCheck -->|No| Q3[Quarantine:
TYPE_ERROR
attempt_count++] TypeCheck -->|Yes| CurrencyCheck{ISO-4217
Currency Code?} CurrencyCheck -->|Invalid| Q4[Quarantine:
CURRENCY_ERROR
attempt_count++] CurrencyCheck -->|Valid| TimestampCheck{Timestamp
Parseable?} TimestampCheck -->|Failed| Q5[Quarantine:
TIMESTAMP_ERROR
attempt_count++] TimestampCheck -->|Passed| Success[✓ Write to
Processed Parquet] Q1 --> CircuitBreaker{Same error
> 100 rows
in 1 hour?} Q2 --> CircuitBreaker Q3 --> CircuitBreaker Q4 --> CircuitBreaker Q5 --> CircuitBreaker CircuitBreaker -->|Yes| HaltPipeline[🚨 HALT PIPELINE
Circuit breaker triggered
Human intervention required] CircuitBreaker -->|No| QuarantineStore QuarantineStore[("Quarantine Storage
+ error details
+ row_hash
+ attempt_count
+ retry_history
+ timestamp")] Success --> Processed[("Processed Layer
year/month partitions
run_id isolated")] QuarantineStore --> Aging{Age Check} Aging -->|< 7 days| Monitor[Monitor] Aging -->|7-30 days| Review[Human Review
& Decision] Aging -->|> 30 days| Review Review -->|Source Fix
NEW file_id| SourceFix[Source Fix
attempt_count reset to 0] Review -->|ETL Fix
SAME file_id| ETLFix[ETL Fix
attempt_count preserved] Review -->|Condemn| Condemned AutoCondemn1 --> Condemned[("Condemned
quarantine/condemned/")] AutoCondemn2 --> Condemned SourceFix -.->|Re-ingest as new| Start ETLFix -.->|Reprocess| DupCheck Monitor -.-> Aging style Start fill:#f57c00,color:#fff style Success fill:#2e7d32,color:#fff style Processed fill:#388e3c,color:#fff style Q1 fill:#d32f2f,color:#fff style Q2 fill:#d32f2f,color:#fff style Q3 fill:#d32f2f,color:#fff style Q4 fill:#d32f2f,color:#fff style Q5 fill:#d32f2f,color:#fff style QuarantineStore fill:#d32f2f,color:#fff style Review fill:#ffa000,color:#111 style Condemned fill:#d32f2f,color:#fff style SourceFix fill:#2e7d32,color:#fff style ETLFix fill:#7b1fa2,color:#fff style DupCheck fill:#fbc02d,color:#111 style AttemptCheck fill:#fbc02d,color:#111 style CircuitBreaker fill:#ffa000,color:#111 style HaltPipeline fill:#d32f2f,color:#fff style AutoCondemn1 fill:#d32f2f,color:#fff style AutoCondemn2 fill:#d32f2f,color:#fff

Pre-Validation Checks (Loop Prevention):

  1. Metadata Enrichment → All rows receive tracking metadata
    • row_hash (SHA256): Computed from row content for deduplication
    • source_file_id: Identifier of the source file
    • attempt_count: Starts at 0, increments on each retry
    • ingestion_timestamp: First ingestion time
  2. Duplicate Detection → Auto-condemn if exact duplicate found
    • Checks: Quarantine history for exact row_hash match
    • Failure action: Auto-condemn to quarantine/condemned/ with validation_error = "DUPLICATE_FAILURE"
    • Purpose: Prevents infinite retry loops on identical bad data
  3. Attempt Limit Check → Auto-condemn if max attempts exceeded
    • Checks: attempt_count ≥ 3
    • Failure action: Auto-condemn to quarantine/condemned/ with validation_error = "MAX_ATTEMPTS"
    • Purpose: Prevents infinite retry loops on persistently failing rows
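
The pre-validation checks can be sketched as follows. This is an illustration under stated assumptions: the canonical form used for hashing (sorted key=value pairs) and the function names are hypothetical, and the quarantine history is represented as a plain set of hashes. The source does not specify how a legitimate retry is told apart from a duplicate, so that distinction is not modeled here.

```python
import hashlib
from typing import Optional

MAX_ATTEMPTS = 3  # attempt limit stated in this section


def compute_row_hash(row: dict) -> str:
    """SHA256 over a canonical rendering of the row (assumed canonical form)."""
    canonical = "\x1f".join(f"{key}={row[key]}" for key in sorted(row))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def pre_validate(row: dict, attempt_count: int, quarantined_hashes: set) -> Optional[str]:
    """Return a condemn reason, or None if the row may proceed to validation."""
    if compute_row_hash(row) in quarantined_hashes:
        return "DUPLICATE_FAILURE"
    if attempt_count >= MAX_ATTEMPTS:
        return "MAX_ATTEMPTS"
    return None
```

Sorting the keys before hashing keeps the hash stable when the same row arrives with its columns in a different order.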

Validation Rules (applied in order after pre-validation):

  1. Schema Validation → Quarantine if schema invalid
    • Checks: Required columns present, data types valid
    • Failure action: Row quarantined with validation_error = "SCHEMA_ERROR", attempt_count++
  2. Null Checks → Quarantine if missing required fields
    • Checks: All required fields (TransactionID, CustomerID, TransactionAmount, Currency, TransactionTimestamp) must be non-null
    • Failure action: Row quarantined with validation_error = "NULL_VALUE_ERROR", attempt_count++
  3. Amount Type Check → Quarantine if non-numeric
    • Checks: TransactionAmount must be parseable as numeric
    • Failure action: Row quarantined with validation_error = "TYPE_ERROR", attempt_count++
    • Note: Negative amounts are allowed (withdrawals/refunds)
  4. Currency Validation → Quarantine if not ISO-4217
    • Checks: Currency code must be in allowed list (ISO-4217 standard)
    • Failure action: Row quarantined with validation_error = "CURRENCY_ERROR", attempt_count++
  5. Timestamp Parsing → Quarantine if unparseable
    • Checks: TransactionTimestamp must be parseable as ISO-8601 timestamp
    • Failure action: Row quarantined with validation_error = "TIMESTAMP_ERROR", attempt_count++
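
The ordered rules above might look like this in plain Python. It is a sketch, not the pipeline's implementation: the function name is hypothetical, the currency set is a three-code illustrative subset rather than the full ISO-4217 list, and treating empty strings as null is an assumption.

```python
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Optional

REQUIRED = ("TransactionID", "CustomerID", "TransactionAmount",
            "Currency", "TransactionTimestamp")
ALLOWED_CURRENCIES = {"EUR", "USD", "GBP"}  # illustrative subset only


def validate_row(row: dict) -> Optional[str]:
    """Return the first failing validation_error code, or None if the row is valid."""
    if any(col not in row for col in REQUIRED):
        return "SCHEMA_ERROR"
    # Assumption: an empty or whitespace-only string counts as null.
    if any(row[col] is None or str(row[col]).strip() == "" for col in REQUIRED):
        return "NULL_VALUE_ERROR"
    try:
        amount = Decimal(str(row["TransactionAmount"]))
    except InvalidOperation:
        return "TYPE_ERROR"
    if not amount.is_finite():  # rejects NaN and Infinity; negatives are allowed
        return "TYPE_ERROR"
    if row["Currency"] not in ALLOWED_CURRENCIES:
        return "CURRENCY_ERROR"
    try:
        datetime.fromisoformat(str(row["TransactionTimestamp"]).replace("Z", "+00:00"))
    except ValueError:
        return "TIMESTAMP_ERROR"
    return None
```

Returning only the first failure mirrors the "applied in order" wording; a variant that collects every failure per row would be equally reasonable.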

Post-Quarantine Circuit Breaker:

  • Circuit Breaker Check: After each quarantine action, check if >100 rows with the same error type occurred within the last hour
  • Threshold Exceeded: Pipeline halts automatically, requires human intervention
  • Purpose: Prevents processing storms from systemic data quality issues
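
One way to sketch the circuit breaker is a sliding window of timestamps per error type. The class and method names are hypothetical, and keeping the window in memory is an assumption; a real deployment would likely need shared state across job runs.

```python
from collections import defaultdict, deque


class CircuitBreaker:
    """Halt signal when more than `threshold` same-type errors fall in the window."""

    def __init__(self, threshold: int = 100, window_seconds: float = 3600):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)  # error_type -> timestamps in seconds

    def record(self, error_type: str, now: float) -> bool:
        """Record one quarantined row; return True if the pipeline should halt."""
        events = self._events[error_type]
        events.append(now)
        # Drop timestamps that have aged out of the window.
        while events and now - events[0] > self.window_seconds:
            events.popleft()
        return len(events) > self.threshold
```

Counts are tracked per error type, so 60 SCHEMA_ERROR rows and 60 TYPE_ERROR rows in the same hour would not trip the breaker in this sketch.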

What Happens on Failure:

  • Pre-validation failures (duplicates, max attempts) → Auto-condemn to quarantine/condemned/
  • Validation failures → Quarantine with attempt_count incremented
  • Never silently dropped — all invalid rows preserved for audit and review
  • Quarantine includes: original data + validation_error column + metadata (row_hash, attempt_count, retry_history, ingest_date, run_id, source_file)
  • Valid rows proceed to Silver layer (validated Parquet)

Quarantine Aging & Review:

  • < 7 days: Automated monitoring (no human action required)
  • 7-30 days: Flagged for human review and decision
  • > 30 days: Escalated for review (potential systemic issue)
  • Resolution paths: Source Fix (new file, attempt_count resets), ETL Fix (same file, attempt_count preserved), or Condemn
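
The aging buckets can be expressed as a small classifier. This sketch assumes the 7-30 day range is inclusive at both ends; the function name and return labels are hypothetical.

```python
from datetime import date


def triage_quarantined_row(quarantined_on: date, today: date) -> str:
    """Map quarantine age to the handling bucket described above."""
    age_days = (today - quarantined_on).days
    if age_days < 7:
        return "monitor"        # automated monitoring only
    if age_days <= 30:
        return "human_review"   # flagged for review and decision
    return "escalate"           # potential systemic issue
```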

3. Reprocessing & Backfills

We assume an append-only raw layer. The backfill and reprocessing workflow includes loop prevention checks to avoid infinite retry cycles.

sequenceDiagram participant Ops as Operations Team participant ETL as ETL Pipeline participant Raw as Raw Layer
(Immutable) participant Quar as Quarantine participant Meta as Metadata Store
(row_hash tracking) participant Proc as Processed Layer participant Cat as Glue Catalog participant Users as Downstream Users Note over Ops,Users: Scenario 1: Late-arriving data (clean ingestion) Ops->>ETL: Trigger backfill for
ingest_date=2025-01-15 ETL->>Raw: Read from
raw/transactions/
ingest_date=2025-01-15/ ETL->>ETL: Generate row_hash
Set attempt_count=0 Note over Raw: Original data unchanged ETL->>Proc: Write to NEW run_id:
run_id=20250128_BACKFILL ETL->>Cat: Update partition metadata Cat-->>Users: Updated data available Note over Ops,Users: Scenario 2: Quarantine remediation with loop checks Ops->>Quar: Review quarantined items
Age > 7 days Quar->>Meta: Check retry history
for each row_hash alt Source Provider Fix (New File) Note over Meta: NEW source_file_id
→ attempt_count RESETS to 0 Ops->>Raw: Corrected CSV from provider
source_file_id=corrected_v2.csv Raw->>ETL: Ingest new file ETL->>Meta: New row_hash computed
attempt_count=0 ETL->>Proc: Write with
run_id=20250128_SOURCE_FIX ETL->>Quar: Mark original rows
as superseded else ETL Logic Fix (Same File) Note over Meta: SAME source_file_id
→ attempt_count INCREMENTS Meta->>Meta: Check: attempt_count < 3? alt Attempts < 3 Ops->>ETL: Deploy updated
validation rules ETL->>Raw: Reprocess original data ETL->>Meta: Increment attempt_count
Log retry_history ETL->>Proc: Write with
run_id=20250128_ETL_FIX ETL->>Quar: Update status else Attempts ≥ 3 Meta->>Quar: Auto-condemn:
MAX_ATTEMPTS Note over Quar: Row moved to
quarantine/condemned/ Quar-->>Ops: Alert: Row condemned
No more auto-retries end else Circuit Breaker Triggered ETL->>ETL: Detect: >100 same errors
in 1 hour window ETL->>Ops: 🚨 HALT PIPELINE
Circuit breaker activated Note over ETL: Pipeline stopped
Human intervention required Ops->>ETL: Investigate root cause
Fix systemic issue Ops->>ETL: Manual restart after fix end ETL->>Cat: Update catalog Cat-->>Users: Previously quarantined
data now available Note over Quar: Auto-condemned items (>90d or ≥3 attempts)
→ quarantine/condemned/

Backfill Process:

  1. Trigger Backfill: AWS Step Functions or EventBridge triggers the AWS Glue job for the specific raw partition.
  2. Metadata Generation: Each row receives row_hash (SHA256) and attempt_count=0 for new ingestions.
  3. New Version: Output writes to a new run_id folder (e.g., processed/.../run_id=20260128_BACKFILL).
  4. Publish: Update the AWS Glue Data Catalog to point to the new run_id (currently Parquet-only; Iceberg would provide automatic partition snapshots as a future enhancement).

Quarantine Remediation Scenarios

Scenario A: Source Provider Fix (New File)

  • Trigger: Data provider sends corrected CSV file with new source_file_id
  • Behavior: attempt_count resets to 0 (treated as new data)
  • Process:
    • New file ingested to raw/ with new source_file_id
    • New row_hash computed (may differ if data was corrected)
    • Original quarantined rows marked as superseded
    • New run writes to run_id=YYYYMMDD_SOURCE_FIX

Scenario B: ETL Logic Fix (Same File)

  • Trigger: ETL validation rules updated to handle previously invalid data
  • Behavior: attempt_count increments (preserves retry history)
  • Process:
    • Check attempt_count < 3 before reprocessing
    • If < 3: Reprocess original data with updated rules, increment attempt_count, log to retry_history
    • If ≥ 3: Auto-condemn to quarantine/condemned/ (no automatic retry)
    • New run writes to run_id=YYYYMMDD_ETL_FIX
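
The different attempt_count handling in Scenarios A and B can be summarized in one decision function. This is a sketch; the function name, the fix-type labels, and the returned dictionary shape are hypothetical.

```python
MAX_ATTEMPTS = 3  # attempt limit stated in this document


def plan_retry(fix_type: str, attempt_count: int) -> dict:
    """Decide what happens to a quarantined row after a fix is available."""
    if fix_type == "SOURCE_FIX":
        # New source_file_id: treated as new data, so the counter resets.
        return {"action": "reprocess", "attempt_count": 0}
    if fix_type == "ETL_FIX":
        if attempt_count >= MAX_ATTEMPTS:
            return {"action": "condemn", "attempt_count": attempt_count,
                    "validation_error": "MAX_ATTEMPTS"}
        # Same source_file_id: retry history is preserved and extended.
        return {"action": "reprocess", "attempt_count": attempt_count + 1}
    raise ValueError(f"unknown fix type: {fix_type}")
```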

Scenario C: Circuit Breaker Triggered

  • Trigger: >100 rows with same error type within 1 hour window
  • Behavior: Pipeline halts automatically, requires human intervention
  • Process:
    • Pipeline stops processing immediately
    • Alert sent to operations team (PagerDuty critical)
    • Root cause investigation required
    • Manual restart after systemic issue is resolved

Loop Prevention Guarantees:

  • Duplicate Detection: Exact row_hash matches in quarantine history → Auto-condemn
  • Attempt Limit: Maximum 3 attempts per row → Auto-condemn once attempt_count ≥ 3
  • Circuit Breaker: Prevents processing storms from systemic issues
  • Metadata Tracking: row_hash, source_file_id, attempt_count, retry_history enable full audit trail

3.1. Schema Evolution Strategy

Compatibility Policy:

  • Backward-compatible reads: Updated consumers can read both old and new data
  • Forward-compatible writes: Updated producers may emit additional fields without breaking older consumers
  • Additive changes only: New columns must be nullable/optional

Schema Versioning:

  • All Silver/Gold paths include schema_v={version} (e.g., schema_v=v1, schema_v=v2)
  • Enables schema evolution without breaking consumers
  • Backward compatibility maintained — old data remains accessible

Allowed Changes (Safe / Zero-Downtime):

  • Add new nullable/optional field (e.g., TransactionType)
  • Deprecate before remove: keep field, stop populating, document replacement
  • Add new derived views instead of changing semantics

Breaking Changes (Require Playbook):

  • Type changes require new schema version and backfill
  • Renames require transition period with aliases
  • Semantic changes require versioned v2 dataset
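
The allowed/breaking distinction could be checked automatically before a deploy. The sketch below assumes a schema is represented as a mapping from column name to (type name, nullable); that representation and the function name are hypothetical. A rename shows up as a removal plus an addition and is therefore reported as breaking, and semantic changes cannot be detected this way.

```python
def classify_schema_change(old: dict, new: dict) -> str:
    """Classify a schema change as 'unchanged', 'additive', or 'breaking'.

    Both arguments map column name -> (type_name, nullable).
    """
    if set(old) - set(new):
        return "breaking"  # a column was removed or renamed
    for name, (old_type, _nullable) in old.items():
        if new[name][0] != old_type:
            return "breaking"  # type change needs a new schema version and backfill
    added = set(new) - set(old)
    if any(not new[name][1] for name in added):
        return "breaking"  # new columns must be nullable/optional
    return "additive" if added else "unchanged"
```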

Recommended Approach:

  • Current: Parquet-only with schema versioning (schema_v=v1, schema_v=v2)
  • Future Enhancement: Apache Iceberg tables via AWS Glue for advanced schema evolution (add/rename/reorder columns without rewriting files)
  • Fallback: Parquet-only with additive changes and schema registry in Glue Data Catalog

See Task 2 architecture documentation for complete schema evolution strategy.

4. Operational Monitoring

flowchart TB subgraph Metrics["📊 Key Metrics Collection"] Volume[Volume Metrics
• Rows ingested
• Rows processed
• Rows quarantined
• Rows condemned] Quality[Quality Metrics
• Quarantine rate %
• Error type distribution
• Validation pass rate
• Retry attempt distribution] Latency[Latency Metrics
• ETL duration
• End-to-end time
• Partition lag] LoopMetrics[Loop Prevention Metrics
• Avg attempt_count
• Duplicate detection rate
• Circuit breaker triggers
• Auto-condemnation rate] end subgraph Thresholds["⚠️ Alert Thresholds"] T1[Quarantine Rate > 1%] T2[Job Failure] T3[Processing Time > 2x avg] T4[Zero rows processed] T5[Circuit Breaker Triggered] T6[Avg attempt_count > 1.5] T7[Auto-condemn rate > 0.5%] end subgraph Actions["🚨 Alert Actions"] PD[PagerDuty Alert
On-call engineer] Slack[Slack Notification
Data team channel] Log[CloudWatch Logs
Detailed diagnostics] AutoHalt[Automatic Pipeline Halt
Circuit breaker engaged] end subgraph Dashboards["📈 Visualization"] CW[CloudWatch Dashboard
Real-time metrics
+ retry trends] Grafana[Grafana
Historical trends
+ loop detection] Report[Daily Summary Report
Email digest
+ condemned items] end Volume --> T1 Volume --> T4 Quality --> T1 Latency --> T3 LoopMetrics --> T5 LoopMetrics --> T6 LoopMetrics --> T7 T1 -->|Critical| PD T2 -->|Critical| PD T3 -->|Warning| Slack T4 -->|Critical| PD T5 -->|Critical| AutoHalt T5 -->|Critical| PD T6 -->|Warning| Slack T7 -->|Warning| Slack T1 --> Log T2 --> Log T3 --> Log T5 --> Log Volume --> CW Quality --> CW Latency --> CW LoopMetrics --> CW CW --> Grafana CW --> Report style Metrics fill:#00897b,color:#fff style Thresholds fill:#ffa000,color:#111 style Actions fill:#d32f2f,color:#fff style Dashboards fill:#00897b,color:#fff style LoopMetrics fill:#7b1fa2,color:#fff style T5 fill:#d32f2f,color:#fff style AutoHalt fill:#d32f2f,color:#fff

CloudWatch Metrics & Logging

The pipeline publishes custom metrics to CloudWatch (namespace: Ohpen/ETL):

Volume Metrics:

  • InputRows: Total rows read from raw layer
  • ValidRows: Rows successfully written to processed layer
  • QuarantinedRows: Rows sent to quarantine
  • CondemnedRows: Rows auto-condemned (duplicates or max attempts)

Quality Metrics:

  • QuarantineRate: Percentage of rows quarantined
  • ErrorTypeDistribution: Breakdown by error type (SCHEMA_ERROR, NULL_VALUE_ERROR, etc.)
  • ValidationPassRate: Percentage of rows passing all validations
  • RetryAttemptDistribution: Distribution of attempt_count values

Performance Metrics:

  • DurationSeconds: ETL execution time
  • EndToEndTime: Total time from ingestion to availability
  • PartitionLag: Delay in partition availability

Loop Prevention Metrics:

  • AvgAttemptCount: Average attempt_count across all processed rows
  • DuplicateDetectionRate: Percentage of rows flagged as duplicates
  • CircuitBreakerTriggers: Count of circuit breaker activations
  • AutoCondemnationRate: Percentage of rows auto-condemned

All logs are structured JSON for CloudWatch Logs Insights, enabling queries like:

fields @timestamp, run_id, metric_value.quarantine_rate
| filter metric_name = "ETLComplete"
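
A log line that the query above could match might be produced like this. Only run_id, metric_name, and metric_value.quarantine_rate come from the query; the function name and the remaining keys are assumptions made for illustration.

```python
import json
from datetime import datetime, timezone


def etl_complete_log(run_id: str, input_rows: int, valid_rows: int,
                     quarantined_rows: int, condemned_rows: int) -> str:
    """Build one structured JSON log line summarising a finished run."""
    # QuarantineRate is described as a percentage of rows quarantined.
    rate = (quarantined_rows / input_rows * 100) if input_rows else 0.0
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "run_id": run_id,
        "metric_name": "ETLComplete",
        "metric_value": {
            "input_rows": input_rows,
            "valid_rows": valid_rows,
            "quarantined_rows": quarantined_rows,
            "condemned_rows": condemned_rows,
            "quarantine_rate": round(rate, 2),
        },
    }
    return json.dumps(record)
```

Emitting one JSON object per line lets CloudWatch Logs Insights address nested keys with dot notation, as the query does with metric_value.quarantine_rate.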

Alert Thresholds & Actions

Critical Alerts (PagerDuty; T5 additionally triggers an automatic pipeline halt):

  • T1: Quarantine Rate > 1% → Data Quality Team (P2 - 4 hours)
  • T2: Job Failure → Data Platform Team (P1 - Immediate)
  • T4: Zero rows processed → Data Platform Team (P1 - Immediate)
  • T5: Circuit Breaker Triggered → Automatic pipeline halt + Data Platform Team (P1 - Immediate)

Warning Alerts (Slack Notification):

  • T3: Processing Time > 2x average → Data Platform Team (P3 - 8 hours)
  • T6: Avg attempt_count > 1.5 → Data Quality Team (P2 - 4 hours)
  • T7: Auto-condemn rate > 0.5% → Data Quality Team (P2 - 4 hours)
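
The thresholds T1 to T7 can be captured as a rule table evaluated against a run's metrics. This is a sketch: the metric key names are hypothetical, and the severities follow the critical/warning grouping above.

```python
# (alert id, severity, predicate over a metrics dict with hypothetical keys)
ALERT_RULES = [
    ("T1", "critical", lambda m: m["quarantine_rate_pct"] > 1.0),
    ("T2", "critical", lambda m: m["job_failed"]),
    ("T3", "warning", lambda m: m["duration_seconds"] > 2 * m["avg_duration_seconds"]),
    ("T4", "critical", lambda m: m["rows_processed"] == 0),
    ("T5", "critical", lambda m: m["circuit_breaker_triggered"]),
    ("T6", "warning", lambda m: m["avg_attempt_count"] > 1.5),
    ("T7", "warning", lambda m: m["auto_condemn_rate_pct"] > 0.5),
]


def evaluate_alerts(metrics: dict) -> list:
    """Return (alert_id, severity) for every threshold the metrics breach."""
    return [(alert_id, severity)
            for alert_id, severity, rule in ALERT_RULES
            if rule(metrics)]
```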

Dashboards & Visualization

  • CloudWatch Dashboard: Real-time metrics, retry trends, circuit breaker status
  • Grafana: Historical trends, loop detection patterns, long-term quality trends
  • Daily Summary Report: Email digest with condemned items, quarantine summary, key metrics

Alert Ownership & Escalation

Alerts are routed to appropriate teams based on alert type:

  • Infrastructure Alerts (Data Platform Team): Job failures, missing partitions, runtime anomalies, circuit breaker triggers
  • Data Quality Alerts (Data Quality Team): Quarantine rate spikes (> 1%), validation failures, high attempt_count, auto-condemnation spikes
  • Business Metric Alerts (Domain Teams / Business): Volume anomalies, SLA breaches

See Task 4 CI/CD documentation (Section 5.2) for complete alert ownership matrix and escalation paths.

4.2. Quarantine Lifecycle Management

flowchart TB subgraph Quarantine["🚫 Quarantine Entry"] Failed[Validation Failed
Row quarantined with:
• Error type
• Source context
• Timestamp] end subgraph Triage["🔍 Daily Triage (Automated)"] Age{Quarantine Age} ErrorType{Error Type
Classification} end subgraph Review["👥 Human Review"] DataTeam[Data Team Review
• Analyze error patterns
• Identify root cause
• Determine action] Decision{Resolution
Decision} end subgraph Actions["⚙️ Resolution Actions"] SourceFix[Source System Fix
• Notify data provider
• Request corrected file
• Re-ingest to raw/] ETLFix[ETL Logic Fix
• Update validation rules
• Backfill from raw/
• Reprocess partition] ManualFix[Manual Correction
• Create corrected CSV
• Inject into raw/
• Mark for reprocessing] end subgraph Reprocess["♻️ Reprocessing"] Revalidate[Re-run Validation
Against fixed data
or updated rules] PassCheck{Validation
Passed?} end subgraph Terminal["⚰️ Permanent Disposition"] Condemn[Condemned
Move to:
quarantine/condemned/
retention_date] Success[Successfully
Reprocessed
Move to processed/] end Failed --> Age Age -->|< 7 days| ErrorType Age -->|7-30 days| Review Age -->|> 30 days| AutoCondemn[Auto-flag for
condemnation review] ErrorType -->|Fixable
Schema/Type| Review ErrorType -->|Systemic
Pattern| Review ErrorType -->|One-off| Monitor[Monitor until
7-day threshold] AutoCondemn --> Review Monitor --> Age DataTeam --> Decision Decision -->|Upstream
Issue| SourceFix Decision -->|Our Bug| ETLFix Decision -->|Data Entry
Error| ManualFix Decision -->|Irreparable| Condemn SourceFix --> Revalidate ETLFix --> Revalidate ManualFix --> Revalidate Revalidate --> PassCheck PassCheck -->|Yes| Success PassCheck -->|No| Review Condemn -.->|After retention
period| Delete[Permanent Deletion
Per compliance policy] style Failed fill:#d32f2f,color:#fff style Review fill:#ffa000,color:#111 style Success fill:#2e7d32,color:#fff style Condemn fill:#d32f2f,color:#fff style Revalidate fill:#7b1fa2,color:#fff

Quarantine Lifecycle Stages:

  1. Quarantine Entry: Row fails validation and is quarantined with error details, row_hash, attempt_count, and source context.
  2. Daily Triage (Automated):
    • Age < 7 days: Automated monitoring (no human action)
    • Age 7-30 days: Flagged for human review
    • Age > 30 days: Auto-flagged for condemnation review (potential systemic issue)
  3. Human Review: Data team analyzes error patterns, identifies root cause, and determines resolution action.
  4. Resolution Actions:
    • Source System Fix: Notify provider, request corrected file, re-ingest (new source_file_id, attempt_count resets)
    • ETL Logic Fix: Update validation rules, backfill from raw (same source_file_id, attempt_count increments)
    • Manual Correction: Create corrected CSV, inject into raw, mark for reprocessing
  5. Reprocessing: Re-run validation against fixed data or updated rules.
  6. Terminal Disposition:
    • Success: Row successfully reprocessed, moved to processed layer
    • Condemned: Moved to quarantine/condemned/ for permanent retention (no automatic retries)

Retention Policy: Condemned items are retained per compliance policy (7 years minimum) and may then be permanently deleted, subject to human approval (see HUMAN_VALIDATION_POLICY.md).

4.3. CI/CD + Deployment

CI Pipeline (GitHub Actions):

  • Validation: Runs on every Pull Request
    • ruff linting (code style)
    • pytest unit tests (partition logic, null handling, quarantine checks)
  • Artifact Build: Packages Python ETL code, tags with Git SHA (e.g., etl-v1.0.0-a1b2c3d.zip)
  • Deployment (CD):
    • Uploads artifact to S3 (Code Bucket)
    • Terraform plan & apply to update AWS infrastructure (Glue Jobs, IAM, Buckets)
    • Updates Glue Job to point to new artifact

Backfill Safety Checks:

  • Determinism: Rerunning the same input produces exactly the same counts
  • Partitioning: Timestamps map to correct year=YYYY/month=MM folder
  • Quarantine: Invalid rows never silently dropped

Failure Handling:

  • Failed runs do not update _LATEST.json or current/ prefix
  • Each rerun uses a new run_id (timestamp-based)
  • Previous failed runs remain in storage (audit trail)
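
The "failed runs do not update _LATEST.json" rule can be sketched with a local directory standing in for the S3 prefix. The function name and the pointer's JSON layout are assumptions; the actual _LATEST.json schema is not given in this document.

```python
import json
import tempfile
from pathlib import Path


def publish_run(dataset_root: Path, run_id: str) -> bool:
    """Point _LATEST.json at run_id, but only if that run wrote _SUCCESS."""
    run_dir = dataset_root / f"run_id={run_id}"
    if not (run_dir / "_SUCCESS").exists():
        return False  # failed or incomplete run: pointer stays untouched
    # Hypothetical pointer layout: just the run_id of the current run.
    tmp_path = dataset_root / "_LATEST.json.tmp"
    tmp_path.write_text(json.dumps({"run_id": run_id}))
    tmp_path.replace(dataset_root / "_LATEST.json")  # swap in with one rename
    return True


# Demonstration on a temporary local directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "run_id=20260121T120000Z").mkdir()
    (root / "run_id=20260121T120000Z" / "_SUCCESS").touch()
    (root / "run_id=20260122T120000Z").mkdir()  # failed run: no _SUCCESS marker

    ok_published = publish_run(root, "20260121T120000Z")
    failed_published = publish_run(root, "20260122T120000Z")
    latest = json.loads((root / "_LATEST.json").read_text())
```

After the demonstration, the pointer still names the first run, and the failed run's folder is left in place as an audit trail.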

See Task 4 CI/CD documentation for complete workflow details.

5. Cost & Scalability

flowchart LR subgraph Storage["💾 Storage Costs"] Raw[Raw CSV
S3 Infrequent Access
$0.0125/GB/month] Processed[Processed Parquet
S3 Standard
Snappy compression
~70% reduction] Archive[Quarantine
S3 Glacier
Long-term retention] end subgraph Compute["⚡ Compute Costs"] Glue[AWS Glue Serverless
$0.44/DPU-hour
Auto-scaling] NoServers[No infrastructure
management
Pay per job] end subgraph Query["🔍 Query Costs"] Partition[Partitioning Strategy
year/month reduces
data scanned] Athena[Athena Pricing
$5 per TB scanned] Savings[Typical query scans
1-5% of total data
due to partitions] end subgraph Scale["📈 Scalability"] Horizontal[Horizontal Scaling
Glue DPUs scale
with data volume] S3Scale[S3 unlimited
storage capacity] NoBottleneck[No single point
of failure] end Raw -.->|Compression| Processed Processed -.->|Archival| Archive Glue -.->|Reads| Raw Glue -.->|Writes| Processed Partition -.->|Optimizes| Athena Athena -.->|Queries| Processed Horizontal -.->|Supports| Glue S3Scale -.->|Backs| Storage style Storage fill:#388e3c,color:#fff style Compute fill:#ffa000,color:#111 style Query fill:#1976d2,color:#fff style Scale fill:#7b1fa2,color:#fff

Storage Costs:

  • Raw Layer: S3 Infrequent Access ($0.0125/GB/month) for immutable source data
  • Processed Layer: S3 Standard with Snappy compression (~70% size reduction vs CSV)
  • Quarantine/Condemned: S3 Glacier for long-term retention (compliance/audit)

Compute Costs:

  • AWS Glue Serverless: $0.44/DPU-hour, auto-scales with data volume
  • No Infrastructure Management: Pay per job execution, no idle costs
  • Horizontal Scaling: Glue DPUs scale automatically with data volume
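
As a rough worked example of the compute rate above, the cost of one job run is DPUs × hours × rate. The job size and runtime below are made-up inputs, and the sketch ignores any minimum billing duration.

```python
GLUE_USD_PER_DPU_HOUR = 0.44  # rate quoted in this section


def glue_job_cost(dpus: float, runtime_minutes: float) -> float:
    """Estimated cost in USD of a single Glue job run."""
    return dpus * (runtime_minutes / 60) * GLUE_USD_PER_DPU_HOUR


# Hypothetical job: 10 DPUs for 15 minutes is 2.5 DPU-hours.
example_cost = glue_job_cost(dpus=10, runtime_minutes=15)
print(f"${example_cost:.2f}")
```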

Query Costs:

  • Amazon Athena: $5 per TB scanned
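  • Partitioning Strategy: year/month partitioning lets queries that filter on those columns scan only the matching partitions, typically 1-5% of total data
  • Cost Optimization: Because Athena bills per byte scanned, partition pruning lowers query cost in proportion to the data skipped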
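
The effect of partition pruning on query cost can be illustrated with simple arithmetic. The 2 TB dataset size is a hypothetical input; the price and the 1-5% scan range are the figures quoted above.

```python
ATHENA_USD_PER_TB = 5.0  # price quoted in this section


def athena_query_cost(dataset_tb: float, scanned_fraction: float) -> float:
    """Estimated cost in USD of one query scanning a fraction of the dataset."""
    if not 0 <= scanned_fraction <= 1:
        raise ValueError("scanned_fraction must be between 0 and 1")
    return dataset_tb * scanned_fraction * ATHENA_USD_PER_TB


# Hypothetical 2 TB dataset: full scan versus a pruned, month-filtered query.
full_scan = athena_query_cost(2, 1.0)
pruned_high = athena_query_cost(2, 0.05)
pruned_low = athena_query_cost(2, 0.01)
```

Under these assumptions a full scan costs about $10, while a pruned query costs between about $0.10 and $0.50.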

Scalability:

  • Horizontal Scaling: Glue DPUs scale automatically with data volume
  • S3 Unlimited Capacity: No storage bottlenecks
  • No Single Point of Failure: S3, Glue, and Athena are managed regional services, so the pipeline has no self-hosted component to keep available

6. Security & Governance (production expectations)

flowchart TB subgraph Encryption["🔐 Encryption"] Transit[TLS in Transit
All API calls
HTTPS only] Rest[S3 Encryption at Rest
SSE-S3 or SSE-KMS
Automatic encryption] end subgraph Access["👤 Access Control"] IAM[IAM Least Privilege
Separate roles for:
• ETL writers
• Analyst readers
• Admin operators] Raw_Access[Raw Layer
Restricted access
Audit logged] Quarantine_Access[Quarantine
Limited visibility
Data team only] end subgraph PII["🔒 PII Protection"] Minimize[Minimize PII
in raw layer
where possible] Mask[Masking/Tokenization
in curated layers
if required] Retention[Data retention
policies per
compliance needs] end subgraph Audit["📝 Auditability"] Immutable[Immutable raw layer
Complete history] RunID[run_id isolation
Version tracking] Success[_SUCCESS metadata
Job traceability] Logs[CloudTrail & CloudWatch
Complete audit trail
Catalog changes logged] end Transit -.-> Rest IAM -.-> Raw_Access IAM -.-> Quarantine_Access Minimize -.-> Mask Mask -.-> Retention Immutable -.-> RunID RunID -.-> Success Success -.-> Logs style Encryption fill:#6a1b9a,color:#fff style Access fill:#6a1b9a,color:#fff style PII fill:#6a1b9a,color:#fff style Audit fill:#757575,color:#fff

Data Ownership Model

Ownership shifts from technical → business as data moves downstream:

  • Bronze Layer: Data Platform Team (immutability, ingestion reliability) - ✅ Implemented (Task 1)
  • Silver Layer: Domain Teams (validation rules, schema evolution) - ✅ Implemented (Task 1)
  • Gold Layer: Business/Finance (metric definitions, reporting accuracy) - Task 2: Structure Design, Task 3: SQL Aggregation

Access Control & Permissions

  • Encryption: TLS in transit; S3 encryption at rest (SSE-S3 or SSE-KMS)
  • IAM Roles: Least privilege per layer (Platform: RW on Bronze/Silver, Domain: RW on Silver, Business: R on Gold)
  • Prefix-Scoped Permissions: IAM policies are scoped to S3 prefixes for fine-grained access control:
      • Platform Team: s3://bucket/bronze/*, s3://bucket/silver/*, s3://bucket/quarantine/*
      • Domain Teams: s3://bucket/silver/{domain}/* (write), s3://bucket/gold/{domain}/* (read)
      • Business/Analysts: s3://bucket/gold/* (read-only)
      • Compliance: s3://bucket/bronze/*, s3://bucket/quarantine/* (read-only for audit)
  • Restricted Access: raw/ and quarantine/ restricted to Platform and Compliance teams
  • PII Handling: Keep PII minimal in raw/ where possible; mask/tokenize in curated layers if required
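
The prefix-scoped permissions above could be expressed as IAM policy documents along the following lines. This is a sketch only: the helper name is hypothetical, the bucket name is the placeholder used above, and a real policy would also need s3:ListBucket on the bucket (with a prefix condition), which is omitted here.

```python
import json


def prefix_policy(bucket: str, read_prefixes=(), write_prefixes=()) -> dict:
    """Build an IAM policy document granting object access under S3 prefixes."""
    statements = []
    if read_prefixes:
        statements.append({
            "Sid": "ReadObjects",
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": [f"arn:aws:s3:::{bucket}/{p}*" for p in read_prefixes],
        })
    if write_prefixes:
        statements.append({
            "Sid": "ReadWriteObjects",
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": [f"arn:aws:s3:::{bucket}/{p}*" for p in write_prefixes],
        })
    return {"Version": "2012-10-17", "Statement": statements}


# Business/Analysts: read-only on gold/
analyst_policy = prefix_policy("bucket", read_prefixes=["gold/"])
# Compliance: read-only on bronze/ and quarantine/
compliance_policy = prefix_policy("bucket", read_prefixes=["bronze/", "quarantine/"])
print(json.dumps(analyst_policy, indent=2))
```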

Governance & Auditability

  • Immutability: Bronze layer is immutable (append-only, no overwrites)
  • Run Isolation: Each run writes to unique run_id path for audit trail
  • Metadata: _SUCCESS markers include run metrics; CloudWatch logs capture full lineage
  • Schema Versioning: All schema changes versioned via schema_v for backward compatibility
  • Change Approval: Schema changes require Domain/Business approval + Platform implementation

For complete governance model, see Task 4 CI/CD documentation (Section 5: Ownership & Governance).

7. Runbook

How to Rerun:

  • Trigger via AWS Step Functions or EventBridge with specific raw partition
  • Specify ingest_date partition to reprocess (e.g., ingest_date=2026-01-21)
  • New run writes to unique run_id path (e.g., run_id=20260128_BACKFILL)
  • After validation, update _LATEST.json and current/ prefix to point to new run
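
A rerun trigger via Step Functions might look like the sketch below. `start_execution` with `stateMachineArn`, `name`, and `input` is the boto3 Step Functions call; the input key names, the state machine ARN, and the `RecordingClient` stand-in (used so the example runs without AWS access) are assumptions.

```python
import json


def build_rerun_input(ingest_date: str, run_id: str) -> dict:
    """Execution input for a rerun; the key names are hypothetical."""
    return {"ingest_date": ingest_date, "run_id": run_id}


def start_rerun(sfn_client, state_machine_arn: str, ingest_date: str, run_id: str):
    """Start one execution; sfn_client would be boto3.client("stepfunctions")."""
    return sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=f"rerun-{run_id}",
        input=json.dumps(build_rerun_input(ingest_date, run_id)),
    )


class RecordingClient:
    """Stand-in for the boto3 client that only records the call it receives."""

    def __init__(self):
        self.calls = []

    def start_execution(self, **kwargs):
        self.calls.append(kwargs)
        return {"executionArn": "arn:placeholder"}


client = RecordingClient()
start_rerun(
    client,
    "arn:aws:states:REGION:ACCOUNT_ID:stateMachine:etl-backfill",  # placeholder
    ingest_date="2026-01-21",
    run_id="20260128_BACKFILL",
)
```

Passing the client in as a parameter keeps the payload logic testable without network access.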

Where Logs Are:

  • CloudWatch Logs: log group /aws-glue/jobs/output (custom metrics are published under the Ohpen/ETL namespace)
  • Structured JSON format for CloudWatch Logs Insights queries
  • Query example: fields @timestamp, run_id, metric_value.quarantine_rate | filter metric_name = "ETLComplete"
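
For the query example above to work, each run would need to emit a JSON log line with matching field names. The sketch below shows one possible shape; only metric_name, run_id, and metric_value.quarantine_rate come from the query, and the remaining fields are assumptions.

```python
import json
from datetime import datetime, timezone


def etl_complete_event(run_id: str, input_rows: int, quarantined_rows: int) -> str:
    """Serialize an end-of-run event as a single JSON log line."""
    rate = quarantined_rows / input_rows if input_rows else 0.0
    return json.dumps({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "metric_name": "ETLComplete",
        "run_id": run_id,
        "metric_value": {
            "input_rows": input_rows,
            "quarantined_rows": quarantined_rows,
            "quarantine_rate": rate,
        },
    })


line = etl_complete_event("20260128_BACKFILL", input_rows=2000, quarantined_rows=10)
print(line)  # in a Glue job, stdout lines end up in the job's output log group
```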

How to Inspect Quarantine:

  • Query quarantine/ S3 path: s3://quarantine-bucket/quarantine/mortgages/transactions/ingest_date={YYYY-MM-DD}/run_id={...}/invalid_rows.parquet
  • Check validation_error column for error categories (SCHEMA_ERROR, NULL_VALUE_ERROR, TYPE_ERROR, CURRENCY_ERROR, TIMESTAMP_ERROR); filter on it to see the breakdown by error type
  • Check attempt_count column to identify rows approaching the max retry limit (≥2); filter on it to find high-retry items
  • Check row_hash for duplicate detection analysis
  • Review retry_history to understand previous remediation attempts
  • Use Athena or pandas to query quarantine Parquet files
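
The inspection steps above amount to a group-by on validation_error and a filter on attempt_count. The sketch below uses in-memory records so it runs without dependencies; in practice the rows would be loaded from the quarantine Parquet files. The sample rows are invented.

```python
from collections import Counter


def summarize_quarantine(rows, near_limit=2):
    """Return (error breakdown, row_hashes at or above near_limit attempts)."""
    by_error = Counter(row["validation_error"] for row in rows)
    near = [row["row_hash"] for row in rows if row["attempt_count"] >= near_limit]
    return by_error, near


# Invented sample rows using the column names described above.
rows = [
    {"row_hash": "a1", "validation_error": "CURRENCY_ERROR", "attempt_count": 1},
    {"row_hash": "b2", "validation_error": "CURRENCY_ERROR", "attempt_count": 2},
    {"row_hash": "c3", "validation_error": "TIMESTAMP_ERROR", "attempt_count": 0},
]
by_error, near_limit_hashes = summarize_quarantine(rows)
print(by_error.most_common())
print(near_limit_hashes)
```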

How to Review Condemned Items:

  • Query quarantine/condemned/ S3 path: s3://quarantine-bucket/quarantine/mortgages/transactions/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/condemned_rows.parquet
  • Check validation_error for condemnation reason (DUPLICATE_FAILURE, MAX_ATTEMPTS)
  • Review attempt_count (should be ≥3 for MAX_ATTEMPTS)
  • Check row_hash to identify exact duplicates
  • No automatic retries are performed on condemned items
  • Manual intervention required for any reprocessing

How to Handle Circuit Breaker:

  • Detection: Pipeline halts automatically when >100 same errors occur within 1 hour
  • Immediate Actions:
      • Check CloudWatch logs for circuit breaker trigger details
      • Identify the error type causing the threshold breach
      • Review quarantine data for the problematic error pattern
      • Investigate root cause (systemic data quality issue vs ETL bug)
  • Resolution:
      • If Source Issue: Contact data provider, request corrected data
      • If ETL Issue: Update validation rules, deploy fix, manually restart pipeline
      • If Temporary Spike: Wait for error rate to drop below threshold, manually restart
  • Restart: After root cause is resolved, manually trigger pipeline restart
  • Prevention: Monitor CircuitBreakerTriggers metric and investigate patterns before threshold is reached
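
The detection rule (more than 100 errors of the same type within one hour) can be sketched as a sliding-window counter. The class and method names are hypothetical, and the real implementation may track state differently, for example in CloudWatch rather than in memory.

```python
from collections import defaultdict, deque


class CircuitBreaker:
    """Opens when one error type exceeds `threshold` events inside the window."""

    def __init__(self, threshold: int = 100, window_seconds: int = 3600):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)  # error type -> event timestamps
        self.open = False

    def record(self, error_type: str, now: float) -> bool:
        """Register one error at time `now` (seconds); return True if halted."""
        events = self._events[error_type]
        events.append(now)
        # Drop events that have aged out of the one-hour window.
        while events and events[0] <= now - self.window_seconds:
            events.popleft()
        if len(events) > self.threshold:
            self.open = True  # stays open until a manual reset
        return self.open

    def reset(self) -> None:
        """Manual restart after the root cause is resolved."""
        self._events.clear()
        self.open = False


breaker = CircuitBreaker()
for second in range(100):
    breaker.record("CURRENCY_ERROR", float(second))
at_threshold = breaker.open  # exactly 100 errors: still running
breaker.record("CURRENCY_ERROR", 100.0)
after_threshold = breaker.open  # 101st error in the window: halted
```

Counting per error type matches the "same errors" wording above; 100 mixed errors of different types would not trip this sketch.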

Escalation Path:

  • Infrastructure Issues (P1): Data Platform Team — job failures, missing partitions, runtime anomalies, circuit breaker triggers
  • Data Quality Issues (P2): Data Quality Team — quarantine rate spikes (>1%), validation failures, high attempt_count (>1.5 avg), auto-condemnation spikes (>0.5%)
  • Business Metric Issues (P3): Domain Teams / Business — volume anomalies, SLA breaches
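
The P1 and P2 thresholds in the escalation path could be encoded as a simple routing function. The thresholds come from the list above; the metric key names and the function itself are illustrative assumptions.

```python
def escalation_alerts(metrics: dict) -> list:
    """Return (priority, owner, reason) tuples for every breached threshold."""
    alerts = []
    if metrics.get("circuit_breaker_triggers", 0) > 0:
        alerts.append(("P1", "Data Platform Team", "circuit breaker triggered"))
    if metrics.get("quarantine_rate", 0.0) > 0.01:
        alerts.append(("P2", "Data Quality Team", "quarantine rate above 1%"))
    if metrics.get("avg_attempt_count", 0.0) > 1.5:
        alerts.append(("P2", "Data Quality Team", "average attempt_count above 1.5"))
    if metrics.get("condemned_rate", 0.0) > 0.005:
        alerts.append(("P2", "Data Quality Team", "auto-condemnation rate above 0.5%"))
    return alerts


# Hypothetical run metrics: only the quarantine rate is out of bounds.
alerts = escalation_alerts({"quarantine_rate": 0.02, "avg_attempt_count": 1.1})
for priority, owner, reason in alerts:
    print(priority, owner, reason)
```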

Common Operations:

  • Backfill: Trigger reprocessing for specific date range via Step Functions
  • Source Fix: New file with new source_file_id → attempt_count resets to 0
  • ETL Fix: Same file with updated rules → attempt_count increments (confirm attempt_count is less than 3 before reprocessing)
  • Schema Update: Update Glue Data Catalog, create new schema_v, backfill if needed
  • Quarantine Review: Query quarantine Parquet, check attempt_count and row_hash, identify root cause, fix source data or ETL rules, reprocess
  • Condemned Review: Query condemned Parquet, analyze patterns, determine if manual intervention needed
  • Circuit Breaker Recovery: Investigate root cause, fix systemic issue, manually restart pipeline
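
The difference between a Source Fix and an ETL Fix comes down to how attempt_count is carried forward. A minimal sketch, with hypothetical function names and the limit of 3 taken from the loop-prevention rules:

```python
MAX_ATTEMPTS = 3  # retry limit per row


def next_attempt_count(previous_attempts: int, same_source_file: bool) -> int:
    """Source Fix (new source_file_id) resets to 0; ETL Fix (same file) increments."""
    return previous_attempts + 1 if same_source_file else 0


def route(attempt_count: int) -> str:
    """Rows at or over the limit are condemned instead of revalidated."""
    return "condemned" if attempt_count >= MAX_ATTEMPTS else "validate"


# ETL Fix on a row that has already failed twice: third attempt is condemned.
etl_fix = route(next_attempt_count(2, same_source_file=True))
# Source Fix for the same row: the counter resets and the row is revalidated.
source_fix = route(next_attempt_count(2, same_source_file=False))
```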

Code & Infrastructure:

  • ETL Code: tasks/01_data_ingestion_transformation/src/etl/ingest_transactions.py
  • IaC (Terraform): tasks/04_devops_cicd/infra/terraform/
  • CI/CD Workflow: tasks/04_devops_cicd/cicd_workflow.md

Monitoring & Dashboards:

  • CloudWatch Dashboard: Namespace Ohpen/ETL
  • Metrics: InputRows, ValidRows, QuarantinedRows, QuarantineRate, DurationSeconds
  • Logs: CloudWatch Logs Insights on log group /aws-glue/jobs/output

Documentation:

  • Data Lake Architecture: tasks/02_data_lake_architecture_design/architecture.md
  • Validation Rules: tasks/01_data_ingestion_transformation/ASSUMPTIONS_AND_EDGE_CASES.md
  • Schema Evolution: tasks/02_data_lake_architecture_design/architecture.md (Section 4)
  • CI/CD Details: tasks/04_devops_cicd/cicd_workflow.md

9. Assumptions & Known Limitations (case study scope)

  • Batch-first ingestion (streaming is an optional upstream extension, not required here).
  • Negative amounts are allowed (withdrawals/refunds); anomaly detection is out of scope.
  • Loop Prevention: Maximum 3 retry attempts per row (attempt_count limit); exact duplicates in quarantine history are auto-condemned.
  • Circuit Breaker: Pipeline halts automatically when >100 same errors occur within 1 hour window; requires human intervention to restart.
  • Condemned Items: Rows exceeding max attempts or exact duplicates are moved to quarantine/condemned/ with no automatic retries; manual intervention required for reprocessing.
  • TransactionID Deduplication: Checks Silver layer for existing TransactionID + event_date combinations to prevent duplicate processing across runs (optional feature).
  • Duplicate Detection: Checks quarantine history for exact row_hash matches (not business-key deduplication within processed data).
  • Config separation: we ship a config.yaml template (code currently uses CLI args/env vars).
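
The row_hash duplicate check described in the Loop Prevention and Duplicate Detection items can be sketched as follows. SHA-256 comes from the architecture diagram; the canonicalization (sorted `key=value` pairs joined by `|`) and every field name other than TransactionID are assumptions.

```python
import hashlib


def row_hash(row: dict) -> str:
    """SHA-256 over a canonical rendering of the row (sorted key=value pairs)."""
    canonical = "|".join(f"{key}={row[key]}" for key in sorted(row))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def is_exact_duplicate(row: dict, quarantine_hashes: set) -> bool:
    """True when an identical row already exists in quarantine history."""
    return row_hash(row) in quarantine_hashes


# Invented example rows; only the TransactionID field name is from this document.
failed_before = {"TransactionID": "T-1", "amount": "12.50", "currency": "EURO"}
history = {row_hash(failed_before)}

resubmitted_unchanged = dict(failed_before)
resubmitted_fixed = {**failed_before, "currency": "EUR"}
```

An unchanged resubmission hashes to the same value and would be auto-condemned, while the corrected row produces a new hash and proceeds to validation.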