Technical One-Pager: Financial Transaction Pipeline
Important Note on Scope
⚠️ Scope Disclaimer: This technical summary is based on the case study requirements and my interpretation of the problem. I have made assumptions that may underestimate the current scope of operations at Ohpen. The solution presented here should be viewed as a starting point that would require refinement based on actual production requirements, volumes, compliance needs, and operational constraints.
1. Overview
This pipeline ingests raw transaction CSVs, validates financial integrity, and publishes optimized Parquet datasets for analytics. It is designed for auditability, reproducibility, and safe schema evolution.
2. Architecture & Data Flow
Raw (S3) -> Metadata Enrichment -> Loop Prevention -> ETL (Python/Glue) -> Processed/Quarantine/Condemned (Parquet) -> Athena (SQL)
[Architecture diagram: immutable CSVs in `raw/transactions/ingest_date=YYYY-MM-DD/` receive tracking metadata (`row_hash` SHA256, `source_file_id`, `attempt_count`, `ingestion_timestamp`), pass loop-prevention checks (duplicate detection against quarantine history, a 3-attempt limit, and a circuit breaker that halts the pipeline at >100 identical errors per hour), and are validated and partitioned by AWS Glue; valid rows land in Snappy-compressed, `run_id`-isolated Parquet, invalid rows go to quarantine with error details and retry tracking, and exhausted or duplicate rows go to the condemned store; the Glue Data Catalog exposes processed data to Amazon Athena for analysts and applications.]
- Ingestion: Immutable CSVs land in `raw/transactions/ingest_date=YYYY-MM-DD/`.
- Metadata Enrichment: Each row is enriched with tracking metadata (see the sketch after this list):
  - `row_hash` (SHA256): Unique identifier for deduplication
  - `source_file_id`: Source file identifier
  - `attempt_count`: Number of processing attempts (starts at 0)
  - `ingestion_timestamp`: When the row was first ingested
- Loop Prevention:
  - TransactionID Deduplication: Checks the Silver layer for existing TransactionID + event_date combinations; auto-condemns if found (optional, enabled via `--silver-bucket` and `--silver-prefix`)
  - Duplicate Detection: Checks quarantine history for exact duplicates (same `row_hash`); auto-condemns if found
  - Attempt Limit: Maximum 3 retry attempts per row; auto-condemns if exceeded
  - Circuit Breaker: Halts the pipeline if >100 identical errors occur within 1 hour (requires human intervention)
- Transformation:
  - Validation: Schema/required-column checks, null checks, numeric amount type check, ISO-4217 currency validation, timestamp parsing.
  - Quarantine: Invalid rows are isolated in `quarantine/` with error details, `attempt_count`, and retry history; never dropped silently.
  - Partitioning: Output is partitioned by `year`/`month` for query performance.
- Storage:
  - Format: Snappy-compressed Parquet (currently Parquet-only; Iceberg is a future enhancement).
  - Idempotency: Each run writes to a unique `run_id` path to prevent data corruption during retries.
  - Metadata: `run_id` and `ingest_time` are emitted in `_SUCCESS`; row-level quarantine includes `validation_error`, `row_hash`, `attempt_count`, and source context.
  - Human Approval: For financial data compliance, human approval is required before promoting Silver layer data to production consumption (see `HUMAN_VALIDATION_POLICY.md`).
  - Condemned Layer: Rows exceeding max attempts or exact duplicates are moved to `quarantine/condemned/` for permanent retention (no automatic retries).
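As a rough illustration of the enrichment step (a sketch, not the production Glue job), the snippet below attaches the tracking metadata to a pandas DataFrame; the hashing scheme, column handling, and file identifier are assumptions.

```python
import hashlib
from datetime import datetime, timezone

import pandas as pd


def enrich_with_metadata(df: pd.DataFrame, source_file_id: str) -> pd.DataFrame:
    """Attach the tracking metadata described above to every row (illustrative)."""
    out = df.copy()
    # SHA256 over the concatenated, stringified field values; the real job may
    # normalise values differently before hashing.
    out["row_hash"] = out.astype(str).apply(
        lambda row: hashlib.sha256("|".join(row.values).encode("utf-8")).hexdigest(),
        axis=1,
    )
    out["source_file_id"] = source_file_id
    out["attempt_count"] = 0  # first ingestion; retries increment this
    out["ingestion_timestamp"] = datetime.now(timezone.utc).isoformat()
    return out


# Example usage with the required fields from the data contract
frame = pd.DataFrame([{
    "TransactionID": "T1", "CustomerID": "C1", "TransactionAmount": -25.10,
    "Currency": "EUR", "TransactionTimestamp": "2026-01-21T09:30:00Z",
}])
print(enrich_with_metadata(frame, "transactions_2026-01-21.csv")["row_hash"].iloc[0])
```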
2.1. Data Lake Folder Structure
The data lake follows a Bronze/Silver/Gold medallion architecture with explicit folder organization:
s3://data-lake-bucket/
├── bronze/ # Bronze Layer (Raw, Immutable)
│ └── mortgages/
│ └── transactions/
│ └── ingest_date=2026-01-21/ # Partition by arrival time
│ └── run_id=20260121T120000Z/ # Run isolation for idempotency
│ └── file.csv.gz # Original source file (verbatim copy)
│
├── silver/ # Silver Layer (Validated & Enriched)
│ └── mortgages/
│ └── transactions/
│ └── year=2026/ # Business-time partition (optimized for queries)
│ └── month=01/
│ └── schema_v=v1/ # Schema version for evolution
│ └── run_id=20260121T120000Z/ # Write-once run isolation
│ ├── _SUCCESS # Atomic completion marker
│ └── part-00000.parquet # Optimized storage format
│
├── quarantine/ # Invalid Data (Audit Trail)
│ └── mortgages/
│ └── transactions/
│ └── ingest_date=2026-01-21/
│ └── run_id=20260121T120000Z/
│ └── invalid_rows.parquet # Contains original data + validation_error column
│ └── condemned/ # Condemned Data (No More Retries)
│ └── ingest_date=2026-01-21/
│ └── run_id=20260121T120000Z/
│ └── condemned_rows.parquet # Max attempts exceeded or exact duplicates
│
└── gold/ # Gold Layer (Business Views)
└── finance/ # Task 2: Structure Design (folder organization, governance)
└── account_balances/ # Task 3: SQL Aggregation Pattern (shows how to create Gold data)
└── schema_v=v1/ # Task 1: Does not write Gold files (only Bronze → Silver)
├── _LATEST.json # Authority pointer to current run
├── run_id=20260121T120000Z/ # Historical run
│ └── as_of_month=2024-03/
│ └── part-00000.parquet
└── current/ # Stable prefix for SQL access
└── as_of_month=2024-03/
            └── part-00000.parquet
Key Path Patterns:
- Bronze: `bronze/{domain}/{dataset}/ingest_date={YYYY-MM-DD}/run_id={...}/file.csv.gz`
- Silver: `silver/{domain}/{dataset}/year={YYYY}/month={MM}/schema_v={vN}/run_id={...}/part-*.parquet`
- Quarantine: `quarantine/{domain}/{dataset}/ingest_date={YYYY-MM-DD}/run_id={...}/invalid_rows.parquet`
- Condemned: `quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/condemned_rows.parquet`
- Gold: `gold/{domain}/{dataset}/schema_v={vN}/run_id={...}/as_of_month={YYYY-MM}/part-*.parquet` (Task 2: Structure Design, Task 3: SQL Aggregation Pattern)
- Gold (current): `gold/{domain}/{dataset}/schema_v={vN}/current/{partition}/part-*.parquet` (Task 2: Structure Design, Task 3: SQL Aggregation Pattern)
Note: Gold layer structure is designed in Task 2 (Architecture - folder organization, governance). Task 3 (SQL) demonstrates how to aggregate Silver → Gold via SQL queries (shows how to create Gold data). Task 1 (ETL Pipeline) does not write Gold layer files - it only implements Bronze → Silver transformation.
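The path conventions above can be captured in small helpers. The sketch below is illustrative only: the function names are ours, and the domain/dataset values are placeholders rather than actual configuration.

```python
from datetime import datetime


def silver_prefix(domain: str, dataset: str, event_ts: datetime,
                  schema_v: str, run_id: str) -> str:
    """Silver path: business-time partitions + schema version + run isolation."""
    return (
        f"silver/{domain}/{dataset}/"
        f"year={event_ts:%Y}/month={event_ts:%m}/"
        f"schema_v={schema_v}/run_id={run_id}/"
    )


def quarantine_prefix(domain: str, dataset: str, ingest_date: str, run_id: str,
                      condemned: bool = False) -> str:
    """Quarantine (and condemned) paths are partitioned by arrival time."""
    middle = "condemned/" if condemned else ""
    return f"quarantine/{domain}/{dataset}/{middle}ingest_date={ingest_date}/run_id={run_id}/"


run_id = "20260121T120000Z"
print(silver_prefix("mortgages", "transactions", datetime(2026, 1, 15, 8, 30), "v1", run_id))
print(quarantine_prefix("mortgages", "transactions", "2026-01-21", run_id, condemned=True))
```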
Condemned Retention Policy:
- Condemned rows are retained for compliance/audit purposes (7 years minimum)
- No automatic retries are performed on condemned items
- Manual review and intervention required for any reprocessing
- Human approval required before deletion (see `HUMAN_VALIDATION_POLICY.md` for the approval workflow)
2.2. Data Contract
Required Fields:
- `TransactionID` (string): Unique transaction identifier
- `CustomerID` (string): Customer identifier
- `TransactionAmount` (numeric): Transaction amount (negative values allowed for withdrawals/refunds)
- `Currency` (string): ISO-4217 currency code (e.g., EUR, USD, GBP)
- `TransactionTimestamp` (timestamp): Transaction timestamp (ISO-8601 format)
Partition Keys (derived from TransactionTimestamp):
- `year`: Year extracted from `TransactionTimestamp`
- `month`: Month extracted from `TransactionTimestamp`
Output Schema:
- All input fields preserved
- Partition columns added: `year`, `month`
- Metadata columns: `row_hash` (SHA256), `source_file_id`, `attempt_count`, `ingestion_timestamp`, `ingest_time`, `run_id`
- Quarantine columns: all of the above plus `validation_error`, `retry_history`
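For reference, a minimal sketch of the contract expressed in Python; the type labels and the helper are illustrative, not the production schema definition.

```python
# Required input fields and their logical types (per the contract above)
REQUIRED_FIELDS = {
    "TransactionID": "string",
    "CustomerID": "string",
    "TransactionAmount": "numeric",
    "Currency": "string (ISO-4217)",
    "TransactionTimestamp": "timestamp (ISO-8601)",
}

# Columns the pipeline adds on top of the input
PARTITION_COLUMNS = ["year", "month"]
METADATA_COLUMNS = ["row_hash", "source_file_id", "attempt_count",
                    "ingestion_timestamp", "ingest_time", "run_id"]
QUARANTINE_COLUMNS = METADATA_COLUMNS + ["validation_error", "retry_history"]


def missing_required_columns(columns: list[str]) -> list[str]:
    """Return required columns absent from an incoming file (illustrative helper)."""
    return [name for name in REQUIRED_FIELDS if name not in columns]
```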
2.3. Validation & Quality Controls
[Validation flow diagram: enriched rows pass through a Silver-layer TransactionID check, a quarantine-history duplicate check, and an attempt-limit check (auto-condemn on DUPLICATE_FAILURE or MAX_ATTEMPTS), then through schema, null, amount-type, ISO-4217 currency, and timestamp validation; each failure quarantines the row with its error code and `attempt_count++`, a circuit breaker halts the pipeline at >100 identical errors per hour, and quarantined rows age into human review, ending in a source fix (attempt count reset), an ETL fix (attempt count preserved), or condemnation.]
Pre-Validation Checks (Loop Prevention):
1. Metadata Enrichment → All rows receive tracking metadata:
   - `row_hash` (SHA256): Computed from row content for deduplication
   - `source_file_id`: Identifier of the source file
   - `attempt_count`: Starts at 0, increments on each retry
   - `ingestion_timestamp`: First ingestion time
2. Duplicate Detection → Auto-condemn if an exact duplicate is found
   - Checks: Quarantine history for an exact `row_hash` match
   - Failure action: Auto-condemn to `quarantine/condemned/` with `validation_error = "DUPLICATE_FAILURE"`
   - Purpose: Prevents infinite retry loops on identical bad data
3. Attempt Limit Check → Auto-condemn if max attempts exceeded
   - Checks: `attempt_count ≥ 3`
   - Failure action: Auto-condemn to `quarantine/condemned/` with `validation_error = "MAX_ATTEMPTS"`
   - Purpose: Prevents infinite retry loops on persistently failing rows
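A minimal sketch of the two pre-validation gates, under simplifying assumptions (an in-memory set and a plain dict stand in for the real quarantine-history lookup and row representation):

```python
MAX_ATTEMPTS = 3
QUARANTINED_HASHES: set[str] = set()  # in practice loaded from quarantine history


def pre_validate(row: dict) -> str | None:
    """Return a condemnation reason, or None if the row may proceed to validation."""
    if row["row_hash"] in QUARANTINED_HASHES:
        return "DUPLICATE_FAILURE"  # exact duplicate of previously quarantined data
    if row["attempt_count"] >= MAX_ATTEMPTS:
        return "MAX_ATTEMPTS"       # retry budget exhausted
    return None
```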
Validation Rules (applied in order after pre-validation):
1. Schema Validation → Quarantine if schema invalid
   - Checks: Required columns present, data types valid
   - Failure action: Row quarantined with `validation_error = "SCHEMA_ERROR"`, `attempt_count++`
2. Null Checks → Quarantine if required fields are missing
   - Checks: All required fields (`TransactionID`, `CustomerID`, `TransactionAmount`, `Currency`, `TransactionTimestamp`) must be non-null
   - Failure action: Row quarantined with `validation_error = "NULL_VALUE_ERROR"`, `attempt_count++`
3. Amount Type Check → Quarantine if non-numeric
   - Checks: `TransactionAmount` must be parseable as numeric
   - Failure action: Row quarantined with `validation_error = "TYPE_ERROR"`, `attempt_count++`
   - Note: Negative amounts are allowed (withdrawals/refunds)
4. Currency Validation → Quarantine if not ISO-4217
   - Checks: Currency code must be in the allowed list (ISO-4217 standard)
   - Failure action: Row quarantined with `validation_error = "CURRENCY_ERROR"`, `attempt_count++`
5. Timestamp Parsing → Quarantine if unparseable
   - Checks: `TransactionTimestamp` must be parseable as an ISO-8601 timestamp
   - Failure action: Row quarantined with `validation_error = "TIMESTAMP_ERROR"`, `attempt_count++`
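To make the ordering concrete, here is a hedged sketch of the checks as a single function that returns the first matching error code; the abbreviated currency list and the parsing details are assumptions, not the production rules.

```python
from datetime import datetime

ISO_4217 = {"EUR", "USD", "GBP"}  # abbreviated; the real allow-list is much larger
REQUIRED = ["TransactionID", "CustomerID", "TransactionAmount",
            "Currency", "TransactionTimestamp"]


def validate(row: dict) -> str | None:
    """Apply the rules above in order; return the first error code, or None if valid."""
    if any(field not in row for field in REQUIRED):
        return "SCHEMA_ERROR"
    if any(row[field] in (None, "") for field in REQUIRED):
        return "NULL_VALUE_ERROR"
    try:
        float(row["TransactionAmount"])  # negative amounts are allowed
    except (TypeError, ValueError):
        return "TYPE_ERROR"
    if row["Currency"] not in ISO_4217:
        return "CURRENCY_ERROR"
    try:
        datetime.fromisoformat(str(row["TransactionTimestamp"]).replace("Z", "+00:00"))
    except ValueError:
        return "TIMESTAMP_ERROR"
    return None
```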
Post-Quarantine Circuit Breaker:
- Circuit Breaker Check: After each quarantine action, check if >100 rows with the same error type occurred within the last hour
- Threshold Exceeded: Pipeline halts automatically, requires human intervention
- Purpose: Prevents processing storms from systemic data quality issues
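One possible shape for the circuit breaker, assuming an in-process sliding window; the real implementation may keep its counts in CloudWatch or a state store instead.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone

ERROR_THRESHOLD = 100
WINDOW = timedelta(hours=1)
_recent_errors: dict[str, deque] = defaultdict(deque)


def record_error_and_check_breaker(error_type: str) -> bool:
    """Record a quarantine event; return True if the pipeline should halt."""
    now = datetime.now(timezone.utc)
    events = _recent_errors[error_type]
    events.append(now)
    # Drop events that have fallen out of the 1-hour window
    while events and now - events[0] > WINDOW:
        events.popleft()
    return len(events) > ERROR_THRESHOLD
```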
What Happens on Failure:
- Pre-validation failures (duplicates, max attempts) → auto-condemn to `quarantine/condemned/`
- Validation failures → quarantine with `attempt_count` incremented
- Never silently dropped: all invalid rows are preserved for audit and review
- Quarantine includes: original data + `validation_error` column + metadata (`row_hash`, `attempt_count`, `retry_history`, `ingest_date`, `run_id`, `source_file`)
- Valid rows proceed to the Silver layer (validated Parquet)
Quarantine Aging & Review:
- < 7 days: Automated monitoring (no human action required)
- 7-30 days: Flagged for human review and decision
- > 30 days: Escalated for review (potential systemic issue)
- Resolution paths: Source Fix (new file, `attempt_count` resets), ETL Fix (same file, `attempt_count` preserved), or Condemn
3. Reprocessing & Backfills
We assume an append-only raw layer. The backfill and reprocessing workflow includes loop prevention checks to avoid infinite retry cycles.
[Sequence diagram, three scenarios: (1) late-arriving data is backfilled from the immutable raw layer into a new `run_id` (e.g. `run_id=20250128_BACKFILL`) and the Glue Catalog is updated; (2) quarantine remediation consults retry history per `row_hash`: a corrected file from the provider (new `source_file_id`) resets `attempt_count` to 0, an ETL logic fix on the same file increments it, and rows at 3 or more attempts are auto-condemned to `quarantine/condemned/`; (3) if >100 identical errors occur within 1 hour, the circuit breaker halts the pipeline until operators fix the root cause and restart it manually.]
Backfill Process:
1. Trigger Backfill: AWS Step Functions or EventBridge triggers the AWS Glue job for the specific `raw` partition.
2. Metadata Generation: Each row receives `row_hash` (SHA256) and `attempt_count=0` for new ingestions.
3. New Version: Output writes to a new `run_id` folder (e.g., `processed/.../run_id=20260128_BACKFILL`).
4. Publish: Update the AWS Glue Data Catalog to point to the new `run_id` (currently Parquet-only; Iceberg would provide automatic partition snapshots as a future enhancement).
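For illustration, the trigger could be as simple as the sketch below; the Glue job name and argument names are assumptions, not the deployed configuration.

```python
import boto3


def trigger_backfill(job_name: str, ingest_date: str, run_id: str) -> str:
    """Start the Glue ETL job for one raw partition (illustrative arguments)."""
    glue = boto3.client("glue")
    response = glue.start_job_run(
        JobName=job_name,
        Arguments={
            "--ingest_date": ingest_date,  # raw partition to reprocess
            "--run_id": run_id,            # new, unique run for idempotent output
        },
    )
    return response["JobRunId"]


# trigger_backfill("transactions-etl", "2025-01-15", "20260128_BACKFILL")
```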
Quarantine Remediation Scenarios
Scenario A: Source Provider Fix (New File)
- Trigger: Data provider sends a corrected CSV file with a new `source_file_id`
- Behavior: `attempt_count` resets to 0 (treated as new data)
- Process:
  1. New file is ingested to `raw/` with the new `source_file_id`
  2. A new `row_hash` is computed (may differ if the data was corrected)
  3. Original quarantined rows are marked as superseded
  4. The new run writes to `run_id=YYYYMMDD_SOURCE_FIX`
Scenario B: ETL Logic Fix (Same File)
- Trigger: ETL validation rules updated to handle previously invalid data
- Behavior: `attempt_count` increments (preserves retry history)
- Process:
  1. Check `attempt_count < 3` before reprocessing
  2. If < 3: Reprocess the original data with the updated rules, increment `attempt_count`, log to `retry_history`
  3. If ≥ 3: Auto-condemn to `quarantine/condemned/` (no automatic retry)
  4. The new run writes to `run_id=YYYYMMDD_ETL_FIX`
Scenario C: Circuit Breaker Triggered
- Trigger: >100 rows with same error type within 1 hour window
- Behavior: Pipeline halts automatically, requires human intervention
- Process:
- Pipeline stops processing immediately
- Alert sent to operations team (PagerDuty critical)
- Root cause investigation required
- Manual restart after systemic issue is resolved
Loop Prevention Guarantees:
- Duplicate Detection: Exact `row_hash` matches in quarantine history → auto-condemn
- Attempt Limit: Maximum 3 retries per row → auto-condemn if exceeded
- Circuit Breaker: Prevents processing storms from systemic issues
- Metadata Tracking: `row_hash`, `source_file_id`, `attempt_count`, `retry_history` enable a full audit trail
3.1. Schema Evolution Strategy
Compatibility Policy:
- Backward-compatible reads: Updated consumers can read both old and new data
- Forward-compatible writes: Updated producers may emit additional fields without breaking older consumers
- Additive changes only: New columns must be nullable/optional
Schema Versioning:
- All Silver/Gold paths include `schema_v={version}` (e.g., `schema_v=v1`, `schema_v=v2`)
- Enables schema evolution without breaking consumers
- Backward compatibility maintained: old data remains accessible
Allowed Changes (Safe / Zero-Downtime):
- Add a new nullable/optional field (e.g., `TransactionType`)
- Deprecate before remove: keep the field, stop populating it, document the replacement
- Add new derived views instead of changing semantics
Breaking Changes (Require Playbook):
- Type changes require new schema version and backfill
- Renames require transition period with aliases
- Semantic changes require versioned v2 dataset
Recommended Approach:
- Current: Parquet-only with schema versioning (`schema_v=v1`, `schema_v=v2`)
- Future Enhancement: Apache Iceberg tables via AWS Glue for advanced schema evolution (add/rename/reorder columns without rewriting files)
- Fallback: Parquet-only with additive changes and a schema registry in the Glue Data Catalog
See Task 2 architecture documentation for complete schema evolution strategy.
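The additive-change rule is easy to see with a toy example: a `schema_v=v2` frame that adds a nullable `TransactionType` column combines cleanly with `schema_v=v1` data, which simply reads as null for the new column (a pandas sketch, not the production reader).

```python
import pandas as pd

# schema_v=v1 rows (no TransactionType) and schema_v=v2 rows (adds a nullable column)
v1 = pd.DataFrame({"TransactionID": ["T1"], "TransactionAmount": [100.0]})
v2 = pd.DataFrame({"TransactionID": ["T2"], "TransactionAmount": [-40.0],
                   "TransactionType": ["REFUND"]})

# An additive, nullable column keeps old data readable: v1 rows get nulls for it.
combined = pd.concat([v1, v2], ignore_index=True)
print(combined["TransactionType"].tolist())  # [nan, 'REFUND']
```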
4. Operational Monitoring
[Monitoring diagram: volume, quality, latency, and loop-prevention metrics feed alert thresholds (quarantine rate >1%, job failure, processing time >2x average, zero rows processed, circuit breaker triggered, average `attempt_count` >1.5, auto-condemn rate >0.5%); critical breaches page the on-call engineer and can halt the pipeline automatically, warnings go to the data team Slack channel, and everything is logged to CloudWatch; CloudWatch dashboards, Grafana, and a daily summary report provide visualization.]
CloudWatch Metrics & Logging
The pipeline publishes custom metrics to CloudWatch (namespace: `Ohpen/ETL`):
Volume Metrics:
- `InputRows`: Total rows read from the raw layer
- `ValidRows`: Rows successfully written to the processed layer
- `QuarantinedRows`: Rows sent to quarantine
- `CondemnedRows`: Rows auto-condemned (duplicates or max attempts)
Quality Metrics:
- `QuarantineRate`: Percentage of rows quarantined
- `ErrorTypeDistribution`: Breakdown by error type (SCHEMA_ERROR, NULL_VALUE_ERROR, etc.)
- `ValidationPassRate`: Percentage of rows passing all validations
- `RetryAttemptDistribution`: Distribution of `attempt_count` values
Performance Metrics:
- `DurationSeconds`: ETL execution time
- `EndToEndTime`: Total time from ingestion to availability
- `PartitionLag`: Delay in partition availability
Loop Prevention Metrics:
- `AvgAttemptCount`: Average `attempt_count` across all processed rows
- `DuplicateDetectionRate`: Percentage of rows flagged as duplicates
- `CircuitBreakerTriggers`: Count of circuit breaker activations
- `AutoCondemnationRate`: Percentage of rows auto-condemned
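As an illustration of how a run could emit these metrics, the sketch below publishes a few of them with boto3; the `RunId` dimension and the metric selection are assumptions, not the exact production code.

```python
import boto3


def publish_run_metrics(run_id: str, input_rows: int, quarantined_rows: int) -> None:
    """Publish a subset of the custom metrics above to the Ohpen/ETL namespace."""
    cloudwatch = boto3.client("cloudwatch")
    quarantine_rate = 100.0 * quarantined_rows / input_rows if input_rows else 0.0
    dimensions = [{"Name": "RunId", "Value": run_id}]
    cloudwatch.put_metric_data(
        Namespace="Ohpen/ETL",
        MetricData=[
            {"MetricName": "InputRows", "Value": input_rows,
             "Unit": "Count", "Dimensions": dimensions},
            {"MetricName": "QuarantinedRows", "Value": quarantined_rows,
             "Unit": "Count", "Dimensions": dimensions},
            {"MetricName": "QuarantineRate", "Value": quarantine_rate,
             "Unit": "Percent", "Dimensions": dimensions},
        ],
    )
```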
All logs are structured JSON for CloudWatch Logs Insights, enabling queries like:
fields @timestamp, run_id, metric_value.quarantine_rate
| filter metric_name = "ETLComplete"
Alert Thresholds & Actions
Critical Alerts (PagerDuty + Auto-halt):
- T1: Quarantine Rate > 1% → Data Quality Team (P2 - 4 hours)
- T2: Job Failure → Data Platform Team (P1 - Immediate)
- T4: Zero rows processed → Data Platform Team (P1 - Immediate)
- T5: Circuit Breaker Triggered → Automatic pipeline halt + Data Platform Team (P1 - Immediate)
Warning Alerts (Slack Notification):
- T3: Processing Time > 2x average → Data Platform Team (P3 - 8 hours)
- T6: Avg attempt_count > 1.5 → Data Quality Team (P2 - 4 hours)
- T7: Auto-condemn rate > 0.5% → Data Quality Team (P2 - 4 hours)
Dashboards & Visualization
- CloudWatch Dashboard: Real-time metrics, retry trends, circuit breaker status
- Grafana: Historical trends, loop detection patterns, long-term quality trends
- Daily Summary Report: Email digest with condemned items, quarantine summary, key metrics
Alert Ownership & Escalation
Alerts are routed to the appropriate teams based on alert type:
- Infrastructure Alerts (Data Platform Team): Job failures, missing partitions, runtime anomalies, circuit breaker triggers
- Data Quality Alerts (Data Quality Team): Quarantine rate spikes (>1%), validation failures, high `attempt_count`, auto-condemnation spikes
- Business Metric Alerts (Domain Teams / Business): Volume anomalies, SLA breaches
See Task 4 CI/CD documentation (Section 5.2) for complete alert ownership matrix and escalation paths.
4.1. Quarantine Lifecycle Management
[Quarantine lifecycle diagram: quarantined rows are triaged daily by age and error type, reviewed by the data team, resolved via a source system fix, an ETL logic fix, or a manual correction, then revalidated; rows that pass move to the processed layer, while irreparable rows are condemned to `quarantine/condemned/` and eventually deleted per the compliance retention policy.]
Quarantine Lifecycle Stages:
1. Quarantine Entry: Row fails validation and is quarantined with error details, `row_hash`, `attempt_count`, and source context.
2. Daily Triage (Automated):
   - Age < 7 days: Automated monitoring (no human action)
   - Age 7-30 days: Flagged for human review
   - Age > 30 days: Auto-flagged for condemnation review (potential systemic issue)
3. Human Review: The data team analyzes error patterns, identifies the root cause, and determines the resolution action.
4. Resolution Actions:
   - Source System Fix: Notify the provider, request a corrected file, re-ingest (new `source_file_id`, `attempt_count` resets)
   - ETL Logic Fix: Update validation rules, backfill from raw (same `source_file_id`, `attempt_count` increments)
   - Manual Correction: Create a corrected CSV, inject it into raw, mark it for reprocessing
5. Reprocessing: Re-run validation against the fixed data or updated rules.
6. Terminal Disposition:
   - Success: Row successfully reprocessed, moved to the processed layer
   - Condemned: Moved to `quarantine/condemned/` for permanent retention (no automatic retries)
Retention Policy: Condemned items retained per compliance policy (typically 7 years), then permanently deleted.
4.2. CI/CD + Deployment
CI Pipeline (GitHub Actions):
- Validation: Runs on every Pull Request
  - `ruff` linting (code style)
  - `pytest` unit tests (partition logic, null handling, quarantine checks)
- Artifact Build: Packages the Python ETL code, tags it with the Git SHA (e.g., `etl-v1.0.0-a1b2c3d.zip`)
- Deployment (CD):
  - Uploads the artifact to S3 (Code Bucket)
  - Terraform `plan` & `apply` to update AWS infrastructure (Glue Jobs, IAM, Buckets)
  - Updates the Glue Job to point to the new artifact
Backfill Safety Checks:
- Determinism: Rerunning the same input produces exactly the same counts (see the pytest sketch after this list)
- Partitioning: Timestamps map to the correct `year=YYYY/month=MM` folder
- Quarantine: Invalid rows are never silently dropped
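A determinism check of this kind might look like the pytest sketch below; the `transform` import and its return shape are hypothetical stand-ins for the real ETL entry point.

```python
import pandas as pd

from etl.ingest_transactions import transform  # hypothetical import path and API


def test_rerun_is_deterministic():
    """Rerunning the same input must produce identical valid/quarantine counts."""
    raw = pd.DataFrame([
        {"TransactionID": "T1", "CustomerID": "C1", "TransactionAmount": 10.0,
         "Currency": "EUR", "TransactionTimestamp": "2026-01-21T09:30:00Z"},
        {"TransactionID": "T2", "CustomerID": "C2", "TransactionAmount": "oops",
         "Currency": "EUR", "TransactionTimestamp": "2026-01-21T10:00:00Z"},
    ])
    first = transform(raw.copy())
    second = transform(raw.copy())
    assert len(first.valid) == len(second.valid)
    assert len(first.quarantined) == len(second.quarantined)
```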
Failure Handling:
- Failed runs do not update `_LATEST.json` or the `current/` prefix
- Each rerun uses a new `run_id` (timestamp-based)
- Previous failed runs remain in storage (audit trail)
See Task 4 CI/CD documentation for complete workflow details.
5. Cost & Scalability
[Cost & scalability diagram: raw data sits in S3 Infrequent Access ($0.0125/GB/month), processed data in S3 Standard as Snappy-compressed Parquet (~70% smaller than CSV), and quarantine archives in S3 Glacier; compute is serverless AWS Glue at $0.44/DPU-hour with auto-scaling and no infrastructure management; Athena costs $5 per TB scanned, with year/month partitioning limiting typical scans to 1-5% of the data; Glue DPUs and S3 scale horizontally with no single point of failure.]
Storage Costs:
- Raw Layer: S3 Infrequent Access ($0.0125/GB/month) for immutable source data
- Processed Layer: S3 Standard with Snappy compression (~70% size reduction vs CSV)
- Quarantine/Condemned: S3 Glacier for long-term retention (compliance/audit)
Compute Costs:
- AWS Glue Serverless: $0.44/DPU-hour, auto-scales with data volume
- No Infrastructure Management: Pay per job execution, no idle costs
- Horizontal Scaling: Glue DPUs scale automatically with data volume
Query Costs:
- Amazon Athena: $5 per TB scanned
- Partitioning Strategy: `year`/`month` partitioning reduces typical query scans to 1-5% of total data
- Cost Optimization: Partition pruning significantly reduces scan costs
Scalability:
- Horizontal Scaling: Glue DPUs scale automatically with data volume
- S3 Unlimited Capacity: No storage bottlenecks
- No Single Point of Failure: Distributed architecture ensures high availability
6. Security & Governance (production expectations)
[Security & governance diagram: TLS for all API calls and SSE-S3/SSE-KMS encryption at rest; least-privilege IAM roles separate ETL writers, analyst readers, and admin operators, with restricted, audit-logged access to the raw and quarantine layers; PII is minimized in the raw layer and masked/tokenized in curated layers, with retention policies per compliance needs; auditability comes from the immutable raw layer, `run_id` isolation, `_SUCCESS` metadata, and CloudTrail/CloudWatch logging of catalog changes.]
Data Ownership Model
Ownership shifts from technical → business as data moves downstream:
- Bronze Layer: Data Platform Team (immutability, ingestion reliability) - ✅ Implemented (Task 1)
- Silver Layer: Domain Teams (validation rules, schema evolution) - ✅ Implemented (Task 1)
- Gold Layer: Business/Finance (metric definitions, reporting accuracy) - Task 2: Structure Design, Task 3: SQL Aggregation
Access Control & Permissions
- Encryption: TLS in transit; S3 encryption at rest (SSE-S3 or SSE-KMS)
- IAM Roles: Least privilege per layer (Platform: RW on Bronze/Silver, Domain: RW on Silver, Business: R on Gold)
- Prefix-Scoped Permissions: IAM policies are scoped to S3 prefixes for fine-grained access control (a policy sketch follows this list):
  - Platform Team: `s3://bucket/bronze/*`, `s3://bucket/silver/*`, `s3://bucket/quarantine/*`
  - Domain Teams: `s3://bucket/silver/{domain}/*` (write), `s3://bucket/gold/{domain}/*` (read)
  - Business/Analysts: `s3://bucket/gold/*` (read-only)
  - Compliance: `s3://bucket/bronze/*`, `s3://bucket/quarantine/*` (read-only for audit)
- Restricted Access: `raw/` and `quarantine/` are restricted to the Platform and Compliance teams
- PII Handling: Keep PII minimal in `raw/` where possible; mask/tokenize in curated layers if required
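For illustration, read-only Gold access for analysts could be expressed as an IAM policy document like the sketch below; the bucket name and statement IDs are placeholders.

```python
# Illustrative IAM policy document (as a Python dict) granting read-only Gold access.
ANALYST_GOLD_READ_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GoldReadOnly",
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::data-lake-bucket/gold/*",
        },
        {
            "Sid": "GoldList",
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::data-lake-bucket",
            "Condition": {"StringLike": {"s3:prefix": ["gold/*"]}},
        },
    ],
}
```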
Governance & Auditability
- Immutability: Bronze layer is immutable (append-only, no overwrites)
- Run Isolation: Each run writes to a unique `run_id` path for an audit trail
- Metadata: `_SUCCESS` markers include run metrics; CloudWatch logs capture full lineage
- Schema Versioning: All schema changes are versioned via `schema_v` for backward compatibility
- Change Approval: Schema changes require Domain/Business approval plus Platform implementation
For complete governance model, see Task 4 CI/CD documentation (Section 5: Ownership & Governance).
7. Runbook
How to Rerun:
1. Trigger via AWS Step Functions or EventBridge with the specific `raw` partition
2. Specify the `ingest_date` partition to reprocess (e.g., `ingest_date=2026-01-21`)
3. The new run writes to a unique `run_id` path (e.g., `run_id=20260128_BACKFILL`)
4. After validation, update `_LATEST.json` and the `current/` prefix to point to the new run
Where Logs Are:
- CloudWatch Logs: `/aws-glue/jobs/output` (metrics namespace: `Ohpen/ETL`)
- Structured JSON format for CloudWatch Logs Insights queries
- Query example: `fields @timestamp, run_id, metric_value.quarantine_rate | filter metric_name = "ETLComplete"`
How to Inspect Quarantine:
- Query the `quarantine/` S3 path: `s3://quarantine-bucket/quarantine/mortgages/transactions/ingest_date={YYYY-MM-DD}/run_id={...}/invalid_rows.parquet`
- Check the `validation_error` column for error categories (SCHEMA_ERROR, NULL_VALUE_ERROR, TYPE_ERROR, CURRENCY_ERROR, TIMESTAMP_ERROR)
- Check the `attempt_count` column to identify rows approaching the max retry limit (≥2)
- Check `row_hash` for duplicate detection analysis
- Review `retry_history` to understand previous remediation attempts
- Use Athena or pandas to query the quarantine Parquet files (see the sketch after this list)
- Filter by `validation_error` to see the breakdown by error type
- Filter by `attempt_count` to identify high-retry items
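A minimal pandas sketch of that inspection; the concrete `ingest_date` and `run_id` are placeholders, and reading directly from S3 additionally requires `s3fs`.

```python
import pandas as pd

# Placeholder path: substitute the real ingest_date and run_id
QUARANTINE_PATH = (
    "s3://quarantine-bucket/quarantine/mortgages/transactions/"
    "ingest_date=2026-01-21/run_id=20260121T120000Z/invalid_rows.parquet"
)

df = pd.read_parquet(QUARANTINE_PATH)

# Breakdown by error type, and rows one retry away from condemnation
print(df["validation_error"].value_counts())
print(df[df["attempt_count"] >= 2][["TransactionID", "validation_error", "attempt_count"]])
```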
How to Review Condemned Items:
- Query the `quarantine/condemned/` S3 path: `s3://quarantine-bucket/quarantine/mortgages/transactions/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/condemned_rows.parquet`
- Check `validation_error` for the condemnation reason (DUPLICATE_FAILURE, MAX_ATTEMPTS)
- Review `attempt_count` (should be ≥3 for MAX_ATTEMPTS)
- Check `row_hash` to identify exact duplicates
- No automatic retries are performed on condemned items
- Manual intervention is required for any reprocessing
How to Handle Circuit Breaker:
- Detection: Pipeline halts automatically when >100 same errors occur within 1 hour
- Immediate Actions:
- Check CloudWatch logs for circuit breaker trigger details
- Identify the error type causing the threshold breach
- Review quarantine data for the problematic error pattern
- Investigate root cause (systemic data quality issue vs ETL bug)
- Resolution:
- If Source Issue: Contact data provider, request corrected data
- If ETL Issue: Update validation rules, deploy fix, manually restart pipeline
- If Temporary Spike: Wait for error rate to drop below threshold, manually restart
- Restart: After root cause is resolved, manually trigger pipeline restart
- Prevention: Monitor the `CircuitBreakerTriggers` metric and investigate patterns before the threshold is reached
Escalation Path:
- Infrastructure Issues (P1, Data Platform Team): Job failures, missing partitions, runtime anomalies, circuit breaker triggers
- Data Quality Issues (P2, Data Quality Team): Quarantine rate spikes (>1%), validation failures, high average `attempt_count` (>1.5), auto-condemnation spikes (>0.5%)
- Business Metric Issues (P3, Domain Teams / Business): Volume anomalies, SLA breaches
Common Operations:
- Backfill: Trigger reprocessing for specific date range via Step Functions
- Source Fix: New file with a new `source_file_id` → `attempt_count` resets to 0
- ETL Fix: Same file with updated rules → `attempt_count` increments (check that it is less than 3 before reprocessing)
- Schema Update: Update the Glue Data Catalog, create a new `schema_v`, backfill if needed
- Quarantine Review: Query the quarantine Parquet, check `attempt_count` and `row_hash`, identify the root cause, fix the source data or ETL rules, reprocess
- Condemned Review: Query the condemned Parquet, analyze patterns, determine whether manual intervention is needed
- Circuit Breaker Recovery: Investigate root cause, fix systemic issue, manually restart pipeline
8. Links & References
Code & Infrastructure:
- ETL Code: `tasks/01_data_ingestion_transformation/src/etl/ingest_transactions.py`
- IaC (Terraform): `tasks/04_devops_cicd/infra/terraform/`
- CI/CD Workflow: `tasks/04_devops_cicd/cicd_workflow.md`
Monitoring & Dashboards:
- CloudWatch Dashboard: Namespace `Ohpen/ETL`
- Metrics: `InputRows`, `ValidRows`, `QuarantinedRows`, `QuarantineRate`, `DurationSeconds`
- Logs: CloudWatch Logs Insights (namespace: `Ohpen/ETL`)
Documentation:
- Data Lake Architecture: `tasks/02_data_lake_architecture_design/architecture.md`
- Validation Rules: `tasks/01_data_ingestion_transformation/ASSUMPTIONS_AND_EDGE_CASES.md`
- Schema Evolution: `tasks/02_data_lake_architecture_design/architecture.md` (Section 4)
- CI/CD Details: `tasks/04_devops_cicd/cicd_workflow.md`
9. Assumptions & Known Limitations (case study scope)
- Batch-first ingestion (streaming is an optional upstream extension, not required here).
- Negative amounts are allowed (withdrawals/refunds); anomaly detection is out of scope.
- Loop Prevention: Maximum 3 retry attempts per row (`attempt_count` limit); exact duplicates in quarantine history are auto-condemned.
- Circuit Breaker: Pipeline halts automatically when >100 identical errors occur within a 1 hour window; human intervention is required to restart.
- Condemned Items: Rows exceeding max attempts or exact duplicates are moved to `quarantine/condemned/` with no automatic retries; manual intervention is required for reprocessing.
- TransactionID Deduplication: Checks the Silver layer for existing TransactionID + event_date combinations to prevent duplicate processing across runs (optional feature).
- Duplicate Detection: Checks quarantine history for exact `row_hash` matches (not business-key deduplication within processed data).
- Config separation: We ship a `config.yaml` template (the code currently uses CLI args/env vars).