26 min read · Intermediate

Building Production-Grade ETL Pipelines: A Practitioner's Guide

Comprehensive guide to designing, implementing, and operating ETL pipelines that handle real-world complexity—covering idempotency, error handling, monitoring, testing, and operational excellence

ETL · Data Pipelines · Airflow · Python · Best Practices · Production Systems
TL;DR

Most ETL tutorials focus on the happy path—reading data, transforming it, and writing output. But production systems must handle failures, ensure exactly-once semantics, support incremental processing, and provide observability. This guide covers the battle-tested patterns that distinguish reliable data pipelines from fragile ones.

Prerequisites
  • Basic understanding of Python programming
  • Familiarity with SQL and relational databases
  • Experience with at least one ETL tool (Airflow, dbt, etc.)

Introduction

Every data engineer has experienced the sinking feeling: a pipeline that worked perfectly in development fails mysteriously in production. Data is missing, duplicated, or corrupted. Stakeholders are upset. The root cause is buried in logs across three different systems.

The gap between "it works on my machine" and "it runs reliably in production" is vast. This guide bridges that gap by covering the patterns, practices, and principles that transform fragile ETL scripts into robust production systems.

The Anatomy of a Production Pipeline

Before diving into specific techniques, let's establish what distinguishes production pipelines from development scripts:

┌────────────────────────────────────────────────────────────────────────────┐
│                    PRODUCTION PIPELINE REQUIREMENTS                        │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  RELIABILITY                              OBSERVABILITY                    │
│  ───────────                              ─────────────                    │
│  • Handles partial failures               • Metrics for every stage       │
│  • Retries with backoff                   • Structured logging            │
│  • Exactly-once semantics                 • Data lineage tracking         │
│  • Graceful degradation                   • Alerting on anomalies         │
│                                                                            │
│  MAINTAINABILITY                          PERFORMANCE                      │
│  ───────────────                          ───────────                      │
│  • Clear separation of concerns           • Incremental processing        │
│  • Comprehensive test coverage            • Parallelization               │
│  • Version-controlled configuration       • Resource optimization         │
│  • Self-documenting code                  • Backpressure handling         │
│                                                                            │
│  SECURITY                                 GOVERNANCE                       │
│  ────────                                 ──────────                       │
│  • Secrets management                     • Schema evolution              │
│  • Data encryption                        • Data quality validation       │
│  • Access control                         • Audit trails                  │
│  • PII handling                           • Compliance enforcement        │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

Idempotency: The Foundation

Idempotency means that running an operation multiple times produces the same result as running it once. This property is fundamental because in distributed systems, operations may be retried due to timeouts, network partitions, or crashes.

Why Idempotency Matters

Consider a pipeline that inserts records into a database. If it fails after inserting half the records and is retried, without idempotency you'll get duplicate data. With idempotency, the retry produces the same final state as if it had succeeded the first time.
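To make the failure mode concrete, here is a minimal sketch using sqlite3 (the table and column names are illustrative). A plain INSERT duplicates rows on retry; an upsert keyed on the business key converges to the same final state no matter how many times it runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, total REAL)")

def load_batch(rows):
    # Idempotent write: ON CONFLICT makes a retry converge to the same state
    conn.executemany(
        """INSERT INTO orders (order_id, total) VALUES (?, ?)
           ON CONFLICT(order_id) DO UPDATE SET total = excluded.total""",
        rows,
    )
    conn.commit()

batch = [("o-1", 10.0), ("o-2", 25.5)]
load_batch(batch)  # first run
load_batch(batch)  # simulated retry: same final state, no duplicates

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 2
```

With a bare `INSERT` instead, the retry would leave four rows (or fail on the primary key), which is exactly the corruption idempotency prevents.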

Implementing Idempotent Writes

# idempotent_writes.py
"""
Patterns for implementing idempotent data writes.
These patterns ensure that retry operations don't corrupt data.
"""
 
from typing import List, Dict, Any, Optional
from datetime import datetime, date
from dataclasses import dataclass
import hashlib
import json
 
@dataclass
class ProcessingCheckpoint:
    """Tracks pipeline execution state for recovery."""
    pipeline_id: str
    run_id: str
    partition_key: str  # e.g., "2024-01-15" for daily partitions
    stage: str
    status: str  # 'pending', 'in_progress', 'completed', 'failed'
    records_processed: int
    started_at: datetime
    completed_at: Optional[datetime]
    error_message: Optional[str]
 
 
class IdempotentWriter:
    """
    Base class for idempotent data writes.
    Subclasses implement specific storage backends.
    """
 
    def __init__(self, target_table: str, partition_columns: List[str]):
        self.target_table = target_table
        self.partition_columns = partition_columns
 
    def compute_record_hash(self, record: Dict[str, Any], key_columns: List[str]) -> str:
        """
        Compute deterministic hash for deduplication.
        Uses only business key columns, not system columns like timestamps.
        """
        key_values = {k: record[k] for k in sorted(key_columns)}
        key_string = json.dumps(key_values, sort_keys=True, default=str)
        return hashlib.sha256(key_string.encode()).hexdigest()
 
    def write_with_deduplication(
        self,
        records: List[Dict[str, Any]],
        key_columns: List[str],
        connection
    ) -> int:
        """
        Write records with hash-based deduplication.
        Existing records with matching hashes are skipped.
 
        Pattern: MERGE/UPSERT with hash comparison
        """
        # Add hash to each record
        for record in records:
            record['_record_hash'] = self.compute_record_hash(record, key_columns)
 
        # Use a database-specific MERGE. The wildcard INSERT * / UPDATE SET *
        # shorthand below is Databricks/Delta Lake syntax; most warehouses
        # (Snowflake, BigQuery) require explicit column lists instead.
        merge_sql = f"""
            MERGE INTO {self.target_table} AS target
            USING (SELECT * FROM staging_table) AS source
            ON target._record_hash = source._record_hash
            WHEN MATCHED THEN
                UPDATE SET *  -- optional: refresh non-key columns
            WHEN NOT MATCHED THEN
                INSERT *
        """
 
        cursor = connection.cursor()
        cursor.execute(merge_sql)
        affected_rows = cursor.rowcount
 
        return affected_rows
 
 
class PartitionSwapWriter(IdempotentWriter):
    """
    Idempotent writes using partition swap pattern.
    Atomically replaces entire partitions—no partial updates possible.
 
    This is the gold standard for idempotency in batch pipelines.
    """
 
    def write_partition(
        self,
        records: List[Dict[str, Any]],
        partition_value: str,
        connection
    ) -> int:
        """
        Atomically replace a partition with new data.
 
        Steps:
        1. Write to staging partition
        2. Validate staging data
        3. Atomic swap: drop old partition, rename staging
        """
        staging_partition = f"{self.target_table}__staging__{partition_value}"
 
        try:
            # Step 1: Write to staging
            self._write_to_staging(records, staging_partition, connection)
 
            # Step 2: Validate (row counts, checksums, etc.)
            self._validate_staging(staging_partition, len(records), connection)
 
            # Step 3: Atomic swap
            self._atomic_swap(partition_value, staging_partition, connection)
 
            return len(records)
 
        except Exception:
            # Clean up staging on failure, then re-raise
            self._cleanup_staging(staging_partition, connection)
            raise
 
    def _atomic_swap(self, partition_value: str, staging: str, connection):
        """
        Perform the atomic swap.
        Implementation varies by platform:
        - Snowflake: ALTER TABLE ... SWAP WITH (swaps whole tables, so model
          each partition as its own table)
        - BigQuery: bq cp -f onto a partition decorator (table$20240115)
        - Spark/Hive: INSERT OVERWRITE on the specific partition
        """
        # Snowflake example: swap the staging table with the live
        # per-partition table (naming convention is illustrative)
        swap_sql = f"""
            ALTER TABLE {self.target_table}__{partition_value}
            SWAP WITH {staging}
        """
        connection.execute(swap_sql)
 
 
class WriteAheadLogWriter(IdempotentWriter):
    """
    Idempotent writes using write-ahead log (WAL) pattern.
    Records all intended writes before execution.
    Enables recovery from any failure point.
    """
 
    def __init__(self, target_table: str, wal_table: str, partition_columns: List[str]):
        super().__init__(target_table, partition_columns)
        self.wal_table = wal_table
 
    def write_with_wal(
        self,
        records: List[Dict[str, Any]],
        batch_id: str,
        connection
    ) -> int:
        """
        Write records using WAL for crash recovery.
 
        Pattern:
        1. Write batch metadata to WAL
        2. Write records to staging
        3. Move to target
        4. Mark WAL entry as complete
        """
        # Step 1: Record intent in WAL
        self._write_wal_entry(batch_id, 'STARTED', len(records), connection)
 
        try:
            # Step 2: Write to staging
            staging_count = self._write_staging(records, batch_id, connection)
            self._write_wal_entry(batch_id, 'STAGED', staging_count, connection)
 
            # Step 3: Move to target (using MERGE for idempotency)
            target_count = self._merge_to_target(batch_id, connection)
 
            # Step 4: Mark complete
            self._write_wal_entry(batch_id, 'COMPLETED', target_count, connection)
 
            return target_count
 
        except Exception as e:
            self._write_wal_entry(batch_id, 'FAILED', 0, connection, str(e))
            raise
 
    def recover_incomplete_batches(self, connection) -> List[str]:
        """
        Find and recover batches that started but didn't complete.
        Called during pipeline startup.
        """
        incomplete_sql = f"""
            SELECT batch_id, status, record_count
            FROM {self.wal_table}
            WHERE status NOT IN ('COMPLETED', 'ABANDONED')
            ORDER BY created_at
        """
 
        incomplete = connection.execute(incomplete_sql).fetchall()
        recovered = []
 
        for batch_id, status, record_count in incomplete:
            if status == 'STAGED':
                # Resume from staging
                self._merge_to_target(batch_id, connection)
                self._write_wal_entry(batch_id, 'COMPLETED', record_count, connection)
                recovered.append(batch_id)
            elif status == 'STARTED':
                # Need to re-extract and reprocess
                # Mark as abandoned, will be picked up by normal processing
                self._write_wal_entry(batch_id, 'ABANDONED', 0, connection)
 
        return recovered
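
The WAL bookkeeping above boils down to a small state machine: record intent, advance the status at each stage, and at startup scan for entries that never reached a terminal state. A standalone sketch with sqlite3 (the table name and statuses are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE wal (batch_id TEXT PRIMARY KEY, status TEXT, created_at TEXT)"
)

def write_wal_entry(batch_id, status):
    # Upsert so each batch keeps a single row reflecting its latest stage
    conn.execute(
        """INSERT INTO wal (batch_id, status, created_at)
           VALUES (?, ?, datetime('now'))
           ON CONFLICT(batch_id) DO UPDATE SET status = excluded.status""",
        (batch_id, status),
    )
    conn.commit()

# Two batches: one completes, one "crashes" after staging
write_wal_entry("batch-1", "STARTED")
write_wal_entry("batch-1", "STAGED")
write_wal_entry("batch-1", "COMPLETED")
write_wal_entry("batch-2", "STARTED")
write_wal_entry("batch-2", "STAGED")  # crash happens here

# On startup, recovery finds batches that never reached a terminal state
incomplete = conn.execute(
    "SELECT batch_id, status FROM wal WHERE status NOT IN ('COMPLETED', 'ABANDONED')"
).fetchall()
print(incomplete)  # [('batch-2', 'STAGED')]
```

A batch found in `STAGED` can be resumed from staging; one found in `STARTED` must be re-extracted, mirroring `recover_incomplete_batches` above.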

Incremental Processing

Full refreshes are simple but don't scale. Incremental processing loads only new or changed data, dramatically reducing processing time and cost.
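The core mechanic is a high watermark: remember the largest value of some monotonic column seen so far, and on each run pull only rows beyond it. A minimal sketch with sqlite3 (the table and timestamps are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (id INTEGER PRIMARY KEY, updated_at TEXT)")
conn.executemany(
    "INSERT INTO src VALUES (?, ?)",
    [
        (1, "2024-01-01T00:00:00"),
        (2, "2024-01-02T00:00:00"),
        (3, "2024-01-03T00:00:00"),
    ],
)

def extract_since(watermark):
    # Pull only rows modified after the stored high watermark
    rows = conn.execute(
        "SELECT id, updated_at FROM src WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    # Advance the watermark only when something was extracted
    new_watermark = rows[-1][1] if rows else watermark
    return rows, new_watermark

rows, wm = extract_since("2024-01-01T00:00:00")
print(len(rows), wm)  # 2 2024-01-03T00:00:00
```

The extractors below wrap this idea with the details that matter in production: overlap windows for clock skew, CDC streams, and snapshot diffing when no watermark column exists.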

Incremental Extraction Patterns

# incremental_extraction.py
"""
Patterns for incremental data extraction.
Each pattern handles different source system characteristics.
"""
 
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
import hashlib
 
@dataclass
class ExtractionState:
    """Tracks extraction progress for resumability."""
    source_table: str
    last_extracted_at: datetime
    high_watermark: Any  # Could be timestamp, ID, or version
    watermark_column: str
    rows_extracted: int
    checksum: Optional[str]
 
 
class IncrementalExtractor(ABC):
    """Base class for incremental extraction strategies."""
 
    @abstractmethod
    def get_incremental_predicate(self, state: ExtractionState) -> str:
        """Return SQL predicate for incremental extraction."""
        pass
 
    @abstractmethod
    def update_state(self, state: ExtractionState, extracted_data: List[Dict]) -> ExtractionState:
        """Update extraction state after successful extraction."""
        pass
 
 
class TimestampBasedExtractor(IncrementalExtractor):
    """
    Extract records based on modification timestamp.
 
    Requirements:
    - Source has reliable updated_at column
    - Updates always modify timestamp
    - Clock skew is bounded
 
    Considerations:
    - Add overlap window to handle clock skew
    - May extract same records multiple times (use dedup on load)
    """
 
    def __init__(
        self,
        timestamp_column: str = 'updated_at',
        overlap_seconds: int = 300  # 5-minute overlap for clock skew
    ):
        self.timestamp_column = timestamp_column
        self.overlap_seconds = overlap_seconds
 
    def get_incremental_predicate(self, state: ExtractionState) -> str:
        # Subtract overlap to handle clock skew
        safe_watermark = state.high_watermark - timedelta(seconds=self.overlap_seconds)
        return f"{self.timestamp_column} >= '{safe_watermark.isoformat()}'"
 
    def update_state(
        self,
        state: ExtractionState,
        extracted_data: List[Dict]
    ) -> ExtractionState:
        if not extracted_data:
            return state
 
        # Find new high watermark
        max_timestamp = max(row[self.timestamp_column] for row in extracted_data)
 
        return ExtractionState(
            source_table=state.source_table,
            last_extracted_at=datetime.utcnow(),
            high_watermark=max_timestamp,
            watermark_column=self.timestamp_column,
            rows_extracted=state.rows_extracted + len(extracted_data),
            checksum=None
        )
 
 
class CDCExtractor(IncrementalExtractor):
    """
    Extract using Change Data Capture (CDC).
 
    Captures inserts, updates, and deletes from transaction log.
    Gold standard for incremental extraction when available.
 
    Sources:
    - Debezium (Kafka Connect)
    - AWS DMS
    - Fivetran / Airbyte
    - Native CDC (SQL Server, Oracle GoldenGate)
    """
 
    def __init__(self, cdc_table: str, lsn_column: str = '_cdc_lsn'):
        self.cdc_table = cdc_table
        self.lsn_column = lsn_column
 
    def get_incremental_predicate(self, state: ExtractionState) -> str:
        return f"{self.lsn_column} > '{state.high_watermark}'"
 
    def process_cdc_records(
        self,
        cdc_records: List[Dict]
    ) -> Tuple[List[Dict], List[str]]:
        """
        Process CDC records into upserts and deletes.
 
        Returns:
            Tuple of (records_to_upsert, keys_to_delete)
        """
        upserts = []
        deletes = []
 
        for record in cdc_records:
            operation = record.get('_cdc_operation', 'u')
 
            if operation == 'd':
                # Delete operation
                deletes.append(record['_pk'])
            else:
                # Insert or Update
                # Remove CDC metadata before upserting
                clean_record = {
                    k: v for k, v in record.items()
                    if not k.startswith('_cdc_')
                }
                upserts.append(clean_record)
 
        return upserts, deletes
 
 
class SnapshotDiffExtractor(IncrementalExtractor):
    """
    Extract by comparing full snapshots.
 
    Use when:
    - Source has no reliable timestamp
    - Source has no CDC capability
    - Need to detect deletes
 
    Approach:
    - Take full snapshot hash
    - Compare with previous snapshot
    - Extract only changed rows
    """
 
    def __init__(self, primary_key_columns: List[str], comparison_columns: List[str]):
        self.pk_columns = primary_key_columns
        self.comparison_columns = comparison_columns
 
    def compute_row_signature(self, row: Dict[str, Any]) -> str:
        """Compute hash of row for comparison."""
        values = [str(row.get(col, '')) for col in self.comparison_columns]
        return hashlib.md5('|'.join(values).encode()).hexdigest()
 
    def diff_snapshots(
        self,
        current_snapshot: List[Dict],
        previous_signatures: Dict[str, str]
    ) -> Tuple[List[Dict], List[Dict], List[str]]:
        """
        Compare snapshots to find changes.
 
        Returns:
            Tuple of (inserts, updates, deleted_keys)
        """
        # Key everything by the stringified PK tuple so lookups line up
        # with previous_signatures
        current_by_key = {}
        current_signatures = {}

        for row in current_snapshot:
            key = str(tuple(row[col] for col in self.pk_columns))
            current_by_key[key] = row
            current_signatures[key] = self.compute_row_signature(row)

        inserts = []
        updates = []
        deletes = []

        # Find inserts and updates
        for key, row in current_by_key.items():
            if key not in previous_signatures:
                inserts.append(row)
            elif previous_signatures[key] != current_signatures[key]:
                updates.append(row)

        # Find deletes: keys present previously but absent now
        for prev_key in previous_signatures:
            if prev_key not in current_by_key:
                deletes.append(prev_key)

        return inserts, updates, deletes
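
The snapshot-diff logic can be exercised end to end with a few rows (the data and column names here are illustrative, and the hashing mirrors `compute_row_signature`):

```python
import hashlib

def row_sig(row, cols):
    # Hash of the comparison columns; md5 is fine for change detection
    return hashlib.md5(
        "|".join(str(row.get(c, "")) for c in cols).encode()
    ).hexdigest()

cols = ["name", "email"]
previous = [
    {"id": 1, "name": "Ada", "email": "ada@example.com"},
    {"id": 2, "name": "Grace", "email": "grace@example.com"},
]
current = [
    {"id": 1, "name": "Ada", "email": "ada@new.example.com"},  # changed
    {"id": 3, "name": "Edsger", "email": "ew@example.com"},    # new
]

prev_sigs = {r["id"]: row_sig(r, cols) for r in previous}
curr_sigs = {r["id"]: row_sig(r, cols) for r in current}

inserts = [k for k in curr_sigs if k not in prev_sigs]
updates = [k for k in curr_sigs if k in prev_sigs and curr_sigs[k] != prev_sigs[k]]
deletes = [k for k in prev_sigs if k not in curr_sigs]
print(inserts, updates, deletes)  # [3] [1] [2]
```

Note the cost: every run hashes the full snapshot, so this pattern only makes sense when timestamps and CDC are both unavailable.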

Airflow DAG for Incremental Pipeline

# dags/incremental_pipeline.py
"""
Production Airflow DAG demonstrating incremental ETL patterns.
Includes checkpointing, error handling, and observability.
"""
 
from datetime import datetime, timedelta
from typing import Dict, Any
import json
 
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.exceptions import AirflowException
 
# Custom modules (would be in separate files)
from plugins.extraction import TimestampBasedExtractor, ExtractionState
from plugins.loading import PartitionSwapWriter
from plugins.monitoring import PipelineMetrics, DataQualityChecker
 
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': True,  # Important for incremental pipelines
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
    'execution_timeout': timedelta(hours=2),
}
 
with DAG(
    dag_id='incremental_orders_pipeline',
    default_args=default_args,
    description='Incremental load of orders data with quality checks',
    schedule='@hourly',  # 'schedule_interval' on Airflow < 2.4
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Enable backfill for historical data
    max_active_runs=1,  # Prevent parallel runs that could cause conflicts
    tags=['production', 'incremental', 'orders'],
) as dag:
 
    @task(task_id='get_extraction_state')
    def get_extraction_state(**context) -> Dict[str, Any]:
        """
        Retrieve or initialize extraction state.
        State is stored in Airflow Variables for persistence.
        """
        state_key = f"extraction_state_{dag.dag_id}"
 
        try:
            state_json = Variable.get(state_key)
            state = json.loads(state_json)
 
            # For backfills, override with execution date
            execution_date = context['data_interval_start']
            if context['dag_run'].is_backfill:
                state['high_watermark'] = execution_date.isoformat()
 
        except KeyError:
            # Initialize state for first run
            execution_date = context['data_interval_start']
            state = {
                'source_table': 'source_db.orders',
                'high_watermark': (execution_date - timedelta(days=1)).isoformat(),
                'watermark_column': 'updated_at',
                'rows_extracted': 0
            }
 
        return state
 
    @task(task_id='extract_incremental')
    def extract_incremental(state: Dict[str, Any], **context) -> Dict[str, Any]:
        """
        Extract records modified since last run.
        Pushes extraction metrics to monitoring system.
        """
        from plugins.connectors import SourceDatabaseConnector
 
        metrics = PipelineMetrics(context['dag_run'].run_id)
        extractor = TimestampBasedExtractor(timestamp_column='updated_at')
 
        # Build and execute extraction query
        with SourceDatabaseConnector() as conn:
            extraction_state = ExtractionState(**state)
            predicate = extractor.get_incremental_predicate(extraction_state)
 
            query = f"""
                SELECT *
                FROM {state['source_table']}
                WHERE {predicate}
                AND updated_at < '{context['data_interval_end'].isoformat()}'
                ORDER BY updated_at
            """
 
            metrics.start_timer('extraction')
            records = conn.execute(query).fetchall()
            metrics.stop_timer('extraction')
 
            metrics.record_count('extracted_rows', len(records))
 
        # Store extracted data for next task
        # In production, use XCom backed by external storage for large datasets
        return {
            'records': records,
            'count': len(records),
            'min_watermark': min(r['updated_at'] for r in records).isoformat() if records else None,
            'max_watermark': max(r['updated_at'] for r in records).isoformat() if records else None,
        }
 
    @task(task_id='transform')
    def transform(extraction_result: Dict[str, Any], **context) -> Dict[str, Any]:
        """
        Apply business transformations.
        Validates and enriches extracted records.
        """
        records = extraction_result['records']
 
        if not records:
            return {'records': [], 'count': 0}
 
        transformed = []
        errors = []
 
        for record in records:
            try:
                # Apply transformations
                transformed_record = {
                    'order_id': record['order_id'],
                    'customer_id': record['customer_id'],
                    'order_date': record['created_at'].date(),
                    'total_amount': float(record['total']),
                    'currency': record.get('currency', 'USD').upper(),
                    'status': record['status'].lower(),
                    'item_count': record.get('item_count', 0),
 
                    # Add derived fields
                    'order_month': record['created_at'].strftime('%Y-%m'),
                    'is_high_value': float(record['total']) > 1000,
 
                    # Audit fields
                    '_extracted_at': context['data_interval_end'].isoformat(),
                    '_pipeline_run_id': context['dag_run'].run_id,
                }
 
                transformed.append(transformed_record)
 
            except Exception as e:
                errors.append({
                    'record_id': record.get('order_id'),
                    'error': str(e)
                })
 
        if errors:
            # Log errors but continue - don't fail entire batch for bad records
            context['ti'].xcom_push(key='transform_errors', value=errors)
 
        return {'records': transformed, 'count': len(transformed)}
 
    @task(task_id='validate_quality')
    def validate_quality(transform_result: Dict[str, Any], **context) -> bool:
        """
        Run data quality checks before loading.
        Fails pipeline if critical checks don't pass.
        """
        records = transform_result['records']
 
        if not records:
            return True  # Nothing to validate
 
        checker = DataQualityChecker()
 
        # Define quality expectations
        checks = [
            # Completeness checks
            checker.check_not_null(records, 'order_id'),
            checker.check_not_null(records, 'customer_id'),
            checker.check_not_null(records, 'total_amount'),
 
            # Validity checks
            checker.check_positive(records, 'total_amount'),
            checker.check_in_set(records, 'status',
                               {'pending', 'completed', 'cancelled', 'refunded'}),
 
            # Uniqueness check
            checker.check_unique(records, 'order_id'),
 
            # Referential check (cached lookup)
            checker.check_foreign_key(records, 'customer_id',
                                     reference_table='dim_customers'),
        ]
 
        failed_checks = [c for c in checks if not c.passed]
 
        if failed_checks:
            critical_failures = [c for c in failed_checks if c.severity == 'critical']
 
            if critical_failures:
                raise AirflowException(
                    f"Critical data quality failures: {critical_failures}"
                )
 
            # Log warnings for non-critical failures
            for check in failed_checks:
                context['ti'].log.warning(f"Quality check warning: {check}")
 
        return True
 
    @task(task_id='load_to_warehouse')
    def load_to_warehouse(
        transform_result: Dict[str, Any],
        quality_passed: bool,
        **context
    ) -> Dict[str, Any]:
        """
        Idempotent load to data warehouse.
        Uses partition swap for atomic updates.
        """
        if not quality_passed:
            raise AirflowException("Quality validation did not pass")
 
        records = transform_result['records']
 
        if not records:
            return {'loaded_count': 0}
 
        writer = PartitionSwapWriter(
            target_table='warehouse.fact_orders',
            partition_columns=['order_month']
        )
 
        from plugins.connectors import WarehouseConnector
 
        metrics = PipelineMetrics(context['dag_run'].run_id)
 
        with WarehouseConnector() as conn:
            metrics.start_timer('loading')
 
            # Group records by partition
            partitions = {}
            for record in records:
                partition = record['order_month']
                partitions.setdefault(partition, []).append(record)
 
            total_loaded = 0
            for partition_value, partition_records in partitions.items():
                loaded = writer.write_partition(
                    records=partition_records,
                    partition_value=partition_value,
                    connection=conn
                )
                total_loaded += loaded
 
            metrics.stop_timer('loading')
            metrics.record_count('loaded_rows', total_loaded)
 
        return {'loaded_count': total_loaded}
 
    @task(task_id='update_extraction_state')
    def update_extraction_state(
        extraction_result: Dict[str, Any],
        load_result: Dict[str, Any],
        **context
    ):
        """
        Persist extraction state for next run.
        Only called after successful load.
        """
        if extraction_result['max_watermark']:
            state_key = f"extraction_state_{dag.dag_id}"
 
            current_state = json.loads(Variable.get(state_key, '{}'))
            current_state['high_watermark'] = extraction_result['max_watermark']
            current_state['rows_extracted'] = current_state.get('rows_extracted', 0) + extraction_result['count']
            current_state['last_run'] = context['data_interval_end'].isoformat()
 
            Variable.set(state_key, json.dumps(current_state))
 
    # Define task dependencies
    state = get_extraction_state()
    extraction = extract_incremental(state)
    transformed = transform(extraction)
    validated = validate_quality(transformed)
    loaded = load_to_warehouse(transformed, validated)
    update_extraction_state(extraction, loaded)

Error Handling and Recovery

Errors are inevitable. The question is not whether they'll occur, but how gracefully your pipeline handles them.

Error Handling Hierarchy

# error_handling.py
"""
Comprehensive error handling patterns for production pipelines.
Implements retry strategies, dead letter queues, and recovery mechanisms.
"""
 
from enum import Enum
from typing import Callable, TypeVar, Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
import time
import traceback
import functools
 
T = TypeVar('T')
 
class ErrorSeverity(Enum):
    """Classification of error severity for handling decisions."""
    TRANSIENT = 'transient'      # Retry will likely succeed
    RECOVERABLE = 'recoverable'  # Can continue with partial data
    FATAL = 'fatal'              # Must stop processing
 
 
class RetryPolicy(Enum):
    """Standard retry policies."""
    NONE = 'none'
    LINEAR = 'linear'
    EXPONENTIAL = 'exponential'
    EXPONENTIAL_JITTER = 'exponential_jitter'
 
 
@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_attempts: int = 3
    base_delay_seconds: float = 1.0
    max_delay_seconds: float = 300.0
    policy: RetryPolicy = RetryPolicy.EXPONENTIAL_JITTER
    retryable_exceptions: tuple = (ConnectionError, TimeoutError)
 
 
def with_retry(config: RetryConfig):
    """
    Decorator for retrying operations with configurable backoff.
 
    Usage:
        @with_retry(RetryConfig(max_attempts=5))
        def call_external_api():
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None
 
            for attempt in range(1, config.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
 
                except config.retryable_exceptions as e:
                    last_exception = e
 
                    if attempt == config.max_attempts:
                        break
 
                    delay = _calculate_delay(attempt, config)
 
                    # Log the retry attempt (swap print for a structured
                    # logger in production)
                    print(f"Attempt {attempt} failed: {e}. Retrying in {delay:.2f}s")
                    time.sleep(delay)
 
                except Exception as e:
                    # Non-retryable exception
                    raise
 
            raise last_exception
 
        return wrapper
    return decorator
 
 
def _calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Calculate delay based on retry policy."""
    import random
 
    if config.policy == RetryPolicy.NONE:
        return 0
    elif config.policy == RetryPolicy.LINEAR:
        delay = config.base_delay_seconds * attempt
    elif config.policy == RetryPolicy.EXPONENTIAL:
        delay = config.base_delay_seconds * (2 ** (attempt - 1))
    elif config.policy == RetryPolicy.EXPONENTIAL_JITTER:
        base_delay = config.base_delay_seconds * (2 ** (attempt - 1))
        delay = base_delay * (0.5 + random.random())  # 50-150% of base
    else:
        delay = config.base_delay_seconds
 
    return min(delay, config.max_delay_seconds)
 
 
@dataclass
class ProcessingError:
    """Structured representation of a processing error."""
    record_id: str
    error_type: str
    error_message: str
    severity: ErrorSeverity
    stack_trace: str
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    context: Dict[str, Any] = field(default_factory=dict)
 
 
class DeadLetterQueue:
    """
    Handles records that fail processing.
    Enables investigation and reprocessing without blocking pipeline.
    """
 
    def __init__(self, queue_table: str, connection):
        self.queue_table = queue_table
        self.connection = connection
 
    def send(self, record: Dict[str, Any], error: ProcessingError):
        """Send failed record to DLQ for later investigation."""
        dlq_entry = {
            'record_id': error.record_id,
            'original_record': record,
            'error_type': error.error_type,
            'error_message': error.error_message,
            'severity': error.severity.value,
            'stack_trace': error.stack_trace,
            'context': error.context,
            'occurred_at': error.occurred_at,
            'status': 'pending',
            'retry_count': 0
        }
 
        import json

        # Column list must be explicit; nested dicts (record, context) are
        # stored as JSON so they fit in a single column
        columns = ', '.join(dlq_entry)
        placeholders = ', '.join('?' for _ in dlq_entry)
        values = tuple(
            json.dumps(v, default=str) if isinstance(v, dict) else v
            for v in dlq_entry.values()
        )
        self.connection.execute(
            f"INSERT INTO {self.queue_table} ({columns}) VALUES ({placeholders})",
            values
        )
 
    def get_pending(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Retrieve pending records for reprocessing."""
        return self.connection.execute(
            f"""
            SELECT * FROM {self.queue_table}
            WHERE status = 'pending'
            AND retry_count < 3
            ORDER BY occurred_at
            LIMIT {limit}
            """
        ).fetchall()
 
    def mark_resolved(self, record_id: str, resolution: str):
        """Mark DLQ entry as resolved."""
        self.connection.execute(
            f"""
            UPDATE {self.queue_table}
            SET status = 'resolved',
                resolution = ?,
                resolved_at = CURRENT_TIMESTAMP
            WHERE record_id = ?
            """,
            (resolution, record_id)
        )
 
 
class BatchProcessor:
    """
    Processes records in batches with error isolation.
    Failed records go to DLQ; successful records continue.
    """
 
    def __init__(
        self,
        process_func: Callable[[Dict], Dict],
        dlq: DeadLetterQueue,
        error_threshold_pct: float = 10.0  # Fail batch if >10% errors
    ):
        self.process_func = process_func
        self.dlq = dlq
        self.error_threshold_pct = error_threshold_pct
 
    def process_batch(
        self,
        records: List[Dict[str, Any]],
        context: Dict[str, Any]
    ) -> tuple[List[Dict], List[ProcessingError]]:
        """
        Process batch with error isolation.
 
        Returns:
            Tuple of (successful_records, errors)
        """
        successful = []
        errors = []
 
        for record in records:
            try:
                result = self.process_func(record)
                successful.append(result)
 
            except Exception as e:
                error = ProcessingError(
                    record_id=str(record.get('id', 'unknown')),
                    error_type=type(e).__name__,
                    error_message=str(e),
                    severity=self._classify_error(e),
                    stack_trace=traceback.format_exc(),
                    context=context
                )
                errors.append(error)
 
                # Send to DLQ for later investigation
                self.dlq.send(record, error)
 
        # Check error threshold
        error_pct = (len(errors) / len(records)) * 100 if records else 0
        if error_pct > self.error_threshold_pct:
            raise RuntimeError(
                f"Batch error rate {error_pct:.1f}% exceeds threshold "
                f"{self.error_threshold_pct}%"
            )
 
        return successful, errors
 
    def _classify_error(self, exception: Exception) -> ErrorSeverity:
        """Classify error severity based on exception type."""
        transient_errors = (
            ConnectionError, TimeoutError,
            # Add database-specific transient errors
        )
 
        if isinstance(exception, transient_errors):
            return ErrorSeverity.TRANSIENT
        elif isinstance(exception, (ValueError, KeyError)):
            return ErrorSeverity.RECOVERABLE
        else:
            return ErrorSeverity.FATAL
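Stripped of the DLQ plumbing, the error-isolation pattern that `BatchProcessor` implements fits in a few lines. Here is a minimal, self-contained sketch (the function name is illustrative, not part of the module's API):

```python
def process_with_isolation(records, fn, threshold_pct=10.0):
    """Process each record independently; fail the whole batch
    only when the error rate crosses the threshold."""
    succeeded, failed = [], []
    for record in records:
        try:
            succeeded.append(fn(record))
        except Exception as exc:
            failed.append((record, exc))  # in production: send to the DLQ
    error_pct = 100 * len(failed) / len(records) if records else 0
    if error_pct > threshold_pct:
        raise RuntimeError(
            f"batch error rate {error_pct:.1f}% exceeds {threshold_pct}%"
        )
    return succeeded, failed

# One bad record in twenty (5%) is tolerated; the other 19 still load.
records = [{'value': i} for i in range(19)] + [{'wrong_key': 0}]
ok, errs = process_with_isolation(records, lambda r: r['value'] * 2)
```

The key design choice is that a single poison record never blocks the batch, yet a systemic failure (say, an upstream schema change breaking every record) still fails loudly instead of silently routing everything to the DLQ.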

Observability

You can't improve what you can't measure. Production pipelines require comprehensive observability.

Metrics, Logs, and Lineage

# observability.py
"""
Observability infrastructure for production ETL pipelines.
Covers metrics, structured logging, and data lineage.
"""
 
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import time
from contextlib import contextmanager
 
class MetricType(Enum):
    COUNTER = 'counter'
    GAUGE = 'gauge'
    HISTOGRAM = 'histogram'
    TIMER = 'timer'
 
 
@dataclass
class PipelineMetrics:
    """
    Collects and exports pipeline metrics.
    Compatible with Prometheus, DataDog, CloudWatch.
    """
 
    run_id: str
    pipeline_name: str = ''
    metrics: Dict[str, Any] = field(default_factory=dict)
    timers: Dict[str, float] = field(default_factory=dict)
    _timer_starts: Dict[str, float] = field(default_factory=dict)
 
    def record_count(self, name: str, value: int, tags: Dict[str, str] = None):
        """Record a count metric."""
        key = self._metric_key(name, tags)
        self.metrics[key] = {
            'type': MetricType.COUNTER.value,
            'value': value,
            'tags': tags or {},
            'timestamp': datetime.utcnow().isoformat()
        }
 
    def record_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record a gauge (point-in-time) metric."""
        key = self._metric_key(name, tags)
        self.metrics[key] = {
            'type': MetricType.GAUGE.value,
            'value': value,
            'tags': tags or {},
            'timestamp': datetime.utcnow().isoformat()
        }
 
    def start_timer(self, name: str):
        """Start a timer for duration tracking."""
        self._timer_starts[name] = time.time()
 
    def stop_timer(self, name: str) -> float:
        """Stop timer and record duration."""
        if name not in self._timer_starts:
            raise ValueError(f"Timer '{name}' was never started")
 
        duration = time.time() - self._timer_starts[name]
        self.timers[name] = duration
 
        self.metrics[f"timer.{name}"] = {
            'type': MetricType.TIMER.value,
            'value': duration,
            'timestamp': datetime.utcnow().isoformat()
        }
 
        del self._timer_starts[name]
        return duration
 
    @contextmanager
    def timed(self, name: str):
        """Context manager for timing code blocks."""
        self.start_timer(name)
        try:
            yield
        finally:
            self.stop_timer(name)
 
    def _metric_key(self, name: str, tags: Dict[str, str] = None) -> str:
        if not tags:
            return name
        tag_str = ','.join(f"{k}={v}" for k, v in sorted(tags.items()))
        return f"{name}[{tag_str}]"
 
    def export_prometheus(self) -> str:
        """Export metrics in Prometheus exposition format."""
        lines = []
        for key, metric in self.metrics.items():
            name = key.split('[')[0].replace('.', '_')
            tags = metric.get('tags', {})
            tag_str = ','.join(f'{k}="{v}"' for k, v in tags.items())
            tag_part = f'{{{tag_str}}}' if tag_str else ''
            lines.append(f'{name}{tag_part} {metric["value"]}')
        return '\n'.join(lines)
 
    def export_json(self) -> str:
        """Export metrics as JSON for CloudWatch/DataDog."""
        return json.dumps({
            'run_id': self.run_id,
            'pipeline': self.pipeline_name,
            'metrics': self.metrics,
            'timers': self.timers,
            'exported_at': datetime.utcnow().isoformat()
        })
 
 
class StructuredLogger:
    """
    Structured logging for pipeline observability.
    All log entries are JSON for easy parsing and querying.
    """
 
    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        self.base_context = {
            'pipeline': pipeline_name,
            'run_id': run_id
        }
 
    def _log(self, level: str, message: str, **kwargs):
        entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': level,
            'message': message,
            **self.base_context,
            **kwargs
        }
        # In production, send to logging infrastructure
        print(json.dumps(entry))
 
    def info(self, message: str, **kwargs):
        self._log('INFO', message, **kwargs)
 
    def warning(self, message: str, **kwargs):
        self._log('WARNING', message, **kwargs)
 
    def error(self, message: str, exception: Exception = None, **kwargs):
        if exception:
            kwargs['exception_type'] = type(exception).__name__
            kwargs['exception_message'] = str(exception)
        self._log('ERROR', message, **kwargs)
 
    def stage_start(self, stage_name: str, **kwargs):
        """Log the start of a pipeline stage."""
        self.info(f"Stage started: {stage_name}", stage=stage_name, event='stage_start', **kwargs)
 
    def stage_complete(self, stage_name: str, duration_seconds: float, records_processed: int, **kwargs):
        """Log the completion of a pipeline stage."""
        self.info(
            f"Stage completed: {stage_name}",
            stage=stage_name,
            event='stage_complete',
            duration_seconds=duration_seconds,
            records_processed=records_processed,
            **kwargs
        )
 
 
@dataclass
class LineageNode:
    """Represents a dataset in the lineage graph."""
    id: str
    name: str
    type: str  # 'source', 'transform', 'destination'
    schema: Optional[Dict] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
 
 
@dataclass
class LineageEdge:
    """Represents a transformation between datasets."""
    source_id: str
    target_id: str
    transformation: str
    columns_mapped: Dict[str, List[str]] = field(default_factory=dict)  # target -> sources
 
 
class DataLineageTracker:
    """
    Tracks data lineage through the pipeline.
    Answers: Where did this data come from? What transformations were applied?
    """
 
    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        self.nodes: Dict[str, LineageNode] = {}
        self.edges: List[LineageEdge] = []
 
    def register_source(
        self,
        name: str,
        schema: Dict = None,
        **metadata
    ) -> str:
        """Register a source dataset."""
        node_id = f"source.{name}"
        self.nodes[node_id] = LineageNode(
            id=node_id,
            name=name,
            type='source',
            schema=schema,
            metadata=metadata
        )
        return node_id
 
    def register_transform(
        self,
        name: str,
        source_ids: List[str],
        column_mappings: Dict[str, List[str]] = None,
        **metadata
    ) -> str:
        """Register a transformation and its lineage."""
        node_id = f"transform.{name}"
        self.nodes[node_id] = LineageNode(
            id=node_id,
            name=name,
            type='transform',
            metadata=metadata
        )
 
        for source_id in source_ids:
            self.edges.append(LineageEdge(
                source_id=source_id,
                target_id=node_id,
                transformation=name,
                columns_mapped=column_mappings or {}
            ))
 
        return node_id
 
    def register_destination(
        self,
        name: str,
        source_id: str,
        schema: Dict = None,
        **metadata
    ) -> str:
        """Register a destination dataset."""
        node_id = f"destination.{name}"
        self.nodes[node_id] = LineageNode(
            id=node_id,
            name=name,
            type='destination',
            schema=schema,
            metadata=metadata
        )
 
        self.edges.append(LineageEdge(
            source_id=source_id,
            target_id=node_id,
            transformation='load'
        ))
 
        return node_id
 
    def get_upstream(self, node_id: str) -> List[str]:
        """Get all upstream datasets for a node."""
        upstream = []
        direct_sources = [e.source_id for e in self.edges if e.target_id == node_id]
 
        for source in direct_sources:
            upstream.append(source)
            upstream.extend(self.get_upstream(source))
 
        return upstream
 
    def export_openlineage(self) -> Dict:
        """Export lineage in OpenLineage format for integration with tools."""
        return {
            'producer': f'https://gemut.com/pipelines/{self.pipeline_name}',
            'schemaURL': 'https://openlineage.io/spec/1-0-5/OpenLineage.json',
            'run': {
                'runId': self.run_id,
                'facets': {
                    'pipeline': {'name': self.pipeline_name}
                }
            },
            'inputs': [
                {'namespace': 'production', 'name': n.name}
                for n in self.nodes.values() if n.type == 'source'
            ],
            'outputs': [
                {'namespace': 'production', 'name': n.name}
                for n in self.nodes.values() if n.type == 'destination'
            ]
        }
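The `get_upstream` walk above is just a transitive traversal over the edge list. As a standalone sketch (assuming the lineage graph is acyclic, as it should be):

```python
def upstream(edges, node):
    """Return every transitive source feeding `node`.
    `edges` is a list of (source_id, target_id) pairs."""
    result = []
    for source, target in edges:
        if target == node:
            result.append(source)
            result.extend(upstream(edges, source))  # recurse past transforms
    return result

edges = [
    ('source.orders', 'transform.enrich'),
    ('source.customers', 'transform.enrich'),
    ('transform.enrich', 'destination.fact_orders'),
]
upstream(edges, 'destination.fact_orders')
# ['transform.enrich', 'source.orders', 'source.customers']
```

This answers the on-call question "which sources could have corrupted this table?" in one call; exporting the same graph via OpenLineage lets external catalogs answer it too.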

Testing Strategies

Untested pipelines are unreliable pipelines. Testing data pipelines requires multiple strategies.

Unit, Integration, and Data Quality Tests

# tests/test_pipeline.py
"""
Comprehensive testing patterns for ETL pipelines.
Covers unit tests, integration tests, and data quality tests.
"""
 
import pytest
from unittest.mock import Mock, patch
from datetime import datetime, date
from typing import List, Dict
 
# ============================================================================
# Unit Tests: Test individual functions in isolation
# ============================================================================
 
class TestTransformations:
    """Unit tests for transformation logic."""
 
    def test_currency_normalization(self):
        """Test that currencies are properly normalized."""
        from pipeline.transforms import normalize_currency
 
        assert normalize_currency({'total': 100, 'currency': 'usd'}) == {
            'total': 100, 'currency': 'USD'
        }
        assert normalize_currency({'total': 100, 'currency': None}) == {
            'total': 100, 'currency': 'USD'  # Default
        }
 
    def test_derived_fields(self):
        """Test derived field calculations."""
        from pipeline.transforms import add_derived_fields
 
        record = {
            'order_id': '123',
            'created_at': datetime(2024, 1, 15, 10, 30),
            'total': 1500.00
        }
 
        result = add_derived_fields(record)
 
        assert result['order_month'] == '2024-01'
        assert result['is_high_value'] == True
        assert result['order_hour'] == 10
 
    def test_null_handling(self):
        """Test that null values are handled correctly."""
        from pipeline.transforms import clean_record
 
        record = {
            'id': '123',
            'name': None,
            'email': '',
            'value': 0
        }
 
        result = clean_record(record)
 
        assert result['name'] is None  # Preserved
        assert result['email'] is None  # Empty string -> None
        assert result['value'] == 0     # Zero preserved
 
    def test_type_coercion(self):
        """Test type coercion for various input formats."""
        from pipeline.transforms import coerce_types
 
        record = {
            'amount': '123.45',
            'count': '100',
            'date': '2024-01-15',
            'flag': 'true'
        }
 
        schema = {
            'amount': float,
            'count': int,
            'date': date,
            'flag': bool
        }
 
        result = coerce_types(record, schema)
 
        assert result['amount'] == 123.45
        assert result['count'] == 100
        assert result['date'] == date(2024, 1, 15)
        assert result['flag'] == True
 
 
class TestIdempotency:
    """Tests for idempotency guarantees."""
 
    def test_record_hash_deterministic(self):
        """Record hash should be deterministic across runs."""
        from pipeline.loading import IdempotentWriter
 
        writer = IdempotentWriter('test_table', [])
 
        record = {'id': '123', 'name': 'Test', 'value': 100}
 
        hash1 = writer.compute_record_hash(record, ['id', 'name'])
        hash2 = writer.compute_record_hash(record, ['id', 'name'])
 
        assert hash1 == hash2
 
    def test_record_hash_ignores_order(self):
        """Hash should be same regardless of dict key order."""
        from pipeline.loading import IdempotentWriter
 
        writer = IdempotentWriter('test_table', [])
 
        record1 = {'id': '123', 'name': 'Test'}
        record2 = {'name': 'Test', 'id': '123'}  # Different order
 
        hash1 = writer.compute_record_hash(record1, ['id', 'name'])
        hash2 = writer.compute_record_hash(record2, ['id', 'name'])
 
        assert hash1 == hash2
 
 
# ============================================================================
# Integration Tests: Test components working together
# ============================================================================
 
class TestExtractionIntegration:
    """Integration tests for extraction from real sources."""
 
    @pytest.fixture
    def test_database(self):
        """Set up test database with sample data."""
        import sqlite3
 
        conn = sqlite3.connect(':memory:')
        conn.execute('''
            CREATE TABLE orders (
                order_id TEXT PRIMARY KEY,
                customer_id TEXT,
                total REAL,
                updated_at TIMESTAMP
            )
        ''')
 
        # Insert test data
        conn.executemany(
            'INSERT INTO orders VALUES (?, ?, ?, ?)',
            [
                ('ord-1', 'cust-1', 100.0, '2024-01-15 10:00:00'),
                ('ord-2', 'cust-2', 200.0, '2024-01-15 11:00:00'),
                ('ord-3', 'cust-1', 150.0, '2024-01-15 12:00:00'),
            ]
        )
        conn.commit()
 
        yield conn
        conn.close()
 
    def test_incremental_extraction(self, test_database):
        """Test that incremental extraction respects watermark."""
        from pipeline.extraction import TimestampBasedExtractor, ExtractionState
 
        extractor = TimestampBasedExtractor(
            timestamp_column='updated_at',
            overlap_seconds=0  # No overlap for testing
        )
 
        # Initial state: extract from beginning
        state = ExtractionState(
            source_table='orders',
            last_extracted_at=datetime(2024, 1, 1),
            high_watermark=datetime(2024, 1, 15, 10, 30),
            watermark_column='updated_at',
            rows_extracted=0,
            checksum=None
        )
 
        predicate = extractor.get_incremental_predicate(state)
 
        cursor = test_database.cursor()
        cursor.execute(f"SELECT * FROM orders WHERE {predicate}")
        results = cursor.fetchall()
 
        # Should get only records after 10:30
        assert len(results) == 2
        assert results[0][0] == 'ord-2'  # 11:00
        assert results[1][0] == 'ord-3'  # 12:00
 
 
class TestLoadIntegration:
    """Integration tests for data loading."""
 
    @pytest.fixture
    def target_database(self):
        """Set up target database."""
        import sqlite3
 
        conn = sqlite3.connect(':memory:')
        conn.execute('''
            CREATE TABLE fact_orders (
                order_id TEXT PRIMARY KEY,
                customer_id TEXT,
                total REAL,
                order_month TEXT,
                _record_hash TEXT
            )
        ''')
        conn.commit()
 
        yield conn
        conn.close()
 
    def test_idempotent_upsert(self, target_database):
        """Test that upserts are idempotent."""
        from pipeline.loading import IdempotentWriter
 
        writer = IdempotentWriter('fact_orders', [])
 
        records = [
            {'order_id': 'ord-1', 'customer_id': 'cust-1', 'total': 100.0, 'order_month': '2024-01'}
        ]
 
        # First write
        writer.write_with_deduplication(
            records, ['order_id'], target_database
        )

        # Second write with the same data should be a no-op
        writer.write_with_deduplication(
            records, ['order_id'], target_database
        )
 
        # Verify only one record exists
        cursor = target_database.cursor()
        cursor.execute("SELECT COUNT(*) FROM fact_orders")
        total = cursor.fetchone()[0]
 
        assert total == 1
 
 
# ============================================================================
# Data Quality Tests: Validate data meets expectations
# ============================================================================
 
class TestDataQuality:
    """Data quality validation tests."""
 
    @pytest.fixture
    def sample_data(self) -> List[Dict]:
        return [
            {'id': '1', 'email': 'alice@example.com', 'amount': 100.0, 'status': 'active'},
            {'id': '2', 'email': 'bob@example.com', 'amount': 200.0, 'status': 'active'},
            {'id': '3', 'email': 'carol@example.com', 'amount': 50.0, 'status': 'pending'},
        ]
 
    def test_completeness(self, sample_data):
        """Test that required fields are present."""
        from pipeline.quality import DataQualityChecker
 
        checker = DataQualityChecker()
 
        result = checker.check_not_null(sample_data, 'email')
        assert result.passed == True
 
        # Add record with null email
        data_with_null = sample_data + [{'id': '4', 'email': None, 'amount': 100.0}]
        result = checker.check_not_null(data_with_null, 'email')
        assert result.passed == False
        assert result.failure_count == 1
 
    def test_uniqueness(self, sample_data):
        """Test that unique constraints are met."""
        from pipeline.quality import DataQualityChecker
 
        checker = DataQualityChecker()
 
        result = checker.check_unique(sample_data, 'id')
        assert result.passed == True
 
        # Add duplicate
        data_with_dup = sample_data + [{'id': '1', 'email': 'dup@example.com'}]
        result = checker.check_unique(data_with_dup, 'id')
        assert result.passed == False
 
    def test_valid_values(self, sample_data):
        """Test that values are within expected sets."""
        from pipeline.quality import DataQualityChecker
 
        checker = DataQualityChecker()
 
        valid_statuses = {'active', 'pending', 'cancelled'}
        result = checker.check_in_set(sample_data, 'status', valid_statuses)
        assert result.passed == True
 
        # Add invalid status
        data_with_invalid = sample_data + [{'id': '4', 'status': 'unknown'}]
        result = checker.check_in_set(data_with_invalid, 'status', valid_statuses)
        assert result.passed == False
 
    def test_statistical_outliers(self):
        """Test for statistical outliers in numeric columns."""
        from pipeline.quality import DataQualityChecker

        checker = DataQualityChecker()

        # A larger baseline is needed here: with only a handful of points,
        # z-scores are mathematically bounded well below 3 (at most
        # sqrt(n - 1)), so the check could never fire on four records.
        data = [{'id': str(i), 'amount': 100.0 + i} for i in range(30)]
        result = checker.check_no_outliers(data, 'amount', z_threshold=3)
        assert result.passed == True

        # Add extreme outlier
        data_with_outlier = data + [{'id': '31', 'amount': 1000000.0}]
        result = checker.check_no_outliers(data_with_outlier, 'amount', z_threshold=3)
        assert result.passed == False
 
 
# ============================================================================
# Contract Tests: Test interfaces between components
# ============================================================================
 
class TestSchemaContracts:
    """Tests that verify schema contracts between pipeline stages."""
 
    def test_extraction_output_schema(self):
        """Verify extraction produces expected schema."""
        from pipeline.extraction import extract_orders
        from pipeline.schemas import EXTRACTED_ORDER_SCHEMA
 
        # Run extraction
        records = extract_orders(limit=10)
 
        # Validate schema
        for record in records:
            for field, field_type in EXTRACTED_ORDER_SCHEMA.items():
                assert field in record, f"Missing field: {field}"
                if record[field] is not None:
                    assert isinstance(record[field], field_type), \
                        f"Field {field} has wrong type"
 
    def test_transform_output_schema(self):
        """Verify transformation produces expected schema."""
        from pipeline.transforms import transform_order
        from pipeline.schemas import TRANSFORMED_ORDER_SCHEMA
 
        input_record = {
            'order_id': '123',
            'customer_id': 'cust-1',
            'created_at': datetime(2024, 1, 15),
            'total': '100.00',
            'currency': 'usd'
        }
 
        output = transform_order(input_record)
 
        for field, field_type in TRANSFORMED_ORDER_SCHEMA.items():
            assert field in output, f"Missing field: {field}"
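The two contract tests above repeat the same field-and-type loop; in a real suite it is worth factoring that check into a shared helper. A possible sketch (the `conforms` name is ours, not part of the pipeline package):

```python
def conforms(record, schema):
    """Check a record against a {field: type} contract.
    Nulls are allowed; only present, non-null values are type-checked."""
    for field_name, field_type in schema.items():
        if field_name not in record:
            return False, f"missing field: {field_name}"
        value = record[field_name]
        if value is not None and not isinstance(value, field_type):
            return False, f"field {field_name}: expected {field_type.__name__}"
    return True, None

schema = {'order_id': str, 'total': float}
conforms({'order_id': '123', 'total': 99.5}, schema)  # (True, None)
conforms({'order_id': '123'}, schema)                 # (False, 'missing field: total')
```

Returning the failure reason rather than a bare boolean makes contract violations immediately actionable in test output.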

Key Takeaways

Building production-grade ETL pipelines requires attention to several critical areas:

  1. Idempotency is non-negotiable: Every operation must produce the same result when run multiple times. Use partition swaps, merge operations, or write-ahead logs.

  2. Design for failure: Assume every external system will fail. Implement retries with exponential backoff, dead letter queues for failed records, and comprehensive error classification.

  3. Observability from day one: Instrument pipelines with metrics, structured logging, and data lineage tracking. You can't debug or improve what you can't measure.

  4. Test at every level: Unit tests for transformation logic, integration tests for component interactions, and data quality tests for validation. Contract tests ensure stages remain compatible.

  5. Incremental over full refresh: The complexity investment in incremental processing pays dividends in processing time, cost, and system load.

  6. Checkpointing enables recovery: Persist extraction state, processing progress, and validation results. Recovery should be automatic after any failure.

The patterns in this guide represent years of collective learning from production failures. Apply them early, and your pipelines will serve you reliably for years to come.
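To make the first point concrete: a merge-style load is idempotent because re-running it converges to the same final state. A minimal sketch using SQLite's `ON CONFLICT` upsert (table and column names are illustrative; requires SQLite 3.24+, bundled with modern Python):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE fact_orders (order_id TEXT PRIMARY KEY, total REAL)')

def load(rows):
    # Upsert on the natural key: inserting the same rows twice
    # leaves the table unchanged
    conn.executemany(
        'INSERT INTO fact_orders (order_id, total) VALUES (?, ?) '
        'ON CONFLICT(order_id) DO UPDATE SET total = excluded.total',
        rows,
    )
    conn.commit()

load([('ord-1', 100.0), ('ord-2', 200.0)])
load([('ord-1', 100.0), ('ord-2', 200.0)])  # re-run: still two rows
```

The same idea scales up as `MERGE` statements in warehouses such as Snowflake or BigQuery.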


Gemut Analytics Team
Data Engineering Experts