# Task 1: ETL Pipeline - Pseudocode

## Overview
This document provides a high-level pseudocode representation of the transaction ingestion ETL pipeline. The full implementation is available in the appendix.
## ETL Pipeline Flow

```mermaid
flowchart TB
    Start[Start ETL] --> Read[Read CSV from S3<br/>Bronze Layer]
    Read --> Enrich[Enrich Metadata<br/>row_hash, source_file_id<br/>attempt_count, ingestion_timestamp]
    Enrich --> LoopPrevent[Loop Prevention<br/>TransactionID Dedup<br/>Quarantine History Check<br/>Attempt Limit Check]
    LoopPrevent -->|Auto-Condemn| Condemned[Write Condemned<br/>No Retries]
    LoopPrevent -->|Continue| Validate[Validate Rows<br/>Schema, Null, Currency<br/>Type, Timestamp]
    Validate --> CircuitBreaker{Circuit Breaker<br/>Check}
    CircuitBreaker -->|Threshold Exceeded| Halt[🚨 HALT PIPELINE<br/>Human Intervention]
    CircuitBreaker -->|Normal| Transform[Transform Data<br/>Add Partition Keys]
    Transform --> Split{Split Valid/Invalid/Condemned}
    Split -->|Valid| WriteParquet[Write Parquet<br/>Silver Layer<br/>Partitioned]
    Split -->|Invalid| WriteQuarantine[Write Quarantine<br/>With Errors & Metadata]
    Split -->|Condemned| WriteCondemned[Write Condemned<br/>Separate Layer]
    WriteParquet --> Success[Write _SUCCESS<br/>Marker with Metrics]
    WriteQuarantine --> Success
    WriteCondemned --> Success
    Condemned --> Success
    Success --> Approval[Human Approval Required<br/>See HUMAN_VALIDATION_POLICY.md]
    Approval --> End[End]

    style Start fill:#f57c00,color:#fff
    style Read fill:#424242,color:#fff
    style Enrich fill:#7b1fa2,color:#fff
    style LoopPrevent fill:#fbc02d,color:#111
    style Validate fill:#7b1fa2,color:#fff
    style CircuitBreaker fill:#ffa000,color:#111
    style Halt fill:#d32f2f,color:#fff
    style Transform fill:#7b1fa2,color:#fff
    style Split fill:#fbc02d,color:#111
    style WriteParquet fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style WriteQuarantine fill:#d32f2f,color:#fff
    style WriteCondemned fill:#d32f2f,color:#fff
    style Condemned fill:#d32f2f,color:#fff
    style Success fill:#757575,color:#fff
    style End fill:#2e7d32,color:#fff
```
## Main ETL Flow

```text
FUNCTION main():
// Initialize
PARSE command-line arguments (input-bucket, input-key, output-bucket, etc.)
RECORD start_time = current_time
GENERATE run_id = current_timestamp (format: YYYYMMDDTHHMMSSZ)
GENERATE ingest_time = current_timestamp (ISO format)
INITIALIZE S3 client
LOG "Starting ETL run" with run_id
// Step 1: Read from Bronze Layer (Raw)
raw_dataframe = READ_CSV_FROM_S3(
bucket = input_bucket,
key = input_key
)
LOG "Read {row_count} rows from Bronze layer"
// Step 2: Initialize Circuit Breaker
circuit_breaker = INITIALIZE CircuitBreaker(threshold=100, window_hours=1)
// Step 3: Validate & Transform (with loop prevention)
valid_dataframe, quarantine_dataframe, condemned_dataframe = VALIDATE_AND_TRANSFORM(
dataframe = raw_dataframe,
run_id = run_id,
source_file = "s3://{bucket}/{key}",
ingest_time = ingest_time,
s3_client = s3_client,
quarantine_bucket = quarantine_bucket,
quarantine_prefix = quarantine_prefix,
circuit_breaker = circuit_breaker,
silver_bucket = silver_bucket,
silver_prefix = silver_prefix
)
// Step 4: Write Valid Data to Silver Layer
IF valid_dataframe is not empty:
WRITE_PARQUET_TO_S3(
dataframe = valid_dataframe,
bucket = output_bucket,
prefix = "processed/transactions/schema_v=v1/run_id={run_id}",
partition_columns = ['year', 'month']
)
// Step 5: Write Quarantine Data
IF quarantine_dataframe is not empty:
WRITE_PARQUET_TO_S3(
dataframe = quarantine_dataframe,
bucket = quarantine_bucket,
prefix = "quarantine/transactions/ingest_date={today}/run_id={run_id}",
partition_columns = None
)
LOG WARNING "Quarantined {count} invalid rows"
// Step 6: Write Condemned Data
IF condemned_dataframe is not empty:
WRITE_PARQUET_TO_S3(
dataframe = condemned_dataframe,
bucket = quarantine_bucket,
prefix = "quarantine/transactions/condemned/ingest_date={today}/run_id={run_id}",
partition_columns = None
)
LOG WARNING "Condemned {count} rows (max attempts or exact duplicates)"
// Step 7: Calculate Metrics
duration = current_time - start_time
all_processed = CONCATENATE(valid_dataframe, quarantine_dataframe, condemned_dataframe)
avg_attempt_count = AVERAGE(all_processed['attempt_count'])
auto_condemnation_rate = (COUNT(condemned_dataframe) / COUNT(raw_dataframe)) * 100
metrics = {
run_id: run_id,
input_rows: COUNT(raw_dataframe),
valid_rows: COUNT(valid_dataframe),
quarantined_rows: COUNT(quarantine_dataframe),
condemned_rows: COUNT(condemned_dataframe),
duration_seconds: duration,
avg_attempt_count: avg_attempt_count,
auto_condemnation_rate: auto_condemnation_rate,
quarantine_by_reason: GROUP_BY_ERROR_TYPE(quarantine_dataframe),
condemned_by_reason: GROUP_BY_ERROR_TYPE(condemned_dataframe)
}
// Step 8: Write Success Marker
WRITE_SUCCESS_MARKER(
bucket = output_bucket,
prefix = run_output_prefix,  // the run's output prefix under the Silver layer (see Step 4)
metrics = metrics
)
// Step 9: Publish CloudWatch Metrics
PUBLISH_CLOUDWATCH_METRICS(metrics)
LOG "ETL run completed successfully" with metrics
END FUNCTION
```
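
For orientation, here is a minimal Python sketch of the initialization step above, assuming a boto3-based implementation. The argument names and helper functions (`parse_args`, `init_run`) are illustrative only; the actual code lives in Appendix A.

```python
# Minimal sketch of the initialization step, assuming a Python/boto3 implementation.
# Argument and helper names are illustrative, not the production CLI.
import argparse
from datetime import datetime, timezone

import boto3


def parse_args():
    parser = argparse.ArgumentParser(description="Transaction ingestion ETL")
    parser.add_argument("--input-bucket", required=True)
    parser.add_argument("--input-key", required=True)
    parser.add_argument("--output-bucket", required=True)
    parser.add_argument("--quarantine-bucket", required=True)
    return parser.parse_args()


def init_run():
    args = parse_args()
    now = datetime.now(timezone.utc)
    run_id = now.strftime("%Y%m%dT%H%M%SZ")   # e.g. 20240101T120000Z, used for run isolation
    ingest_time = now.isoformat()              # ISO-8601 ingestion timestamp
    s3_client = boto3.client("s3")
    return args, run_id, ingest_time, s3_client
```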
## Validation & Transformation Logic
```text
FUNCTION VALIDATE_AND_TRANSFORM(dataframe, run_id, source_file, ingest_time, s3_client, quarantine_bucket, quarantine_prefix, circuit_breaker, silver_bucket, silver_prefix):
// Initialize
VALIDATE required columns exist: ['TransactionID', 'CustomerID', 'TransactionAmount', 'Currency', 'TransactionTimestamp']
INITIALIZE validation_error column = NULL
INITIALIZE condemned column = FALSE
// Step 1: Metadata Enrichment
FOR each row in dataframe:
row_hash = COMPUTE_SHA256_HASH(row, all_columns)
source_file_id = EXTRACT_SOURCE_FILE_ID(source_file)
attempt_count = 0 // Will be loaded from quarantine history for retries
ingestion_timestamp = ingest_time
SET row['row_hash'] = row_hash
SET row['source_file_id'] = source_file_id
SET row['attempt_count'] = attempt_count
SET row['ingestion_timestamp'] = ingestion_timestamp
// Step 2: Pre-Validation Checks (Loop Prevention)
// 2a. TransactionID Deduplication from Silver Layer (if silver_bucket provided)
IF silver_bucket AND silver_prefix:
EXTRACT event_date from parsed_timestamp OR TransactionTimestamp
existing_transactions = CHECK_EXISTING_TRANSACTIONS(s3_client, silver_bucket, silver_prefix, event_date_range)
FOR each row in dataframe:
IF (row['TransactionID'], row['event_date']) IN existing_transactions:
SET validation_error = 'DUPLICATE_TRANSACTION_ID'
SET condemned = TRUE
LOG WARNING "Auto-condemned duplicate TransactionID: {TransactionID}"
// 2b. Duplicate Detection in Quarantine History
row_hashes = EXTRACT_ALL row_hash values from dataframe
quarantine_history = CHECK_QUARANTINE_HISTORY(s3_client, quarantine_bucket, quarantine_prefix, row_hashes)
FOR each row in dataframe WHERE NOT condemned:
IF row['row_hash'] IN quarantine_history:
SET validation_error = 'DUPLICATE_FAILURE'
SET condemned = TRUE
LOG WARNING "Auto-condemned duplicate row_hash: {row_hash}"
// 2c. Attempt Limit Check
FOR each row in dataframe WHERE NOT condemned:
IF row['attempt_count'] >= 3: // Would be loaded from quarantine history
SET validation_error = 'MAX_ATTEMPTS'
SET condemned = TRUE
LOG WARNING "Auto-condemned row exceeding max attempts"
// Step 3: Validation Rules (only for non-condemned rows)
valid_for_validation = FILTER rows WHERE NOT condemned
// 3a. Schema Validation (Null Checks)
FOR each row in valid_for_validation:
IF any required column is NULL:
SET validation_error = 'NULL_VALUE_ERROR'
INCREMENT attempt_count
// 3b. Currency Validation
FOR each row in valid_for_validation WHERE validation_error is NULL:
IF Currency NOT IN allowed_currencies ['EUR', 'USD', 'GBP', ...]:
SET validation_error = 'CURRENCY_ERROR'
INCREMENT attempt_count
// 3c. Amount Type Check
FOR each row in valid_for_validation WHERE validation_error is NULL:
IF TransactionAmount is not numeric:
SET validation_error = 'TYPE_ERROR'
INCREMENT attempt_count
ELSE:
CONVERT TransactionAmount to numeric type
// 3d. Timestamp Parsing
FOR each row in valid_for_validation WHERE validation_error is NULL:
parsed_timestamp = PARSE_TIMESTAMP(TransactionTimestamp)
IF parsed_timestamp is NULL:
SET validation_error = 'TIMESTAMP_ERROR'
INCREMENT attempt_count
// 3e. Business Logic Duplicate Detection (CustomerID + date)
FOR each row in valid_for_validation WHERE validation_error is NULL:
EXTRACT tx_date = DATE(parsed_timestamp)
IF (CustomerID, tx_date) combination appears multiple times:
APPEND 'Duplicate account/date combination' to validation_error
INCREMENT attempt_count
// Step 4: Circuit Breaker Check
error_counts_by_type = GROUP_BY validation_error, COUNT rows
FOR each error_type, count IN error_counts_by_type:
FOR i = 1 TO count:
circuit_breaker.RECORD_ERROR(error_type)
IF circuit_breaker.CHECK_THRESHOLD(error_type):
LOG ERROR "🚨 CIRCUIT BREAKER TRIGGERED: {error_type}"
RAISE RuntimeError "Circuit breaker triggered: {error_type} exceeded threshold"
// Step 5: Split into Valid, Quarantine, and Condemned
condemned_dataframe = FILTER rows WHERE condemned = TRUE
quarantine_dataframe = FILTER rows WHERE (validation_error is NOT NULL) AND (condemned = FALSE)
valid_dataframe = FILTER rows WHERE (validation_error is NULL) AND (condemned = FALSE)
// Add Partition Columns to Valid Data
FOR each row in valid_dataframe:
year = YEAR(parsed_timestamp)
month = ZERO_PAD(MONTH(parsed_timestamp), 2)
REMOVE parsed_timestamp, validation_error, condemned columns
// Add Metadata to Quarantine
FOR each row in quarantine_dataframe:
ingest_date = TODAY()
run_id = run_id
retry_history = CREATE_JSON_ARRAY([{
run_id: run_id,
attempt_count: row['attempt_count'],
error: row['validation_error'],
timestamp: CURRENT_TIMESTAMP
}])
// Add Metadata to Condemned
FOR each row in condemned_dataframe:
ingest_date = TODAY()
run_id = run_id
retry_history = CREATE_JSON_ARRAY([{
run_id: run_id,
attempt_count: row['attempt_count'],
error: row['validation_error'],
timestamp: CURRENT_TIMESTAMP,
condemned: TRUE
}])
RETURN valid_dataframe, quarantine_dataframe, condemned_dataframe
END FUNCTION
```
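
The metadata enrichment and the three-way split lend themselves to vectorised pandas operations. The sketch below is a non-authoritative illustration: column names follow the pseudocode, while the helper names (`enrich_metadata`, `split_frames`) are assumed for this example.

```python
# Illustrative pandas sketch of metadata enrichment and the valid/quarantine/condemned
# split. Column names follow the pseudocode; helper names are assumptions.
import hashlib

import pandas as pd


def enrich_metadata(df: pd.DataFrame, source_file: str, ingest_time: str) -> pd.DataFrame:
    df = df.copy()
    # row_hash: SHA-256 over all original columns, used for quarantine-history lookups
    df["row_hash"] = df.astype(str).apply(
        lambda row: hashlib.sha256("|".join(row).encode("utf-8")).hexdigest(), axis=1
    )
    df["source_file_id"] = source_file.rsplit("/", 1)[-1]  # illustrative file-id extraction
    df["attempt_count"] = 0          # overwritten from quarantine history on retries
    df["ingestion_timestamp"] = ingest_time
    return df


def split_frames(df: pd.DataFrame):
    # Assumes a boolean 'condemned' column and a nullable 'validation_error' column.
    condemned = df[df["condemned"]]
    quarantine = df[df["validation_error"].notna() & ~df["condemned"]]
    valid = df[df["validation_error"].isna() & ~df["condemned"]]
    return valid, quarantine, condemned
```

Hashing every column keeps the `row_hash` stable across retries, which is what the quarantine-history lookup in step 2b relies on.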
## S3 Operations
```text
FUNCTION READ_CSV_FROM_S3(s3_client, bucket, key):
TRY:
response = s3_client.GET_OBJECT(bucket, key)
content = DECODE(response['Body'], encoding='utf-8')
dataframe = PANDAS.READ_CSV(content)
RETURN dataframe
CATCH error:
LOG ERROR "Failed to read from S3: {error}"
RAISE error
END FUNCTION
FUNCTION WRITE_PARQUET_TO_S3(dataframe, s3_client, bucket, prefix, partition_cols):
IF dataframe is empty:
LOG WARNING "No data to write"
RETURN
TRY:
CONVERT dataframe to PyArrow Table
CONFIGURE S3 filesystem connection
WRITE partitioned Parquet files to s3://{bucket}/{prefix}/
LOG INFO "Successfully wrote {row_count} rows"
CATCH error:
LOG ERROR "Failed to write Parquet: {error}"
RAISE error
END FUNCTION
FUNCTION WRITE_SUCCESS_MARKER(s3_client, bucket, prefix, run_id, metrics):
key = "{prefix}/_SUCCESS"
content = JSON_SERIALIZE(metrics)
s3_client.PUT_OBJECT(bucket, key, content)
LOG INFO "Wrote success marker"
END FUNCTION
```
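
These helpers map naturally onto boto3 and PyArrow. The following is a reduced sketch under that assumption; bucket names, prefixes, and logging are placeholders rather than the production configuration.

```python
# Illustrative boto3/PyArrow sketch of the S3 helpers. Error handling is reduced
# to logging plus re-raise, as in the pseudocode.
import io
import json
import logging

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

logger = logging.getLogger(__name__)


def read_csv_from_s3(s3_client, bucket: str, key: str) -> pd.DataFrame:
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_csv(io.BytesIO(response["Body"].read()))


def write_parquet_to_s3(df: pd.DataFrame, bucket: str, prefix: str, partition_cols=None):
    if df.empty:
        logger.warning("No data to write")
        return
    table = pa.Table.from_pandas(df, preserve_index=False)
    # Writes partitioned Parquet files under s3://{bucket}/{prefix}/
    pq.write_to_dataset(
        table,
        root_path=f"{bucket}/{prefix}",
        partition_cols=partition_cols,
        filesystem=fs.S3FileSystem(),
    )
    logger.info("Successfully wrote %d rows", len(df))


def write_success_marker(s3_client, bucket: str, prefix: str, metrics: dict):
    s3_client.put_object(
        Bucket=bucket,
        Key=f"{prefix}/_SUCCESS",
        Body=json.dumps(metrics, default=str).encode("utf-8"),
    )
    logger.info("Wrote success marker")
```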
## Monitoring & Metrics
```text
FUNCTION PUBLISH_CLOUDWATCH_METRICS(metrics):
IF CloudWatch is disabled (local testing):
RETURN
TRY:
cloudwatch_client = INITIALIZE CloudWatch client
quarantine_rate = (metrics.quarantined_rows / metrics.input_rows) * 100
METRICS = [
{ name: 'InputRows', value: metrics.input_rows, unit: 'Count' },
{ name: 'ValidRows', value: metrics.valid_rows, unit: 'Count' },
{ name: 'QuarantinedRows', value: metrics.quarantined_rows, unit: 'Count' },
{ name: 'CondemnedRows', value: metrics.condemned_rows, unit: 'Count' },
{ name: 'QuarantineRate', value: quarantine_rate, unit: 'Percent' },
{ name: 'AvgAttemptCount', value: metrics.avg_attempt_count, unit: 'Count' },
{ name: 'AutoCondemnationRate', value: metrics.auto_condemnation_rate, unit: 'Percent' },
{ name: 'DurationSeconds', value: metrics.duration_seconds, unit: 'Seconds' }
]
cloudwatch_client.PUT_METRIC_DATA(
namespace = 'Ohpen/ETL',
metric_data = METRICS
)
LOG INFO "Published CloudWatch metrics"
CATCH error:
LOG WARNING "Failed to publish CloudWatch metrics: {error}"
// Don't fail the job if metrics fail
END FUNCTION
```
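
The publication step is a single `put_metric_data` call in boto3. The sketch below assumes that client; the metric list is abbreviated and the namespace is copied from the pseudocode.

```python
# Illustrative boto3 sketch of the CloudWatch publication step.
import logging

import boto3

logger = logging.getLogger(__name__)


def publish_cloudwatch_metrics(metrics: dict, enabled: bool = True):
    if not enabled:  # e.g. local testing without AWS credentials
        return
    try:
        cloudwatch = boto3.client("cloudwatch")
        quarantine_rate = 100.0 * metrics["quarantined_rows"] / max(metrics["input_rows"], 1)
        cloudwatch.put_metric_data(
            Namespace="Ohpen/ETL",
            MetricData=[
                {"MetricName": "InputRows", "Value": metrics["input_rows"], "Unit": "Count"},
                {"MetricName": "ValidRows", "Value": metrics["valid_rows"], "Unit": "Count"},
                {"MetricName": "QuarantinedRows", "Value": metrics["quarantined_rows"], "Unit": "Count"},
                {"MetricName": "QuarantineRate", "Value": quarantine_rate, "Unit": "Percent"},
                {"MetricName": "DurationSeconds", "Value": metrics["duration_seconds"], "Unit": "Seconds"},
            ],
        )
        logger.info("Published CloudWatch metrics")
    except Exception as exc:
        # Metric publication must never fail the job itself
        logger.warning("Failed to publish CloudWatch metrics: %s", exc)
```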
## Key Design Decisions

- Run Isolation: Each ETL run writes to a unique `run_id` path to prevent data corruption during retries
- Metadata Enrichment: All rows receive tracking metadata (`row_hash`, `source_file_id`, `attempt_count`, `ingestion_timestamp`) for loop prevention
- Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection in quarantine history, attempt limits (max 3), and a circuit breaker (>100 identical errors per hour) prevent infinite retry loops; a circuit breaker sketch follows this list
- Partitioning: Valid data is partitioned by `year` and `month` for query performance
- Quarantine: Invalid rows are never dropped; they are preserved in quarantine with error details and retry tracking
- Condemned Layer: Rows exceeding max attempts or exact duplicates are moved to the condemned layer (no automatic retries)
- Idempotency: Run isolation via `run_id` ensures safe reruns without overwriting previous outputs
- Metadata: All runs include a `_SUCCESS` marker with metrics for monitoring and lineage
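
As referenced in the loop-prevention bullet above, a minimal in-memory circuit breaker could look like the sketch below. The threshold and window come from the pseudocode; the class and method names are illustrative.

```python
# Minimal sliding-window circuit breaker sketch (threshold=100 identical errors
# per hour, matching the pseudocode). Names are illustrative.
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone


class CircuitBreaker:
    def __init__(self, threshold: int = 100, window_hours: int = 1):
        self.threshold = threshold
        self.window = timedelta(hours=window_hours)
        self._errors = defaultdict(deque)  # error_type -> timestamps within the window

    def record_error(self, error_type: str) -> None:
        self._errors[error_type].append(datetime.now(timezone.utc))

    def check_threshold(self, error_type: str) -> bool:
        # Drop timestamps that have aged out of the sliding window, then compare counts.
        cutoff = datetime.now(timezone.utc) - self.window
        timestamps = self._errors[error_type]
        while timestamps and timestamps[0] < cutoff:
            timestamps.popleft()
        return len(timestamps) >= self.threshold
```

In the pipeline, the threshold check runs after every recorded error; a positive result raises and halts the run before anything is written to the Silver layer.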
## Data Flow Summary

```text
Bronze Layer (Raw CSV)
    ↓
[Read CSV from S3]
    ↓
[Enrich Metadata: row_hash, source_file_id, attempt_count, ingestion_timestamp]
    ↓
[Loop Prevention: Duplicate Check, Attempt Limit Check]
    ├── Auto-Condemned → Condemned Layer (No Retries)
    └── Continue → [Validate & Transform]
    ↓
[Circuit Breaker Check]
    ├── Threshold Exceeded → 🚨 HALT PIPELINE
    └── Normal → [Split Valid/Invalid/Condemned]
            ├── Valid Rows → Silver Layer (Partitioned Parquet)
            ├── Invalid Rows → Quarantine (Parquet with error details)
            └── Condemned Rows → Condemned Layer
    ↓
[Write Success Marker]
    ↓
[Publish CloudWatch Metrics]
    ↓
Complete
```
## See Also

- Full implementation: `APPENDIX_A_FULL_ETL_CODE.md`
- Assumptions & Edge Cases: `ASSUMPTIONS_AND_EDGE_CASES.md`
- Architecture: `../02_data_lake_architecture_design/architecture.md`