Appendix A: Full ETL Implementation Code

This appendix contains the complete implementation of the transaction ingestion ETL pipeline.

File: src/etl/ingest_transactions.py

Note: The implementation has been enhanced with:

  • Metadata enrichment (row_hash, source_file_id, attempt_count, ingestion_timestamp)
  • Loop prevention (duplicate detection in quarantine history, attempt limits, circuit breaker)
  • Condemned layer for rows exceeding max attempts or exact duplicates of previously quarantined rows
  • Updated error types (SCHEMA_ERROR, NULL_VALUE_ERROR, TYPE_ERROR, CURRENCY_ERROR, TIMESTAMP_ERROR, DUPLICATE_FAILURE, MAX_ATTEMPTS)
  • Enhanced CloudWatch metrics (loop prevention metrics, condemned rows)

For the most up-to-date code, see src/etl/ingest_transactions.py.


import argparse
import datetime
import hashlib
import json
import logging
import os
import sys
from collections import defaultdict
from io import StringIO

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Configure logging with structured JSON support for CloudWatch Logs Insights

class StructuredFormatter(logging.Formatter):
    """Formatter that outputs structured JSON logs for CloudWatch Logs Insights"""
    def format(self, record):
        log_data = {
            'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
        }
        # Add any extra fields passed via extra= parameter
        if hasattr(record, 'run_id'):
            log_data['run_id'] = record.run_id
        if hasattr(record, 'metric_name'):
            log_data['metric_name'] = record.metric_name
        if hasattr(record, 'metric_value'):
            log_data['metric_value'] = record.metric_value
        return json.dumps(log_data)

# Configure logging

log_handler = logging.StreamHandler(sys.stdout)
log_handler.setFormatter(StructuredFormatter())
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)

# ISO-4217 Currency Allowlist (Subset for case study)

ALLOWED_CURRENCIES = {
    'EUR', 'USD', 'GBP', 'JPY', 'AUD', 'CAD', 'CHF', 'CNY', 'HKD', 'NZD'
}

REQUIRED_COLUMNS = ['TransactionID', 'CustomerID', 'TransactionAmount', 'Currency', 'TransactionTimestamp']

# Error type constants matching the one-pager
ERROR_SCHEMA = 'SCHEMA_ERROR'
ERROR_NULL_VALUE = 'NULL_VALUE_ERROR'
ERROR_TYPE = 'TYPE_ERROR'
ERROR_CURRENCY = 'CURRENCY_ERROR'
ERROR_TIMESTAMP = 'TIMESTAMP_ERROR'
ERROR_DUPLICATE_FAILURE = 'DUPLICATE_FAILURE'
ERROR_MAX_ATTEMPTS = 'MAX_ATTEMPTS'

# Loop prevention constants
MAX_ATTEMPTS = 3
CIRCUIT_BREAKER_THRESHOLD = 100  # Max occurrences of the same error type per rolling window
CIRCUIT_BREAKER_WINDOW_HOURS = 1

def get_s3_client(endpoint_url=None):
    """
    Returns a boto3 S3 client.
    If endpoint_url is provided (e.g., for local testing), it uses that.
    """
    return boto3.client(
        's3',
        endpoint_url=endpoint_url,
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
    )

def read_csv_from_s3(s3_client, bucket, key):
    """Reads CSV from S3 into a pandas DataFrame."""
    try:
        logger.info(f"Reading from s3://{bucket}/{key}")
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')
        return pd.read_csv(StringIO(content))
    except Exception as e:
        logger.error(f"Failed to read from S3: {e}")
        raise

def compute_row_hash(row, columns):
    """
    Compute SHA256 hash of row content for duplicate detection.

    Args:
        row: pandas Series representing a single row
        columns: list of column names to include in hash

    Returns:
        str: SHA256 hash as hexadecimal string
    """
    row_str = '|'.join([str(row[col]) if pd.notna(row[col]) else '' for col in columns])
    return hashlib.sha256(row_str.encode('utf-8')).hexdigest()

def extract_source_file_id(source_file):
    """
    Extract source_file_id from S3 path or generate from source_file.

    Args:
        source_file: Full S3 path (e.g., s3://bucket/path/file.csv)

    Returns:
        str: Source file identifier
    """
    if source_file and source_file.startswith('s3://'):
        parts = source_file.split('/')
        if len(parts) > 0:
            filename = parts[-1]
            return filename.rsplit('.', 1)[0] if '.' in filename else filename
    return source_file or 'unknown'

def enrich_metadata(df, source_file=None, ingest_time=None):
    """
    Enrich dataframe with tracking metadata: row_hash, source_file_id, attempt_count, ingestion_timestamp.

    Args:
        df: pandas DataFrame
        source_file: Source file path (optional)
        ingest_time: Ingestion timestamp (optional, defaults to now)

    Returns:
        pandas DataFrame with enriched metadata columns
    """
    df = df.copy()
    df['row_hash'] = df.apply(lambda row: compute_row_hash(row, df.columns.tolist()), axis=1)
    df['source_file_id'] = extract_source_file_id(source_file) if source_file else 'unknown'
    if ingest_time:
        df['ingestion_timestamp'] = ingest_time
    else:
        df['ingestion_timestamp'] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    df['attempt_count'] = 0
    df['retry_history'] = None
    return df

def check_quarantine_history(s3_client, quarantine_bucket, quarantine_prefix, row_hashes):
    """
    Check quarantine history for duplicate row_hashes.
    For case study: simplified implementation (returns empty dict).
    In production, this would query Glue Catalog or use Athena for efficient lookup.
    """
    logger.debug(f"Checking quarantine history for {len(row_hashes)} row hashes")
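    # Production sketch (table and column names are assumed, not defined in this repo):
    # an Athena query over the quarantine location could return a row_hash -> attempt_count
    # mapping, e.g.
    #   SELECT row_hash, MAX(attempt_count) AS attempt_count
    #   FROM quarantine_transactions
    #   WHERE row_hash IN (<row_hashes>)
    #   GROUP BY row_hash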
    return {}

def check_attempt_limit(attempt_count):
    """Check if attempt_count exceeds maximum allowed attempts."""
    return attempt_count >= MAX_ATTEMPTS

class CircuitBreaker:
    """Circuit breaker to halt pipeline when error threshold exceeded."""

    def __init__(self, threshold=CIRCUIT_BREAKER_THRESHOLD, window_hours=CIRCUIT_BREAKER_WINDOW_HOURS):
        self.threshold = threshold
        self.window_hours = window_hours
        self.error_counts = defaultdict(list)

    def record_error(self, error_type, timestamp=None):
        """Record an error occurrence."""
        if timestamp is None:
            timestamp = datetime.datetime.now(datetime.timezone.utc)
        self.error_counts[error_type].append(timestamp)
        cutoff = timestamp - datetime.timedelta(hours=self.window_hours)
        self.error_counts[error_type] = [
            ts for ts in self.error_counts[error_type] if ts > cutoff
        ]

    def check_threshold(self, error_type):
        """Check if error threshold exceeded for given error type."""
        return len(self.error_counts[error_type]) >= self.threshold

    def get_error_count(self, error_type):
        """Get current error count for given error type."""
        return len(self.error_counts[error_type])

def validate_and_transform(df, run_id, source_file=None, ingest_time=None,
                           s3_client=None, quarantine_bucket=None, quarantine_prefix=None,
                           circuit_breaker=None):
    """
    Validates the dataframe and splits it into valid, quarantine and condemned dataframes.
    Valid rows receive partition columns (year, month) for the Silver layer; invalid rows
    are quarantined, and rows that are exact duplicates of previously quarantined rows or
    that have exhausted MAX_ATTEMPTS are routed to the condemned set.

    Returns:
        tuple: (valid_df, quarantine_df, condemned_df)
    """
    # Check missing columns
    missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing_cols:
        raise ValueError(f"{ERROR_SCHEMA}: input missing required columns: {missing_cols}")

    # Enrich with tracking metadata (row_hash, source_file_id, attempt_count,
    # ingestion_timestamp) before validation so every row carries loop-prevention fields
    df = enrich_metadata(df, source_file=source_file, ingest_time=ingest_time)

    # Initialize validation status
    df['validation_error'] = None

    # Optional metadata columns (passed in from the ingestion context)
    # Keep these stable in tests by only adding them when explicitly provided.
    if ingest_time is not None:
        df['ingest_time'] = ingest_time
    if source_file is not None:
        df['source_file'] = source_file

    # 1. Null checks
    null_mask = df[REQUIRED_COLUMNS].isnull().any(axis=1)
    df.loc[null_mask, 'validation_error'] = ERROR_NULL_VALUE

    # 2. Currency validation
    # Only check rows that passed null checks to avoid overwriting
    currency_mask = (~null_mask) & (~df['Currency'].isin(ALLOWED_CURRENCIES))
    df.loc[currency_mask, 'validation_error'] = ERROR_CURRENCY

    # 3. Type checks (minimal, case-study safe)
    # Ensure TransactionAmount is numeric. Negative values are allowed (e.g., refunds/withdrawals);
    # non-numeric values are quarantined.
    amount_ok_mask = (~null_mask) & (df['validation_error'].isnull())
    if amount_ok_mask.any():
        amount_numeric_all = pd.to_numeric(df['TransactionAmount'], errors='coerce')
        invalid_amount_mask = amount_ok_mask & (amount_numeric_all.isnull())
        df.loc[invalid_amount_mask, 'validation_error'] = ERROR_TYPE
        # Normalize the column type for rows that passed the numeric check
        ok_numeric_mask = amount_ok_mask & (~invalid_amount_mask)
        df.loc[ok_numeric_mask, 'TransactionAmount'] = amount_numeric_all.loc[ok_numeric_mask]

    # 4. Timestamp parsing
    # We attempt to parse timestamps. Failures get quarantined.
    # We use a temporary column for parsed datetime to avoid breaking the original for quarantine
    df['parsed_timestamp'] = pd.to_datetime(df['TransactionTimestamp'], errors='coerce', utc=True)

    time_mask = (~null_mask) & (df['parsed_timestamp'].isnull())
    # If it was already invalid, append error; otherwise set it
    df.loc[time_mask, 'validation_error'] = df.loc[time_mask, 'validation_error'].apply(
        lambda x: f"{x}; {ERROR_TIMESTAMP}" if x else ERROR_TIMESTAMP
    )

    # 5. Duplicate detection (account/date combinations)
    # Flag duplicates but don't drop - preserve for audit
    # Note: This assumes CustomerID + TransactionTimestamp (date part) as the business key
    # For production, this would be configurable based on business rules
    valid_for_dup_check = df[df['validation_error'].isnull()].copy()
    if not valid_for_dup_check.empty and 'CustomerID' in valid_for_dup_check.columns:
        valid_for_dup_check['tx_date'] = valid_for_dup_check['parsed_timestamp'].dt.date
        duplicate_mask = valid_for_dup_check.duplicated(
            subset=['CustomerID', 'tx_date'],
            keep=False
        )
        if duplicate_mask.any():
            # Mark duplicates in original dataframe
            dup_indices = valid_for_dup_check[duplicate_mask].index
            for idx in dup_indices:
                if df.loc[idx, 'validation_error'] is None:
                    df.loc[idx, 'validation_error'] = 'Duplicate account/date combination'
                else:
                    df.loc[idx, 'validation_error'] = f"{df.loc[idx, 'validation_error']}; Duplicate account/date combination"

    # Split into valid and quarantine
    quarantine_df = df[df['validation_error'].notnull()].copy()
    valid_df = df[df['validation_error'].isnull()].copy()

    # Add partition columns to valid data (Silver layer partitioning by event time)
    if not valid_df.empty:
        valid_df['year'] = valid_df['parsed_timestamp'].dt.year
        valid_df['month'] = valid_df['parsed_timestamp'].dt.month.astype(str).str.zfill(2)
        # Drop helper column, keep original timestamp
        del valid_df['parsed_timestamp']
        del valid_df['validation_error']

    # Add metadata to quarantine
    quarantine_df['ingest_date'] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d')
    quarantine_df['run_id'] = run_id

    # Loop prevention: route exact duplicates of previously quarantined rows and rows that
    # have exhausted their attempts to the condemned set instead of re-quarantining them
    condemned_df = quarantine_df.iloc[0:0].copy()
    if not quarantine_df.empty:
        history = {}
        if s3_client is not None and quarantine_bucket and quarantine_prefix:
            history = check_quarantine_history(
                s3_client, quarantine_bucket, quarantine_prefix,
                quarantine_df['row_hash'].tolist()
            )
        duplicate_mask = quarantine_df['row_hash'].isin(set(history))
        max_attempts_mask = quarantine_df['attempt_count'].apply(check_attempt_limit)
        quarantine_df.loc[duplicate_mask, 'validation_error'] = ERROR_DUPLICATE_FAILURE
        quarantine_df.loc[max_attempts_mask & ~duplicate_mask, 'validation_error'] = ERROR_MAX_ATTEMPTS
        condemned_mask = duplicate_mask | max_attempts_mask
        condemned_df = quarantine_df[condemned_mask].copy()
        quarantine_df = quarantine_df[~condemned_mask].copy()

        # Record quarantined error types so the circuit breaker can trip on repeated failures
        if circuit_breaker is not None:
            for error in quarantine_df['validation_error'].dropna():
                circuit_breaker.record_error(error)

    return valid_df, quarantine_df, condemned_df

def write_parquet_to_s3(df, s3_client, bucket, prefix, partition_cols=None):
    """
    Writes DataFrame to S3 as Parquet.
    If partition_cols is provided, writes partitioned dataset using pyarrow.
    """
    if df.empty:
        logger.warning(f"No data to write to s3://{bucket}/{prefix}")
        return

    table = pa.Table.from_pandas(df)

    # Note: partitioned S3 writes go through s3fs as the PyArrow filesystem,
    # which is cleaner than manually looping over boto3 uploads.

    import s3fs

    endpoint = os.getenv('S3_ENDPOINT_URL')
    key = os.getenv('AWS_ACCESS_KEY_ID')
    secret = os.getenv('AWS_SECRET_ACCESS_KEY')

    # Configure s3fs for AWS S3
    s3_fs = s3fs.S3FileSystem(
        key=key,
        secret=secret,
        client_kwargs={'endpoint_url': endpoint} if endpoint else {},
        use_ssl=False if endpoint and endpoint.startswith('http://') else True
    )

    path = f"{bucket}/{prefix}"

    try:
        pq.write_to_dataset(
            table,
            root_path=path,
            partition_cols=partition_cols,
            filesystem=s3_fs,
            existing_data_behavior='overwrite_or_ignore' # Safe because run_id makes path unique
        )
        logger.info(f"Successfully wrote {len(df)} rows to s3://{path}")
    except Exception as e:
        logger.error(f"Failed to write Parquet: {e}")
        raise

def write_condemned_to_s3(df, s3_client, bucket, prefix):
    """
    Writes condemned rows to S3 in condemned layer.

    Args:
        df: pandas DataFrame with condemned rows
        s3_client: boto3 S3 client
        bucket: S3 bucket
        prefix: S3 prefix for condemned data
    """
    if df.empty:
        return
    write_parquet_to_s3(df, s3_client, bucket, prefix, partition_cols=None)
    logger.warning(f"Wrote {len(df)} condemned rows to s3://{bucket}/{prefix}")

def write_success_marker(s3_client, bucket, prefix, run_id, metrics):
    """Writes a _SUCCESS file with metrics JSON."""
    key = f"{prefix}/_SUCCESS"
    content = json.dumps(metrics, indent=2)
    s3_client.put_object(Bucket=bucket, Key=key, Body=content)
    logger.info(f"Wrote success marker to s3://{bucket}/{key}")

def publish_cloudwatch_metrics(metrics, namespace="Ohpen/ETL", enabled=None):
    """
    Publish ETL metrics to CloudWatch (optional, disabled for local testing).

    Args:
        metrics: Dictionary containing metric values (input_rows, valid_rows, quarantined_rows, etc.)
        namespace: CloudWatch namespace (default: "Ohpen/ETL")
        enabled: Whether to publish metrics. If None, auto-detects based on environment.
                 Disabled if DISABLE_CLOUDWATCH=true or S3_ENDPOINT_URL is set (local testing)

    Returns:
        True if metrics were published, False otherwise
    """
    # Auto-detect if CloudWatch should be enabled
    if enabled is None:
        # Disable for local testing (MinIO) or if explicitly disabled
        enabled = (
            os.getenv('DISABLE_CLOUDWATCH', 'false').lower() != 'true' and
            os.getenv('S3_ENDPOINT_URL') is None  # No endpoint URL means AWS S3
        )

    if not enabled:
        logger.debug("CloudWatch metrics disabled (local testing or DISABLE_CLOUDWATCH=true)")
        return False

    try:
        cloudwatch = boto3.client('cloudwatch')

        # Calculate derived metrics
        quarantine_rate = (
            metrics['quarantined_rows'] / metrics['input_rows'] * 100
            if metrics['input_rows'] > 0 else 0.0
        )

        # Prepare metric data
        metric_data = [
            {
                'MetricName': 'InputRows',
                'Value': metrics['input_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            },
            {
                'MetricName': 'ValidRows',
                'Value': metrics['valid_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            },
            {
                'MetricName': 'QuarantinedRows',
                'Value': metrics['quarantined_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            },
            {
                'MetricName': 'QuarantineRate',
                'Value': quarantine_rate,
                'Unit': 'Percent',
                'Timestamp': datetime.datetime.utcnow()
            }
        ]

        # Add condemned rows metric
        if 'condemned_rows' in metrics:
            metric_data.append({
                'MetricName': 'CondemnedRows',
                'Value': metrics['condemned_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            })

        # Add loop prevention metrics
        if 'avg_attempt_count' in metrics:
            metric_data.append({
                'MetricName': 'AvgAttemptCount',
                'Value': metrics['avg_attempt_count'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            })

        if 'duplicate_detection_rate' in metrics:
            metric_data.append({
                'MetricName': 'DuplicateDetectionRate',
                'Value': metrics['duplicate_detection_rate'],
                'Unit': 'Percent',
                'Timestamp': datetime.datetime.utcnow()
            })

        if 'auto_condemnation_rate' in metrics:
            metric_data.append({
                'MetricName': 'AutoCondemnationRate',
                'Value': metrics['auto_condemnation_rate'],
                'Unit': 'Percent',
                'Timestamp': datetime.datetime.utcnow()
            })

        # Add duration if available
        if 'duration_seconds' in metrics:
            metric_data.append({
                'MetricName': 'DurationSeconds',
                'Value': metrics['duration_seconds'],
                'Unit': 'Seconds',
                'Timestamp': datetime.datetime.utcnow()
            })

        # Publish metrics (CloudWatch allows up to 20 metrics per call)
        cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=metric_data
        )

        # Log structured metrics for CloudWatch Logs Insights
        logger.info(
            "Published CloudWatch metrics",
            extra={
                'run_id': metrics.get('run_id'),
                'metric_name': 'ETLRunComplete',
                'metric_value': {
                    'input_rows': metrics['input_rows'],
                    'valid_rows': metrics['valid_rows'],
                    'quarantined_rows': metrics['quarantined_rows'],
                    'condemned_rows': metrics.get('condemned_rows', 0),
                    'quarantine_rate': quarantine_rate,
                    'avg_attempt_count': metrics.get('avg_attempt_count', 0.0),
                    'auto_condemnation_rate': metrics.get('auto_condemnation_rate', 0.0),
                    'duplicate_detection_rate': metrics.get('duplicate_detection_rate', 0.0)
                }
            }
        )

        return True

    except Exception as e:
        # Don't fail the job if CloudWatch publishing fails
        logger.warning(
            f"Failed to publish CloudWatch metrics: {e}",
            extra={'run_id': metrics.get('run_id')}
        )
        return False

def main():
    """
    ETL Pipeline: Bronze -> Silver
    Reads CSV from Bronze layer (raw, immutable), validates, and writes Parquet to Silver layer.
    Invalid rows are quarantined for audit.
    """
    parser = argparse.ArgumentParser(description="Ingest Transactions ETL (Bronze -> Silver)")
    parser.add_argument('--input-bucket', required=True)
    parser.add_argument('--input-key', required=True)
    parser.add_argument('--output-bucket', required=True)
    parser.add_argument('--output-prefix', required=True)
    parser.add_argument('--quarantine-bucket', required=True)
    parser.add_argument('--quarantine-prefix', required=True)
    parser.add_argument('--endpoint-url', default=os.getenv('S3_ENDPOINT_URL'), help="S3 Endpoint URL (optional, for local testing)")
    parser.add_argument('--disable-cloudwatch', action='store_true', help="Disable CloudWatch metrics publishing (for local testing)")

    args = parser.parse_args()

    start_time = datetime.datetime.now(datetime.timezone.utc)
    run_id = start_time.strftime('%Y%m%dT%H%M%SZ')
    ingest_time = start_time.isoformat()

    # Log with structured format for CloudWatch Logs Insights
    logger.info(
        "Starting ETL run",
        extra={'run_id': run_id, 'metric_name': 'ETLStart', 'metric_value': 1}
    )

    s3 = get_s3_client(args.endpoint_url)

    # 1. Read from Bronze layer (raw, immutable)
    # Bronze path convention: bronze/mortgages/transactions/ingest_date=YYYY-MM-DD/run_id=.../file.csv.gz
    raw_df = read_csv_from_s3(s3, args.input_bucket, args.input_key)
    logger.info(f"Read {len(raw_df)} rows from Bronze layer (s3://{args.input_bucket}/{args.input_key}).")

    # 2. Initialize Circuit Breaker
    circuit_breaker = CircuitBreaker()

    # 3. Validate & Transform (Bronze -> Silver)
    source_file = f"s3://{args.input_bucket}/{args.input_key}"
    valid_df, quarantine_df, condemned_df = validate_and_transform(
        raw_df,
        run_id,
        source_file=source_file,
        ingest_time=ingest_time,
        s3_client=s3,
        quarantine_bucket=args.quarantine_bucket,
        quarantine_prefix=args.quarantine_prefix,
        circuit_breaker=circuit_breaker,
    )
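
    # Circuit-breaker guard (minimal sketch): abort before any writes if a single error
    # type exceeded CIRCUIT_BREAKER_THRESHOLD occurrences within the rolling window.
    for error_type in list(circuit_breaker.error_counts.keys()):
        if circuit_breaker.check_threshold(error_type):
            logger.error(
                f"Circuit breaker tripped for {error_type} "
                f"({circuit_breaker.get_error_count(error_type)} errors in the last "
                f"{CIRCUIT_BREAKER_WINDOW_HOURS}h); aborting run",
                extra={'run_id': run_id}
            )
            sys.exit(1)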

    # 4. Write Valid Data to Silver layer (Partitioned Parquet)
    # Silver path convention: silver/mortgages/transactions/year=YYYY/month=MM/schema_v=v1/run_id=.../part-0000.parquet
    # We include run_id and schema_v in the prefix to ensure idempotency, history, and schema versioning
    schema_version = 'v1'  # In production, this would come from config or schema registry
    run_output_prefix = f"{args.output_prefix}/schema_v={schema_version}/run_id={run_id}"

    if not valid_df.empty:
        write_parquet_to_s3(
            valid_df,
            s3,
            args.output_bucket,
            run_output_prefix,
            partition_cols=['year', 'month']
        )

    # 5. Write Quarantine Data (partitioned by ingest_date and run_id via the prefix)
    # Structure: quarantine/transactions/ingest_date=.../run_id=.../
    if not quarantine_df.empty:
        q_prefix = f"{args.quarantine_prefix}/ingest_date={datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d')}/run_id={run_id}"
        write_parquet_to_s3(
            quarantine_df,
            s3,
            args.quarantine_bucket,
            q_prefix,
            partition_cols=None # Flat file for quarantine usually easier to scan
        )
        logger.warning(f"Quarantined {len(quarantine_df)} invalid rows.")

    # 5b. Write Condemned Data (separate layer)
    # Structure: quarantine/transactions/condemned/ingest_date=.../run_id=.../
    if not condemned_df.empty:
        condemned_prefix = f"{args.quarantine_prefix}/condemned/ingest_date={datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d')}/run_id={run_id}"
        write_condemned_to_s3(
            condemned_df,
            s3,
            args.quarantine_bucket,
            condemned_prefix
        )
        logger.warning(f"Condemned {len(condemned_df)} rows (max attempts or exact duplicates).")

    # 6. Calculate duration
    end_time = datetime.datetime.now(datetime.timezone.utc)
    duration_seconds = (end_time - start_time).total_seconds()

    # 7. Prepare metrics
    metrics = {
        'run_id': run_id,
        'ingest_time': ingest_time,
        'source_file': source_file,
        'input_rows': len(raw_df),
        'valid_rows': len(valid_df),
        'quarantined_rows': len(quarantine_df),
        'condemned_rows': len(condemned_df),
        'duration_seconds': duration_seconds,
        'status': 'SUCCESS'
    }

    # Calculate loop prevention metrics
    all_processed = (
        pd.concat([valid_df, quarantine_df, condemned_df], ignore_index=True)
        if not (valid_df.empty and quarantine_df.empty and condemned_df.empty)
        else pd.DataFrame()
    )
    if not all_processed.empty and 'attempt_count' in all_processed.columns:
        metrics['avg_attempt_count'] = all_processed['attempt_count'].mean()

    if len(condemned_df) > 0:
        metrics['auto_condemnation_rate'] = (len(condemned_df) / len(raw_df)) * 100 if len(raw_df) > 0 else 0.0
        duplicate_condemned = condemned_df[condemned_df['validation_error'] == ERROR_DUPLICATE_FAILURE]
        if len(duplicate_condemned) > 0:
            metrics['duplicate_detection_rate'] = (len(duplicate_condemned) / len(raw_df)) * 100 if len(raw_df) > 0 else 0.0

    # Calculate quarantine breakdown by error type for detailed monitoring
    quarantine_by_reason = {}
    if not quarantine_df.empty and 'validation_error' in quarantine_df.columns:
        error_counts = quarantine_df['validation_error'].value_counts().to_dict()
        quarantine_by_reason = error_counts
        metrics['quarantine_by_reason'] = quarantine_by_reason

    # Add condemned breakdown
    if not condemned_df.empty and 'validation_error' in condemned_df.columns:
        condemned_by_reason = condemned_df['validation_error'].value_counts().to_dict()
        metrics['condemned_by_reason'] = condemned_by_reason

    # 8. Write Success Marker (for backward compatibility and manual inspection)
    write_success_marker(s3, args.output_bucket, run_output_prefix, run_id, metrics)

    # 9. Publish CloudWatch Metrics (optional, disabled for local testing)
    cloudwatch_enabled = not args.disable_cloudwatch
    publish_cloudwatch_metrics(metrics, enabled=cloudwatch_enabled)

    # 10. Log completion with structured metrics for CloudWatch Logs Insights
    logger.info(
        "ETL run completed successfully",
        extra={
            'run_id': run_id,
            'metric_name': 'ETLComplete',
            'metric_value': {
                'input_rows': metrics['input_rows'],
                'valid_rows': metrics['valid_rows'],
                'quarantined_rows': metrics['quarantined_rows'],
                'duration_seconds': duration_seconds,
                'quarantine_rate': metrics['quarantined_rows'] / metrics['input_rows'] * 100 if metrics['input_rows'] > 0 else 0.0,
                'quarantine_by_reason': quarantine_by_reason
            }
        }
    )

if __name__ == "__main__":
    main()
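
For reference, the validation step can be exercised in isolation, without any S3 or CloudWatch setup. The snippet below is a minimal sketch: sample values are illustrative, and the import assumes the repository root is on PYTHONPATH with src/etl importable as a package.

import pandas as pd

from src.etl.ingest_transactions import validate_and_transform

batch = pd.DataFrame([
    # One clean row and one row that fails currency and timestamp validation
    {'TransactionID': 'T1', 'CustomerID': 'C1', 'TransactionAmount': 100.0,
     'Currency': 'EUR', 'TransactionTimestamp': '2024-01-15T10:00:00Z'},
    {'TransactionID': 'T2', 'CustomerID': 'C2', 'TransactionAmount': 50.0,
     'Currency': 'XXX', 'TransactionTimestamp': 'not-a-date'},
])

valid_df, quarantine_df, condemned_df = validate_and_transform(batch, run_id='manual-test')
# Expected split: 1 valid row, 1 quarantined row, 0 condemned rows
print(len(valid_df), len(quarantine_df), len(condemned_df))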

See Also

  • Pseudocode: ETL_PSEUDOCODE.md
  • Architecture Diagram: ETL_DIAGRAM.md
  • Assumptions & Edge Cases: ASSUMPTIONS_AND_EDGE_CASES.md