# Task 1: ETL Pipeline - Pseudocode

## Overview

This document provides a high-level pseudocode representation of the transaction ingestion ETL pipeline. The full implementation is available in the appendix.

## ETL Pipeline Flow

```mermaid
flowchart TB
    Start[Start ETL] --> Read[Read CSV from S3<br/>Bronze Layer]
    Read --> Enrich[Enrich Metadata<br/>row_hash, source_file_id<br/>attempt_count, ingestion_timestamp]
    Enrich --> LoopPrevent[Loop Prevention<br/>TransactionID Dedup<br/>Quarantine History Check<br/>Attempt Limit Check]
    LoopPrevent -->|Auto-Condemn| Condemned[Write Condemned<br/>No Retries]
    LoopPrevent -->|Continue| Validate[Validate Rows<br/>Schema, Null, Currency<br/>Type, Timestamp]
    Validate --> CircuitBreaker{Circuit Breaker<br/>Check}
    CircuitBreaker -->|Threshold Exceeded| Halt[🚨 HALT PIPELINE<br/>Human Intervention]
    CircuitBreaker -->|Normal| Transform[Transform Data<br/>Add Partition Keys]
    Transform --> Split{Split Valid/Invalid/Condemned}
    Split -->|Valid| WriteParquet[Write Parquet<br/>Silver Layer<br/>Partitioned]
    Split -->|Invalid| WriteQuarantine[Write Quarantine<br/>With Errors & Metadata]
    Split -->|Condemned| WriteCondemned[Write Condemned<br/>Separate Layer]
    WriteParquet --> Success[Write _SUCCESS<br/>Marker with Metrics]
    WriteQuarantine --> Success
    WriteCondemned --> Success
    Condemned --> Success
    Success --> Approval[Human Approval Required<br/>See HUMAN_VALIDATION_POLICY.md]
    Approval --> End[End]

    style Start fill:#f57c00,color:#fff
    style Read fill:#424242,color:#fff
    style Enrich fill:#7b1fa2,color:#fff
    style LoopPrevent fill:#fbc02d,color:#111
    style Validate fill:#7b1fa2,color:#fff
    style CircuitBreaker fill:#ffa000,color:#111
    style Halt fill:#d32f2f,color:#fff
    style Transform fill:#7b1fa2,color:#fff
    style Split fill:#fbc02d,color:#111
    style WriteParquet fill:#388e3c,stroke:#7b1fa2,stroke-width:2px,color:#fff
    style WriteQuarantine fill:#d32f2f,color:#fff
    style WriteCondemned fill:#d32f2f,color:#fff
    style Condemned fill:#d32f2f,color:#fff
    style Success fill:#757575,color:#fff
    style End fill:#2e7d32,color:#fff
```

## Main ETL Flow

```text
FUNCTION main():
    // Initialize
    PARSE command-line arguments (input-bucket, input-key, output-bucket, etc.)
    RECORD start_time = current_time  // used for the duration metric in Step 7
    GENERATE run_id = current_timestamp (format: YYYYMMDDTHHMMSSZ)
    GENERATE ingest_time = current_timestamp (ISO format)
    INITIALIZE S3 client

    LOG "Starting ETL run" with run_id

    // Step 1: Read from Bronze Layer (Raw)
    raw_dataframe = READ_CSV_FROM_S3(
        bucket = input_bucket,
        key = input_key
    )
    LOG "Read {row_count} rows from Bronze layer"

    // Step 2: Initialize Circuit Breaker
    circuit_breaker = INITIALIZE CircuitBreaker(threshold=100, window_hours=1)

    // Step 3: Validate & Transform (with loop prevention)
    valid_dataframe, quarantine_dataframe, condemned_dataframe = VALIDATE_AND_TRANSFORM(
        dataframe = raw_dataframe,
        run_id = run_id,
        source_file = "s3://{bucket}/{key}",
        ingest_time = ingest_time,
        s3_client = s3_client,
        quarantine_bucket = quarantine_bucket,
        quarantine_prefix = quarantine_prefix,
        silver_bucket = silver_bucket,
        silver_prefix = silver_prefix,
        circuit_breaker = circuit_breaker
    )

    // Step 4: Write Valid Data to Silver Layer
    IF valid_dataframe is not empty:
        WRITE_PARQUET_TO_S3(
            dataframe = valid_dataframe,
            bucket = output_bucket,
            prefix = "processed/transactions/schema_v=v1/run_id={run_id}",
            partition_columns = ['year', 'month']
        )

    // Step 5: Write Quarantine Data
    IF quarantine_dataframe is not empty:
        WRITE_PARQUET_TO_S3(
            dataframe = quarantine_dataframe,
            bucket = quarantine_bucket,
            prefix = "quarantine/transactions/ingest_date={today}/run_id={run_id}",
            partition_columns = None
        )
        LOG WARNING "Quarantined {count} invalid rows"

    // Step 6: Write Condemned Data
    IF condemned_dataframe is not empty:
        WRITE_PARQUET_TO_S3(
            dataframe = condemned_dataframe,
            bucket = quarantine_bucket,
            prefix = "quarantine/transactions/condemned/ingest_date={today}/run_id={run_id}",
            partition_columns = None
        )
        LOG WARNING "Condemned {count} rows (max attempts or exact duplicates)"

    // Step 7: Calculate Metrics
    duration = current_time - start_time
    all_processed = CONCATENATE(valid_dataframe, quarantine_dataframe, condemned_dataframe)
    avg_attempt_count = AVERAGE(all_processed['attempt_count'])
    auto_condemnation_rate = (COUNT(condemned_dataframe) / COUNT(raw_dataframe)) * 100

    metrics = {
        run_id: run_id,
        input_rows: COUNT(raw_dataframe),
        valid_rows: COUNT(valid_dataframe),
        quarantined_rows: COUNT(quarantine_dataframe),
        condemned_rows: COUNT(condemned_dataframe),
        duration_seconds: duration,
        avg_attempt_count: avg_attempt_count,
        auto_condemnation_rate: auto_condemnation_rate,
        quarantine_by_reason: GROUP_BY_ERROR_TYPE(quarantine_dataframe),
        condemned_by_reason: GROUP_BY_ERROR_TYPE(condemned_dataframe)
    }

    // Step 8: Write Success Marker
    WRITE_SUCCESS_MARKER(
        bucket = output_bucket,
        prefix = run_output_prefix,  // run-scoped output prefix for this run_id
        metrics = metrics
    )

    // Step 9: Publish CloudWatch Metrics
    PUBLISH_CLOUDWATCH_METRICS(metrics)

    LOG "ETL run completed successfully" with metrics

END FUNCTION

```
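
The main flow above maps onto a small Python entry point. The following is a minimal sketch, assuming a pandas/boto3-style implementation; the flag names (`--input-bucket`, `--input-key`, `--output-bucket`, `--quarantine-bucket`) and the helper name `build_run_context` are illustrative, not the actual CLI of the full implementation in the appendix.

```python
import argparse
import time
from datetime import datetime, timezone


def build_run_context(argv=None):
    """Parse CLI arguments and derive run-scoped identifiers (illustrative sketch)."""
    parser = argparse.ArgumentParser(description="Transaction ingestion ETL")
    parser.add_argument("--input-bucket", required=True)
    parser.add_argument("--input-key", required=True)
    parser.add_argument("--output-bucket", required=True)
    parser.add_argument("--quarantine-bucket", required=True)
    args = parser.parse_args(argv)

    now = datetime.now(timezone.utc)
    run_id = now.strftime("%Y%m%dT%H%M%SZ")   # e.g. 20240101T120000Z
    ingest_time = now.isoformat()             # ISO-format ingestion timestamp

    # Run isolation: every output for this run lands under a unique run_id prefix.
    output_prefix = f"processed/transactions/schema_v=v1/run_id={run_id}"

    return args, run_id, ingest_time, output_prefix, time.monotonic()


if __name__ == "__main__":
    args, run_id, ingest_time, output_prefix, start = build_run_context()
    print(f"Starting ETL run {run_id} -> s3://{args.output_bucket}/{output_prefix}")
```

Because retries generate a fresh `run_id`, they write to a new path instead of overwriting a previous run's output, which is what makes reruns safe.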

## Validation & Transformation Logic

```text

FUNCTION VALIDATE_AND_TRANSFORM(dataframe, run_id, source_file, ingest_time, s3_client, quarantine_bucket, quarantine_prefix, silver_bucket, silver_prefix, circuit_breaker):
    // Initialize
    VALIDATE required columns exist: ['TransactionID', 'CustomerID', 'TransactionAmount', 'Currency', 'TransactionTimestamp']
    INITIALIZE validation_error column = NULL
    INITIALIZE condemned column = FALSE

    // Step 1: Metadata Enrichment
    FOR each row in dataframe:
        row_hash = COMPUTE_SHA256_HASH(row, all_columns)
        source_file_id = EXTRACT_SOURCE_FILE_ID(source_file)
        attempt_count = 0  // Will be loaded from quarantine history for retries
        ingestion_timestamp = ingest_time
        SET row['row_hash'] = row_hash
        SET row['source_file_id'] = source_file_id
        SET row['attempt_count'] = attempt_count
        SET row['ingestion_timestamp'] = ingestion_timestamp

    // Step 2: Pre-Validation Checks (Loop Prevention)

    // 2a. TransactionID Deduplication from Silver Layer (if silver_bucket provided)
    IF silver_bucket AND silver_prefix:
        EXTRACT event_date from parsed_timestamp OR TransactionTimestamp
        existing_transactions = CHECK_EXISTING_TRANSACTIONS(s3_client, silver_bucket, silver_prefix, event_date_range)

        FOR each row in dataframe:
            IF (row['TransactionID'], row['event_date']) IN existing_transactions:
                SET validation_error = 'DUPLICATE_TRANSACTION_ID'
                SET condemned = TRUE
                LOG WARNING "Auto-condemned duplicate TransactionID: {TransactionID}"

    // 2b. Duplicate Detection in Quarantine History
    row_hashes = EXTRACT_ALL row_hash values from dataframe
    quarantine_history = CHECK_QUARANTINE_HISTORY(s3_client, quarantine_bucket, quarantine_prefix, row_hashes)

    FOR each row in dataframe WHERE NOT condemned:
        IF row['row_hash'] IN quarantine_history:
            SET validation_error = 'DUPLICATE_FAILURE'
            SET condemned = TRUE
            LOG WARNING "Auto-condemned duplicate row_hash: {row_hash}"

    // 2c. Attempt Limit Check
    FOR each row in dataframe WHERE NOT condemned:
        IF row['attempt_count'] >= 3:  // Would be loaded from quarantine history
            SET validation_error = 'MAX_ATTEMPTS'
            SET condemned = TRUE
            LOG WARNING "Auto-condemned row exceeding max attempts"

    // Step 3: Validation Rules (only for non-condemned rows)
    valid_for_validation = FILTER rows WHERE NOT condemned

    // 3a. Schema Validation (Null Checks)
    FOR each row in valid_for_validation:
        IF any required column is NULL:
            SET validation_error = 'NULL_VALUE_ERROR'
            INCREMENT attempt_count

    // 3b. Currency Validation
    FOR each row in valid_for_validation WHERE validation_error is NULL:
        IF Currency NOT IN allowed_currencies ['EUR', 'USD', 'GBP', ...]:
            SET validation_error = 'CURRENCY_ERROR'
            INCREMENT attempt_count

    // 3c. Amount Type Check
    FOR each row in valid_for_validation WHERE validation_error is NULL:
        IF TransactionAmount is not numeric:
            SET validation_error = 'TYPE_ERROR'
            INCREMENT attempt_count
        ELSE:
            CONVERT TransactionAmount to numeric type

    // 3d. Timestamp Parsing
    FOR each row in valid_for_validation WHERE validation_error is NULL:
        parsed_timestamp = PARSE_TIMESTAMP(TransactionTimestamp)
        IF parsed_timestamp is NULL:
            SET validation_error = 'TIMESTAMP_ERROR'
            INCREMENT attempt_count

    // 3e. Business Logic Duplicate Detection (CustomerID + date)
    FOR each row in valid_for_validation WHERE validation_error is NULL:
        EXTRACT tx_date = DATE(parsed_timestamp)
        IF (CustomerID, tx_date) combination appears multiple times:
            APPEND 'Duplicate account/date combination' to validation_error
            INCREMENT attempt_count

    // Step 4: Circuit Breaker Check
    error_counts_by_type = GROUP_BY validation_error, COUNT rows
    FOR each error_type, count IN error_counts_by_type:
        FOR i = 1 TO count:
            circuit_breaker.RECORD_ERROR(error_type)

        IF circuit_breaker.CHECK_THRESHOLD(error_type):
            LOG ERROR "🚨 CIRCUIT BREAKER TRIGGERED: {error_type}"
            RAISE RuntimeError "Circuit breaker triggered: {error_type} exceeded threshold"

    // Step 5: Split into Valid, Quarantine, and Condemned
    condemned_dataframe = FILTER rows WHERE condemned = TRUE
    quarantine_dataframe = FILTER rows WHERE (validation_error is NOT NULL) AND (condemned = FALSE)
    valid_dataframe = FILTER rows WHERE (validation_error is NULL) AND (condemned = FALSE)

    // Add Partition Columns to Valid Data
    FOR each row in valid_dataframe:
        year = YEAR(parsed_timestamp)
        month = ZERO_PAD(MONTH(parsed_timestamp), 2)
        REMOVE parsed_timestamp, validation_error, condemned columns

    // Add Metadata to Quarantine
    FOR each row in quarantine_dataframe:
        ingest_date = TODAY()
        run_id = run_id
        retry_history = CREATE_JSON_ARRAY([{
            run_id: run_id,
            attempt_count: row['attempt_count'],
            error: row['validation_error'],
            timestamp: CURRENT_TIMESTAMP
        }])

    // Add Metadata to Condemned
    FOR each row in condemned_dataframe:
        ingest_date = TODAY()
        run_id = run_id
        retry_history = CREATE_JSON_ARRAY([{
            run_id: run_id,
            attempt_count: row['attempt_count'],
            error: row['validation_error'],
            timestamp: CURRENT_TIMESTAMP,
            condemned: TRUE
        }])

    RETURN valid_dataframe, quarantine_dataframe, condemned_dataframe

END FUNCTION

```
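
The validation rules in Step 3 translate naturally into vectorized pandas masks that label rows rather than dropping them. The sketch below covers only rules 3a-3d and omits the attempt-count increments and the business-logic duplicate check; the allowed-currency set and the helper name `classify_rows` are assumptions for illustration.

```python
import pandas as pd

# Assumed subset of the allowed currencies; the full list lives in the implementation.
ALLOWED_CURRENCIES = {"EUR", "USD", "GBP"}
REQUIRED = ["TransactionID", "CustomerID", "TransactionAmount", "Currency", "TransactionTimestamp"]


def classify_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Attach validation_error labels without dropping any rows (sketch of rules 3a-3d)."""
    out = df.copy()
    out["validation_error"] = None

    # 3a. Null checks on required columns
    null_mask = out[REQUIRED].isna().any(axis=1)
    out.loc[null_mask, "validation_error"] = "NULL_VALUE_ERROR"

    # 3b. Currency validation (only rows that are still clean)
    cur_mask = out["validation_error"].isna() & ~out["Currency"].isin(ALLOWED_CURRENCIES)
    out.loc[cur_mask, "validation_error"] = "CURRENCY_ERROR"

    # 3c. Amount must be numeric
    amount = pd.to_numeric(out["TransactionAmount"], errors="coerce")
    type_mask = out["validation_error"].isna() & amount.isna()
    out.loc[type_mask, "validation_error"] = "TYPE_ERROR"

    # 3d. Timestamp must parse
    parsed = pd.to_datetime(out["TransactionTimestamp"], errors="coerce", utc=True)
    ts_mask = out["validation_error"].isna() & parsed.isna()
    out.loc[ts_mask, "validation_error"] = "TIMESTAMP_ERROR"

    out["parsed_timestamp"] = parsed
    return out
```

The circuit breaker in Step 4 can be a small sliding-window counter keyed by error type. This sketch follows the semantics described above (halt when one error type repeats more than 100 times within an hour); the class and method names mirror the pseudocode but are not taken from the actual implementation.

```python
import time
from collections import defaultdict, deque


class CircuitBreaker:
    """Flag an error type that repeats too often inside a sliding time window (sketch)."""

    def __init__(self, threshold: int = 100, window_hours: float = 1):
        self.threshold = threshold
        self.window_seconds = window_hours * 3600
        self._events = defaultdict(deque)  # error_type -> deque of event timestamps

    def record_error(self, error_type: str) -> None:
        self._events[error_type].append(time.monotonic())

    def check_threshold(self, error_type: str) -> bool:
        """Return True if error_type exceeded the threshold inside the window."""
        cutoff = time.monotonic() - self.window_seconds
        events = self._events[error_type]
        while events and events[0] < cutoff:
            events.popleft()  # drop events that fell out of the sliding window
        return len(events) > self.threshold
```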

## S3 Operations

```text

FUNCTION READ_CSV_FROM_S3(s3_client, bucket, key):
    TRY:
        response = s3_client.GET_OBJECT(bucket, key)
        content = DECODE(response['Body'], encoding='utf-8')
        dataframe = PANDAS.READ_CSV(content)
        RETURN dataframe
    CATCH error:
        LOG ERROR "Failed to read from S3: {error}"
        RAISE error
END FUNCTION

FUNCTION WRITE_PARQUET_TO_S3(dataframe, s3_client, bucket, prefix, partition_cols):
    IF dataframe is empty:
        LOG WARNING "No data to write"
        RETURN

    TRY:
        CONVERT dataframe to PyArrow Table
        CONFIGURE S3 filesystem connection
        WRITE partitioned Parquet files to s3://{bucket}/{prefix}/
        LOG INFO "Successfully wrote {row_count} rows"
    CATCH error:
        LOG ERROR "Failed to write Parquet: {error}"
        RAISE error
END FUNCTION

FUNCTION WRITE_SUCCESS_MARKER(s3_client, bucket, prefix, run_id, metrics):
    key = "{prefix}/_SUCCESS"
    content = JSON_SERIALIZE(metrics)
    s3_client.PUT_OBJECT(bucket, key, content)
    LOG INFO "Wrote success marker"
END FUNCTION

```
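
Assuming the implementation uses boto3 for object access and PyArrow for the partitioned Parquet writes (as the pseudocode suggests), the two S3 helpers might look roughly like this; the bucket names in the usage comments are placeholders.

```python
import io

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs


def read_csv_from_s3(s3_client, bucket: str, key: str) -> pd.DataFrame:
    """Bronze-layer read: fetch the CSV object and parse it with pandas."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_csv(io.BytesIO(response["Body"].read()))


def write_parquet_to_s3(df: pd.DataFrame, bucket: str, prefix: str, partition_cols=None) -> None:
    """Silver/quarantine write: partitioned Parquet via PyArrow's S3 filesystem."""
    if df.empty:
        return  # nothing to write
    table = pa.Table.from_pandas(df, preserve_index=False)
    s3 = fs.S3FileSystem()  # region/credentials resolved from the environment
    pq.write_to_dataset(
        table,
        root_path=f"{bucket}/{prefix}",  # no s3:// scheme when an explicit filesystem is passed
        partition_cols=partition_cols,
        filesystem=s3,
    )


# Usage sketch (placeholder bucket/key names):
# s3_client = boto3.client("s3")
# df = read_csv_from_s3(s3_client, "bronze-bucket", "transactions.csv")
# write_parquet_to_s3(df, "silver-bucket", "processed/transactions/schema_v=v1/run_id=...", ["year", "month"])
```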

## Monitoring & Metrics

```text

FUNCTION PUBLISH_CLOUDWATCH_METRICS(metrics):
    IF CloudWatch is disabled (local testing):
        RETURN

    TRY:
        cloudwatch_client = INITIALIZE CloudWatch client
        quarantine_rate = (quarantined_rows / input_rows) * 100

        METRICS = [
            { name: 'InputRows', value: metrics.input_rows, unit: 'Count' },
            { name: 'ValidRows', value: metrics.valid_rows, unit: 'Count' },
            { name: 'QuarantinedRows', value: metrics.quarantined_rows, unit: 'Count' },
            { name: 'CondemnedRows', value: metrics.condemned_rows, unit: 'Count' },
            { name: 'QuarantineRate', value: quarantine_rate, unit: 'Percent' },
            { name: 'AvgAttemptCount', value: metrics.avg_attempt_count, unit: 'Count' },
            { name: 'AutoCondemnationRate', value: metrics.auto_condemnation_rate, unit: 'Percent' },
            { name: 'DurationSeconds', value: metrics.duration_seconds, unit: 'Seconds' }
        ]

        cloudwatch_client.PUT_METRIC_DATA(
            namespace = 'Ohpen/ETL',
            metric_data = METRICS
        )

        LOG INFO "Published CloudWatch metrics"
    CATCH error:
        LOG WARNING "Failed to publish CloudWatch metrics: {error}"
        // Don't fail the job if metrics fail
END FUNCTION
```
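
A sketch of the metric publication using boto3's CloudWatch client, assuming the `Ohpen/ETL` namespace and the metrics dictionary built in the main flow; only a subset of the metric list is shown, and failures are logged rather than raised so a metrics outage never fails the job.

```python
import logging

import boto3

logger = logging.getLogger(__name__)


def publish_cloudwatch_metrics(metrics: dict, namespace: str = "Ohpen/ETL", enabled: bool = True) -> None:
    """Best-effort metric publication: a metrics failure must never fail the ETL job."""
    if not enabled:  # e.g. local testing without AWS access
        return
    try:
        cloudwatch = boto3.client("cloudwatch")
        input_rows = max(metrics["input_rows"], 1)  # guard against division by zero
        metric_data = [
            {"MetricName": "InputRows", "Value": metrics["input_rows"], "Unit": "Count"},
            {"MetricName": "ValidRows", "Value": metrics["valid_rows"], "Unit": "Count"},
            {"MetricName": "QuarantinedRows", "Value": metrics["quarantined_rows"], "Unit": "Count"},
            {"MetricName": "CondemnedRows", "Value": metrics["condemned_rows"], "Unit": "Count"},
            {"MetricName": "QuarantineRate", "Value": metrics["quarantined_rows"] / input_rows * 100, "Unit": "Percent"},
            {"MetricName": "DurationSeconds", "Value": metrics["duration_seconds"], "Unit": "Seconds"},
        ]
        cloudwatch.put_metric_data(Namespace=namespace, MetricData=metric_data)
        logger.info("Published CloudWatch metrics for run %s", metrics.get("run_id"))
    except Exception:
        # Metrics are non-critical: log and continue.
        logger.warning("Failed to publish CloudWatch metrics", exc_info=True)
```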

## Key Design Decisions

  1. Run Isolation: Each ETL run writes to a unique run_id path to prevent data corruption during retries
  2. Metadata Enrichment: All rows receive tracking metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) for loop prevention (see the enrichment sketch after this list)
  3. Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection in quarantine history, attempt limits (max 3), and circuit breaker (>100 same errors/hour) prevent infinite retry loops
  4. Partitioning: Valid data partitioned by year and month for query performance
  5. Quarantine: Invalid rows are never dropped; they're preserved in quarantine with error details and retry tracking
  6. Condemned Layer: Rows exceeding max attempts or exact duplicates are moved to condemned layer (no automatic retries)
  7. Idempotency: Run isolation via run_id ensures safe reruns without overwriting previous outputs
  8. Metadata: All runs include _SUCCESS marker with metrics for monitoring and lineage
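
A sketch of the metadata enrichment referenced in decision 2, assuming a pandas implementation. The hashing scheme (sorted column names joined with their values) and the `source_file_id` derivation are illustrative choices; the real implementation may canonicalise rows differently.

```python
import hashlib

import pandas as pd


def compute_row_hash(row: pd.Series) -> str:
    """Deterministic SHA-256 over all column values, used for quarantine-history lookups (sketch)."""
    canonical = "|".join(f"{col}={row[col]}" for col in sorted(row.index))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def enrich_metadata(df: pd.DataFrame, source_file: str, ingest_time: str) -> pd.DataFrame:
    """Attach the tracking columns every row carries through the pipeline."""
    out = df.copy()
    out["row_hash"] = out.apply(compute_row_hash, axis=1)
    out["source_file_id"] = source_file.rsplit("/", 1)[-1]  # assumed: file name as the identifier
    out["attempt_count"] = 0                                # reloaded from quarantine history on retries
    out["ingestion_timestamp"] = ingest_time
    return out
```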

## Data Flow Summary

```text
Bronze Layer (Raw CSV)
    ↓
[Read CSV from S3]
    ↓
[Enrich Metadata: row_hash, source_file_id, attempt_count, ingestion_timestamp]
    ↓
[Loop Prevention: Duplicate Check, Attempt Limit Check]
    ├─→ Auto-Condemned → Condemned Layer (No Retries)
    └─→ Continue → [Validate & Transform]
                    ↓
                    [Circuit Breaker Check]
                    ├─→ Threshold Exceeded → 🚨 HALT PIPELINE
                    └─→ Normal → [Split Valid/Invalid/Condemned]
                                  ├─→ Valid Rows → Silver Layer (Partitioned Parquet)
                                  ├─→ Invalid Rows → Quarantine (Parquet with error details)
                                  └─→ Condemned Rows → Condemned Layer
    ↓
[Write Success Marker]
    ↓
[Publish CloudWatch Metrics]
    ↓
Complete
```
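
For reference, the three run-scoped destinations in the flow above can be captured in one small helper. The function name is illustrative; the path templates are the ones used in the pseudocode.

```python
from datetime import date
from typing import Optional


def layer_prefixes(run_id: str, ingest_date: Optional[date] = None) -> dict:
    """Run-scoped S3 prefixes for the Silver, quarantine, and condemned layers (sketch)."""
    d = (ingest_date or date.today()).isoformat()
    return {
        "silver": f"processed/transactions/schema_v=v1/run_id={run_id}",
        "quarantine": f"quarantine/transactions/ingest_date={d}/run_id={run_id}",
        "condemned": f"quarantine/transactions/condemned/ingest_date={d}/run_id={run_id}",
    }
```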

## See Also

  • Full implementation: APPENDIX_A_FULL_ETL_CODE.md
  • Assumptions & Edge Cases: ASSUMPTIONS_AND_EDGE_CASES.md
  • Architecture: ../02_data_lake_architecture_design/architecture.md