Task 1: ETL Pipeline - Architecture Diagram

High-Level Data Flow

flowchart TD
    Start([ETL Job Triggered]) --> Init[Initialize Run]
    Init --> GenID[Generate run_id & ingest_time]
    GenID --> S3Init[Initialize S3 Client]
    S3Init --> Read[Read CSV from Bronze Layer]
    Read -->|s3://raw-bucket/transactions.csv| RawDF[Raw DataFrame]
    RawDF --> Enrich[Enrich Metadata<br/>row_hash, source_file_id<br/>attempt_count, ingestion_timestamp]
    Enrich --> LoopPrevent[Loop Prevention]
    LoopPrevent --> TxIDCheck{TransactionID<br/>in Silver Layer?}
    TxIDCheck -->|Yes| AutoCondemn0[Auto-Condemn<br/>DUPLICATE_TRANSACTION_ID]
    TxIDCheck -->|No| DupCheck{Duplicate in<br/>Quarantine History?}
    DupCheck -->|Yes| AutoCondemn1[Auto-Condemn<br/>DUPLICATE_FAILURE]
    DupCheck -->|No| AttemptCheck{attempt_count<br/>≥ 3?}
    AttemptCheck -->|Yes| AutoCondemn2[Auto-Condemn<br/>MAX_ATTEMPTS]
    AttemptCheck -->|No| Validate[Validate & Transform]
    AutoCondemn0 --> CondemnedDF[Condemned DataFrame]
    AutoCondemn1 --> CondemnedDF
    AutoCondemn2 --> CondemnedDF
    Validate --> Check1{Schema/Null Check}
    Check1 -->|Pass| Check2{Currency Check}
    Check1 -->|Fail| Quarantine1[Mark: NULL_VALUE_ERROR<br/>attempt_count++]
    Check2 -->|Pass| Check3{Amount Type Check}
    Check2 -->|Fail| Quarantine2[Mark: CURRENCY_ERROR<br/>attempt_count++]
    Check3 -->|Pass| Check4{Timestamp Parse}
    Check3 -->|Fail| Quarantine3[Mark: TYPE_ERROR<br/>attempt_count++]
    Check4 -->|Pass| Check5{Business Duplicate}
    Check4 -->|Fail| Quarantine4[Mark: TIMESTAMP_ERROR<br/>attempt_count++]
    Check5 -->|Pass| ValidDF[Valid DataFrame]
    Check5 -->|Fail| Quarantine5[Mark: Duplicate account/date<br/>attempt_count++]
    Quarantine1 --> CircuitBreaker{Circuit Breaker<br/>Check}
    Quarantine2 --> CircuitBreaker
    Quarantine3 --> CircuitBreaker
    Quarantine4 --> CircuitBreaker
    Quarantine5 --> CircuitBreaker
    CircuitBreaker -->|Threshold Exceeded| Halt[🚨 HALT PIPELINE<br/>Human Intervention]
    CircuitBreaker -->|Normal| QuarantineDF[Quarantine DataFrame]
    ValidDF --> AddPartition[Add Partition Columns]
    AddPartition -->|year, month| PartitionDF[Partitioned DataFrame]
    PartitionDF --> WriteValid[Write to Silver Layer]
    WriteValid -->|s3://processed-bucket/.../run_id=.../| SilverS3[(Silver Layer S3)]
    QuarantineDF --> WriteQuar[Write to Quarantine]
    WriteQuar -->|s3://quarantine-bucket/.../run_id=.../| QuarantineS3[(Quarantine S3)]
    CondemnedDF --> WriteCondemned[Write to Condemned]
    WriteCondemned -->|s3://quarantine-bucket/.../condemned/.../| CondemnedS3[(Condemned S3)]
    SilverS3 --> Metrics[Calculate Metrics]
    QuarantineS3 --> Metrics
    CondemnedS3 --> Metrics
    Metrics --> SuccessMarker[Write _SUCCESS Marker]
    SuccessMarker --> CloudWatch[Publish CloudWatch Metrics]
    CloudWatch --> ApprovalCheck[Human Approval Required<br/>See HUMAN_VALIDATION_POLICY.md]
    ApprovalCheck --> Complete([ETL Complete])
    style Start fill:#f57c00,color:#fff
    style Init fill:#424242,color:#fff
    style GenID fill:#424242,color:#fff
    style S3Init fill:#424242,color:#fff
    style Read fill:#424242,color:#fff
    style RawDF fill:#f57c00,color:#fff
    style Validate fill:#7b1fa2,color:#fff
    style Check1 fill:#fbc02d,color:#111
    style Check2 fill:#fbc02d,color:#111
    style Check3 fill:#fbc02d,color:#111
    style Check4 fill:#fbc02d,color:#111
    style Check5 fill:#fbc02d,color:#111
    style Quarantine1 fill:#d32f2f,color:#fff
    style Quarantine2 fill:#d32f2f,color:#fff
    style Quarantine3 fill:#d32f2f,color:#fff
    style Quarantine4 fill:#d32f2f,color:#fff
    style Quarantine5 fill:#d32f2f,color:#fff
    style Halt fill:#d32f2f,color:#fff
    style ValidDF fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style QuarantineDF fill:#d32f2f,color:#fff
    style CondemnedDF fill:#d32f2f,color:#fff
    style AddPartition fill:#7b1fa2,color:#fff
    style WriteValid fill:#7b1fa2,color:#fff
    style WriteQuar fill:#7b1fa2,color:#fff
    style WriteCondemned fill:#7b1fa2,color:#fff
    style SilverS3 fill:#388e3c,color:#fff
    style QuarantineS3 fill:#d32f2f,color:#fff
    style CondemnedS3 fill:#d32f2f,color:#fff
    style Metrics fill:#00897b,color:#fff
    style SuccessMarker fill:#757575,color:#fff
    style CloudWatch fill:#00897b,color:#fff
    style Complete fill:#2e7d32,color:#fff
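The run-initialization and enrichment steps at the top of the flow can be sketched in plain Python. This is a minimal illustration, not the pipeline's actual code, and it rests on several assumptions:

- Rows are dicts rather than DataFrames.
- `generate_run_context` and `enrich_rows` are hypothetical names.
- The run_id format is inferred from the example paths in the S3 layout.
- `prior_attempts` (attempts per TransactionID recovered from quarantine history) is an assumed lookup.

```python
import hashlib
from datetime import datetime, timezone


def generate_run_context(now=None):
    """Return (run_id, ingest_time) for one ETL run.

    Assumption: run_id is a compact UTC timestamp, matching the example
    paths in the S3 layout (e.g. run_id=20260121T120000Z).
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now.strftime("%Y%m%dT%H%M%SZ"), now.isoformat()


def enrich_rows(rows, source_file_id, ingest_time, prior_attempts=None):
    """Attach row_hash, source_file_id, attempt_count and ingestion_timestamp.

    prior_attempts is a hypothetical {TransactionID: attempts} lookup built
    from quarantine history; rows never seen before start at 0.
    """
    prior_attempts = prior_attempts or {}
    enriched = []
    for row in rows:
        # Hash the raw columns in sorted-key order so identical content
        # always produces the same hash, regardless of column order.
        canonical = "|".join(f"{key}={row[key]}" for key in sorted(row))
        enriched.append({
            **row,
            "row_hash": hashlib.sha256(canonical.encode("utf-8")).hexdigest(),
            "source_file_id": source_file_id,
            "attempt_count": prior_attempts.get(row.get("TransactionID"), 0),
            "ingestion_timestamp": ingest_time,
        })
    return enriched
```

The hash is computed over the raw columns before any metadata is attached. This keeps `row_hash` stable across retries, so the quarantine-history check can recognise a resubmitted row whose content has not changed. `csv.DictReader` yields rows in exactly this dict shape, so the Bronze CSV can be fed in directly.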

Detailed Validation Flow

flowchart LR
    Input[Raw DataFrame] --> Enrich[Metadata Enrichment<br/>row_hash, source_file_id<br/>attempt_count, ingestion_timestamp]
    Enrich --> PreCheck0{TransactionID<br/>in Silver Layer?}
    PreCheck0 -->|Yes| Condemn0[Auto-Condemn<br/>DUPLICATE_TRANSACTION_ID]
    PreCheck0 -->|No| PreCheck1{Duplicate in<br/>Quarantine History?}
    PreCheck1 -->|Yes| Condemn1[Auto-Condemn<br/>DUPLICATE_FAILURE]
    PreCheck1 -->|No| PreCheck2{attempt_count<br/>≥ 3?}
    PreCheck2 -->|Yes| Condemn2[Auto-Condemn<br/>MAX_ATTEMPTS]
    PreCheck2 -->|No| V1[Step 1: Schema/Null Check]
    V1 -->|All required fields present?| V2[Step 2: Currency Validation]
    V1 -->|Missing fields| Q1[Quarantine: NULL_VALUE_ERROR<br/>attempt_count++]
    V2 -->|Currency in allowlist?| V3[Step 3: Amount Type Check]
    V2 -->|Invalid currency| Q2[Quarantine: CURRENCY_ERROR<br/>attempt_count++]
    V3 -->|Numeric amount?| V4[Step 4: Timestamp Parse]
    V3 -->|Non-numeric| Q3[Quarantine: TYPE_ERROR<br/>attempt_count++]
    V4 -->|Valid timestamp?| V5[Step 5: Business Duplicate]
    V4 -->|Parse failed| Q4[Quarantine: TIMESTAMP_ERROR<br/>attempt_count++]
    V5 -->|Unique account/date?| Valid[Valid DataFrame]
    V5 -->|Duplicate found| Q5[Quarantine: Duplicate account/date<br/>attempt_count++]
    Q1 --> CircuitBreaker{Circuit Breaker<br/>Check}
    Q2 --> CircuitBreaker
    Q3 --> CircuitBreaker
    Q4 --> CircuitBreaker
    Q5 --> CircuitBreaker
    CircuitBreaker -->|Threshold Exceeded| Halt[🚨 HALT PIPELINE]
    CircuitBreaker -->|Normal| Quarantine[Quarantine DataFrame]
    Condemn0 --> Condemned[Condemned DataFrame]
    Condemn1 --> Condemned
    Condemn2 --> Condemned
    Valid --> Output[Output: Valid + Quarantine + Condemned]
    Quarantine --> Output
    Condemned --> Output
    style Input fill:#f57c00,color:#fff
    style V1 fill:#fbc02d,color:#111
    style V2 fill:#fbc02d,color:#111
    style V3 fill:#fbc02d,color:#111
    style V4 fill:#fbc02d,color:#111
    style V5 fill:#fbc02d,color:#111
    style Valid fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style Quarantine fill:#d32f2f,color:#fff
    style Q1 fill:#d32f2f,color:#fff
    style Q2 fill:#d32f2f,color:#fff
    style Q3 fill:#d32f2f,color:#fff
    style Q4 fill:#d32f2f,color:#fff
    style Q5 fill:#d32f2f,color:#fff
    style Output fill:#2e7d32,color:#fff
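The detailed flow maps onto a single routing function. The sketch below assumes rows are dicts that already carry `row_hash` and `attempt_count` from enrichment.

Wherever the diagram is silent, it uses hypothetical values:

- the required column names;
- the currency allowlist;
- a 10% circuit-breaker threshold measured against all input rows;
- the `DUPLICATE_ACCOUNT_DATE` code for step 5.

It also reads "Duplicate in Quarantine History" as "this exact `row_hash` has failed before". That is one interpretation, not a confirmed rule.

```python
from datetime import datetime

MAX_ATTEMPTS = 3
# Hypothetical values: the diagram names the checks but not the schema,
# the currency allowlist, or the circuit-breaker threshold.
REQUIRED_FIELDS = ("TransactionID", "AccountID", "Amount", "Currency", "TransactionTimestamp")
ALLOWED_CURRENCIES = {"USD", "EUR", "GBP"}


class CircuitBreakerTripped(RuntimeError):
    """Raised to halt the pipeline for human intervention."""


def precheck(row, silver_tx_ids, failed_hashes):
    """Loop prevention: return a condemn reason, or None to keep validating."""
    if row.get("TransactionID") in silver_tx_ids:
        return "DUPLICATE_TRANSACTION_ID"
    if row["row_hash"] in failed_hashes:  # identical content already failed
        return "DUPLICATE_FAILURE"
    if row["attempt_count"] >= MAX_ATTEMPTS:
        return "MAX_ATTEMPTS"
    return None


def validate_row(row, seen_account_dates):
    """Run the five checks in order; return the first error code, or None."""
    # Step 1: every required field present and non-empty.
    if any(not row.get(field) for field in REQUIRED_FIELDS):
        return "NULL_VALUE_ERROR"
    # Step 2: currency allowlist.
    if row["Currency"] not in ALLOWED_CURRENCIES:
        return "CURRENCY_ERROR"
    # Step 3: amount must parse as a number.
    try:
        float(row["Amount"])
    except ValueError:
        return "TYPE_ERROR"
    # Step 4: timestamp must parse; a trailing "Z" is normalized because
    # datetime.fromisoformat only accepts it from Python 3.11 on.
    try:
        ts = datetime.fromisoformat(row["TransactionTimestamp"].replace("Z", "+00:00"))
    except ValueError:
        return "TIMESTAMP_ERROR"
    # Step 5: one transaction per account per calendar date in this batch.
    key = (row["AccountID"], ts.date())
    if key in seen_account_dates:
        return "DUPLICATE_ACCOUNT_DATE"  # hypothetical code name
    seen_account_dates.add(key)
    return None


def split_rows(rows, silver_tx_ids, failed_hashes, max_quarantine_rate=0.1):
    """Route each row to the valid, quarantine, or condemned output."""
    valid, quarantine, condemned = [], [], []
    seen_account_dates = set()
    for row in rows:
        reason = precheck(row, silver_tx_ids, failed_hashes)
        if reason:
            condemned.append({**row, "condemn_reason": reason})
            continue
        error = validate_row(row, seen_account_dates)
        if error:
            quarantine.append({**row, "error_type": error,
                               "attempt_count": row["attempt_count"] + 1})
        else:
            valid.append(row)
    # Circuit breaker: halt rather than quarantine an abnormal share of the batch.
    if rows and len(quarantine) / len(rows) > max_quarantine_rate:
        raise CircuitBreakerTripped(
            f"{len(quarantine)}/{len(rows)} rows quarantined; halting pipeline")
    return valid, quarantine, condemned
```

Checks run in diagram order and stop at the first failure, so each quarantined row carries exactly one error code. For example, a row with both an invalid currency and a non-numeric amount is reported as CURRENCY_ERROR.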

S3 Storage Structure

graph TD
    S3[S3 Data Lake] --> Bronze[Bronze Layer<br/>Raw, Immutable]
    S3 --> Silver[Silver Layer<br/>Validated, Partitioned]
    S3 --> Quarantine[Quarantine Layer<br/>Invalid Rows]
    Bronze --> B1[transactions/<br/>ingest_date=2026-01-21/<br/>run_id=20260121T120000Z/<br/>file.csv]
    Silver --> S1[schema_v=v1/<br/>run_id=20260121T120000Z/<br/>year=2026/<br/>month=01/<br/>part-00000.parquet]
    Silver --> S2[schema_v=v1/<br/>run_id=20260121T120000Z/<br/>year=2026/<br/>month=02/<br/>part-00000.parquet]
    Silver --> Success[_SUCCESS<br/>Contains metrics JSON]
    Quarantine --> Q1[ingest_date=2026-01-21/<br/>run_id=20260121T120000Z/<br/>invalid_rows.parquet]
    Quarantine --> Q2[condemned/<br/>ingest_date=2026-01-21/<br/>run_id=20260121T120000Z/<br/>condemned_rows.parquet]
    style S3 fill:#424242,color:#fff
    style Bronze fill:#f57c00,color:#fff
    style Silver fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style Quarantine fill:#d32f2f,color:#fff
    style B1 fill:#bdbdbd,color:#111
    style S1 fill:#bdbdbd,color:#111
    style S2 fill:#bdbdbd,color:#111
    style Success fill:#757575,color:#fff
    style Q1 fill:#bdbdbd,color:#111
    style Q2 fill:#d32f2f,color:#fff
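The layout can be expressed as key builders so that every writer derives paths the same way. This sketch makes the following assumptions:

- Keys are relative to each layer's root, as drawn in the diagram. Bucket names and the prefixes elided as `...` elsewhere are not filled in.
- The function names are hypothetical.
- The `_SUCCESS` key is omitted because the diagram does not give its full path.

```python
from datetime import date, datetime
from typing import Iterable, List


def bronze_key(ingest_date: date, run_id: str, filename: str) -> str:
    return f"transactions/ingest_date={ingest_date.isoformat()}/run_id={run_id}/{filename}"


def silver_partition_prefix(run_id: str, year: int, month: int, schema_v: str = "v1") -> str:
    # month is zero-padded (month=01) to match the diagram and sort lexically.
    return f"schema_v={schema_v}/run_id={run_id}/year={year}/month={month:02d}/"


def quarantine_key(ingest_date: date, run_id: str) -> str:
    return f"ingest_date={ingest_date.isoformat()}/run_id={run_id}/invalid_rows.parquet"


def condemned_key(ingest_date: date, run_id: str) -> str:
    return f"condemned/ingest_date={ingest_date.isoformat()}/run_id={run_id}/condemned_rows.parquet"


def silver_prefixes_for(timestamps: Iterable[datetime], run_id: str) -> List[str]:
    """Distinct year/month partition prefixes that one run will write to."""
    return sorted({silver_partition_prefix(run_id, ts.year, ts.month) for ts in timestamps})
```

Because run_id sits above the year/month partitions in Silver, each run's files stay isolated. A reader selects a run first and then prunes by partition.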

Component Interaction

sequenceDiagram
    participant Trigger as EventBridge/Step Functions
    participant ETL as ETL Script
    participant S3 as S3 Storage
    participant CloudWatch as CloudWatch
    participant Catalog as Glue Data Catalog
    Trigger->>ETL: Trigger ETL Job
    ETL->>S3: Read CSV from Bronze
    S3-->>ETL: Raw DataFrame
    ETL->>ETL: Validate & Transform
    Note over ETL: Split into Valid, Quarantine & Condemned
    ETL->>S3: Write Valid Parquet (Partitioned)<br/>Isolated run_id Path
    S3-->>ETL: Write Confirmation
    ETL->>S3: Write Quarantine Parquet
    S3-->>ETL: Write Confirmation
    ETL->>S3: Write Condemned Parquet
    S3-->>ETL: Write Confirmation
    ETL->>S3: Write _SUCCESS Marker
    S3-->>ETL: Write Confirmation
    ETL->>CloudWatch: Publish Metrics
    CloudWatch-->>ETL: Metrics Published
    Note over ETL,Trigger: Human Approval Required<br/>Before Production Promotion<br/>See HUMAN_VALIDATION_POLICY.md
    ETL->>Catalog: Update Table Partitions (Optional)
    Catalog-->>ETL: Partitions Updated
    ETL->>Trigger: Job Complete<br/>(Awaiting Approval)
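The ordering in the sequence diagram matters because `_SUCCESS` is the completion signal. Below is a minimal sketch of that contract. It uses a hypothetical in-memory store in place of S3; a real run would issue S3 PutObject calls instead. `publish_run` and `run_is_complete` are illustrative names.

```python
import json


class InMemoryStore:
    """Hypothetical stand-in for S3 so the ordering can be exercised locally."""

    def __init__(self):
        self.objects = {}

    def put(self, key: str, body: bytes) -> None:
        self.objects[key] = body

    def exists(self, key: str) -> bool:
        return key in self.objects


def publish_run(store, data_objects: dict, success_key: str, metrics: dict) -> None:
    """Write every data object for the run, then the _SUCCESS marker last.

    If any data write raises, the exception propagates before the marker is
    written, so readers never see a half-written run as complete.
    """
    for key, body in data_objects.items():
        store.put(key, body)
    store.put(success_key, json.dumps(metrics, sort_keys=True).encode("utf-8"))


def run_is_complete(store, success_key: str) -> bool:
    """Downstream readers gate on the marker, not on the data files."""
    return store.exists(success_key)
```

Because every run writes under its own run_id path, a retried run lands beside the earlier attempt rather than on top of it. Promotion still waits for the human approval step.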

Error Handling & Resilience

flowchart TD
    Start([ETL Start]) --> TryRead[Try: Read from S3]
    TryRead -->|Success| Validate[Validate & Transform]
    TryRead -->|Error| LogError1[Log Error]
    LogError1 --> Fail([Job Failed])
    Validate --> TryWrite[Try: Write Valid Data]
    TryWrite -->|Success| TryQuar[Try: Write Quarantine]
    TryWrite -->|Error| LogError2[Log Error]
    LogError2 --> Fail
    TryQuar -->|Success| TryMetrics[Try: Write Metrics]
    TryQuar -->|Error| LogError3[Log Error]
    LogError3 --> Fail
    TryMetrics -->|Success| TryCloudWatch[Try: Publish CloudWatch]
    TryMetrics -->|Error| LogWarning1[Log Warning<br/>Continue]
    LogWarning1 --> TryCloudWatch
    TryCloudWatch -->|Success| Success([Job Succeeded])
    TryCloudWatch -->|Error| LogWarning2[Log Warning<br/>Continue]
    LogWarning2 --> Success
    style Start fill:#f57c00,color:#fff
    style TryRead fill:#424242,color:#fff
    style Validate fill:#7b1fa2,color:#fff
    style TryWrite fill:#7b1fa2,color:#fff
    style TryQuar fill:#7b1fa2,color:#fff
    style TryMetrics fill:#00897b,color:#fff
    style TryCloudWatch fill:#00897b,color:#fff
    style Success fill:#2e7d32,color:#fff
    style Fail fill:#d32f2f,color:#fff
    style LogError1 fill:#d32f2f,color:#fff
    style LogError2 fill:#d32f2f,color:#fff
    style LogError3 fill:#d32f2f,color:#fff
    style LogWarning1 fill:#ffa000,color:#111
    style LogWarning2 fill:#ffa000,color:#111
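The split between fatal and best-effort steps can be sketched as follows. Each argument is a hypothetical callable standing in for one stage, and the logger name `etl` is also an assumption.

```python
import logging

logger = logging.getLogger("etl")


def run_with_error_policy(read, validate, write_valid, write_quarantine,
                          write_metrics, publish_cloudwatch) -> str:
    """Apply the diagram's error policy: data steps are fatal, observability is not."""
    try:
        rows = read()
    except Exception:
        logger.exception("Read from S3 failed")
        raise  # the job fails
    valid, quarantine = validate(rows)

    # Data writes are fatal: log, then re-raise so the job is marked failed.
    for name, write, payload in (("valid", write_valid, valid),
                                 ("quarantine", write_quarantine, quarantine)):
        try:
            write(payload)
        except Exception:
            logger.exception("Write of %s data failed", name)
            raise

    # Metrics and CloudWatch are best-effort: log a warning and continue.
    for name, step in (("metrics", write_metrics), ("cloudwatch", publish_cloudwatch)):
        try:
            step()
        except Exception:
            logger.warning("%s step failed; continuing", name, exc_info=True)
    return "SUCCESS"
```

Re-raising after logging keeps the original exception and traceback intact for the orchestrator (EventBridge/Step Functions in the component diagram). The orchestrator can then decide whether to retry the whole run.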

Data Quality Metrics Flow

flowchart LR
    Input[Input Rows] --> Count[Count Total]
    Count --> Validate[Validation Process]
    Validate --> ValidCount[Count Valid Rows]
    Validate --> QuarantineCount[Count Quarantine Rows]
    Validate --> CondemnedCount[Count Condemned Rows]
    ValidCount --> CalcRate[Calculate Quarantine Rate]
    QuarantineCount --> CalcRate
    CalcRate --> GroupErrors[Group by Error Type]
    GroupErrors --> Metrics[Metrics Object]
    CondemnedCount --> Metrics
    Metrics --> SuccessFile[Write _SUCCESS File]
    Metrics --> CloudWatch[Publish to CloudWatch]
    Metrics --> Logs[Structured Logs]
    SuccessFile --> S3[(S3 Storage)]
    CloudWatch --> CW[(CloudWatch)]
    Logs --> LogsInsights[(CloudWatch Logs Insights)]
    style Input fill:#f57c00,color:#fff
    style Count fill:#7b1fa2,color:#fff
    style Validate fill:#7b1fa2,color:#fff
    style ValidCount fill:#388e3c,color:#fff
    style QuarantineCount fill:#d32f2f,color:#fff
    style CondemnedCount fill:#d32f2f,color:#fff
    style CalcRate fill:#7b1fa2,color:#fff
    style GroupErrors fill:#7b1fa2,color:#fff
    style Metrics fill:#00897b,color:#fff
    style SuccessFile fill:#757575,color:#fff
    style CloudWatch fill:#00897b,color:#fff
    style Logs fill:#00897b,color:#fff
    style S3 fill:#424242,color:#fff
    style CW fill:#00897b,color:#fff
    style LogsInsights fill:#00897b,color:#fff
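The metrics object and its CloudWatch form can be sketched as below. The metric names, the field names, and the `Pipeline` dimension and its value are all hypothetical. The total is derived as valid + quarantined + condemned, which presumes every input row lands in exactly one output.

The list returned by `to_metric_data` follows the `MetricData` shape of boto3's `put_metric_data` (MetricName, Dimensions, Value, Unit). The call itself is left out so the sketch runs without AWS credentials.

```python
import json
from collections import Counter


def build_metrics(run_id, valid_count, quarantine_rows, condemned_count=0):
    """Aggregate the counts from the metrics flow into one object."""
    quarantined = len(quarantine_rows)
    total = valid_count + quarantined + condemned_count
    return {
        "run_id": run_id,
        "total_rows": total,
        "valid_rows": valid_count,
        "quarantined_rows": quarantined,
        "condemned_rows": condemned_count,
        "quarantine_rate": round(quarantined / total, 4) if total else 0.0,
        "errors_by_type": dict(Counter(row["error_type"] for row in quarantine_rows)),
    }


def to_metric_data(metrics, pipeline="transactions-etl"):
    """Shape metrics as a MetricData list for CloudWatch put_metric_data.

    Assumption: a stable Pipeline dimension (hypothetical value) rather than
    run_id, so every run updates the same metric series.
    """
    dims = [{"Name": "Pipeline", "Value": pipeline}]
    data = [
        {"MetricName": "ValidRows", "Dimensions": dims, "Value": metrics["valid_rows"], "Unit": "Count"},
        {"MetricName": "QuarantinedRows", "Dimensions": dims, "Value": metrics["quarantined_rows"], "Unit": "Count"},
        {"MetricName": "CondemnedRows", "Dimensions": dims, "Value": metrics["condemned_rows"], "Unit": "Count"},
        {"MetricName": "QuarantineRate", "Dimensions": dims,
         "Value": round(metrics["quarantine_rate"] * 100, 2), "Unit": "Percent"},
    ]
    # One extra data point per error type, keyed by an ErrorType dimension.
    for error_type, count in sorted(metrics["errors_by_type"].items()):
        data.append({"MetricName": "QuarantinedByErrorType",
                     "Dimensions": dims + [{"Name": "ErrorType", "Value": error_type}],
                     "Value": count, "Unit": "Count"})
    return data


def to_log_line(metrics):
    """One JSON object per line, which CloudWatch Logs Insights can split into fields."""
    return json.dumps(metrics, sort_keys=True)
```

A stable dimension gives one metric series per pipeline, which is what an alarm on the quarantine rate needs. The run_id still travels in the `_SUCCESS` JSON and in the structured log line.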