Building Production-Grade ETL Pipelines: A Practitioner's Guide
Comprehensive guide to designing, implementing, and operating ETL pipelines that handle real-world complexity—covering idempotency, error handling, monitoring, testing, and operational excellence
Most ETL tutorials focus on the happy path—reading data, transforming it, and writing output. But production systems must handle failures, ensure exactly-once semantics, support incremental processing, and provide observability. This guide covers the battle-tested patterns that distinguish reliable data pipelines from fragile ones.
Prerequisites:
- Basic understanding of Python programming
- Familiarity with SQL and relational databases
- Experience with at least one ETL tool (Airflow, dbt, etc.)

Introduction
Every data engineer has experienced the sinking feeling: a pipeline that worked perfectly in development fails mysteriously in production. Data is missing, duplicated, or corrupted. Stakeholders are upset. The root cause is buried in logs across three different systems.
The gap between "it works on my machine" and "it runs reliably in production" is vast. This guide bridges that gap by covering the patterns, practices, and principles that transform fragile ETL scripts into robust production systems.
The Anatomy of a Production Pipeline
Before diving into specific techniques, let's establish what distinguishes production pipelines from development scripts:
┌─────────────────────────────────────────────────────────────────────────┐
│                    PRODUCTION PIPELINE REQUIREMENTS                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  RELIABILITY                             OBSERVABILITY                  │
│  ───────────                             ─────────────                  │
│  • Handles partial failures              • Metrics for every stage      │
│  • Retries with backoff                  • Structured logging           │
│  • Exactly-once semantics                • Data lineage tracking        │
│  • Graceful degradation                  • Alerting on anomalies        │
│                                                                         │
│  MAINTAINABILITY                         PERFORMANCE                    │
│  ───────────────                         ───────────                    │
│  • Clear separation of concerns          • Incremental processing       │
│  • Comprehensive test coverage           • Parallelization              │
│  • Version-controlled configuration      • Resource optimization        │
│  • Self-documenting code                 • Backpressure handling        │
│                                                                         │
│  SECURITY                                GOVERNANCE                     │
│  ────────                                ──────────                     │
│  • Secrets management                    • Schema evolution             │
│  • Data encryption                       • Data quality validation      │
│  • Access control                        • Audit trails                 │
│  • PII handling                          • Compliance enforcement       │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Idempotency: The Foundation
Idempotency means that running an operation multiple times produces the same result as running it once. This property is fundamental because in distributed systems, operations may be retried due to timeouts, network partitions, or crashes.
Why Idempotency Matters
Consider a pipeline that inserts records into a database. If it fails after inserting half the records and is retried, without idempotency you'll get duplicate data. With idempotency, the retry produces the same final state as if it had succeeded the first time.
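The effect is easy to demonstrate with an upsert keyed on a business identifier. This is a minimal sketch using SQLite's `INSERT ... ON CONFLICT` as a stand-in for a warehouse MERGE; the table and column names are hypothetical:

```python
import sqlite3

def load_orders(conn, orders):
    """Idempotent load: the business key decides insert vs. overwrite."""
    conn.executemany(
        """
        INSERT INTO orders (order_id, total) VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE SET total = excluded.total
        """,
        orders,
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, total REAL)")

batch = [("A-1", 10.0), ("A-2", 25.0)]
load_orders(conn, batch)   # first attempt
load_orders(conn, batch)   # retry after a simulated timeout

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # still 2 rows, not 4
```

Because the write is keyed on `order_id`, replaying the batch converges to the same final state instead of duplicating rows.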
Implementing Idempotent Writes
# idempotent_writes.py
"""
Patterns for implementing idempotent data writes.
These patterns ensure that retry operations don't corrupt data.
"""
from typing import List, Dict, Any, Optional
from datetime import datetime, date
from dataclasses import dataclass
import hashlib
import json


@dataclass
class ProcessingCheckpoint:
    """Tracks pipeline execution state for recovery."""
    pipeline_id: str
    run_id: str
    partition_key: str  # e.g., "2024-01-15" for daily partitions
    stage: str
    status: str  # 'pending', 'in_progress', 'completed', 'failed'
    records_processed: int
    started_at: datetime
    completed_at: Optional[datetime]
    error_message: Optional[str]


class IdempotentWriter:
    """
    Base class for idempotent data writes.
    Subclasses implement specific storage backends.
    """

    def __init__(self, target_table: str, partition_columns: List[str]):
        self.target_table = target_table
        self.partition_columns = partition_columns

    def compute_record_hash(self, record: Dict[str, Any], key_columns: List[str]) -> str:
        """
        Compute a deterministic hash for deduplication.
        Uses only business key columns, not system columns like timestamps.
        """
        key_values = {k: record[k] for k in sorted(key_columns)}
        key_string = json.dumps(key_values, sort_keys=True, default=str)
        return hashlib.sha256(key_string.encode()).hexdigest()

    def write_with_deduplication(
        self,
        records: List[Dict[str, Any]],
        key_columns: List[str],
        connection
    ) -> int:
        """
        Write records with hash-based deduplication.
        Existing records with matching hashes are skipped.
        Pattern: MERGE/UPSERT with hash comparison
        """
        # Add hash to each record
        for record in records:
            record['_record_hash'] = self.compute_record_hash(record, key_columns)

        # Use a database-specific MERGE operation. The wildcard syntax below
        # is Delta Lake / Databricks style; Snowflake and BigQuery require
        # explicit column lists.
        merge_sql = f"""
            MERGE INTO {self.target_table} AS target
            USING (SELECT * FROM staging_table) AS source
            ON target._record_hash = source._record_hash
            WHEN NOT MATCHED THEN
                INSERT *
            WHEN MATCHED THEN
                UPDATE SET *  -- Optional: update if needed
        """
        cursor = connection.cursor()
        cursor.execute(merge_sql)
        return cursor.rowcount


class PartitionSwapWriter(IdempotentWriter):
    """
    Idempotent writes using the partition swap pattern.
    Atomically replaces entire partitions—no partial updates possible.
    This is the gold standard for idempotency in batch pipelines.
    """

    def write_partition(
        self,
        records: List[Dict[str, Any]],
        partition_value: str,
        connection
    ) -> int:
        """
        Atomically replace a partition with new data.
        Steps:
        1. Write to staging partition
        2. Validate staging data
        3. Atomic swap: drop old partition, rename staging
        """
        staging_partition = f"{self.target_table}__staging__{partition_value}"
        try:
            # Step 1: Write to staging
            self._write_to_staging(records, staging_partition, connection)
            # Step 2: Validate (row counts, checksums, etc.)
            self._validate_staging(staging_partition, len(records), connection)
            # Step 3: Atomic swap
            self._atomic_swap(partition_value, staging_partition, connection)
            return len(records)
        except Exception:
            # Clean up staging on failure
            self._cleanup_staging(staging_partition, connection)
            raise

    def _atomic_swap(self, partition_value: str, staging: str, connection):
        """
        Perform the atomic swap.
        Implementation varies by database:
        - Snowflake: ALTER TABLE ... SWAP WITH (whole-table swap; use one
          table per partition, or a transactional DELETE + INSERT)
        - BigQuery: copy onto a partition decorator with bq cp --force
        - Spark: INSERT OVERWRITE on the specific partition
        """
        # Snowflake example: replace the partition inside one transaction
        partition_column = self.partition_columns[0]
        connection.execute("BEGIN")
        connection.execute(
            f"DELETE FROM {self.target_table} "
            f"WHERE {partition_column} = '{partition_value}'"
        )
        connection.execute(
            f"INSERT INTO {self.target_table} SELECT * FROM {staging}"
        )
        connection.execute("COMMIT")


class WriteAheadLogWriter(IdempotentWriter):
    """
    Idempotent writes using the write-ahead log (WAL) pattern.
    Records all intended writes before execution.
    Enables recovery from any failure point.
    """

    def __init__(self, target_table: str, wal_table: str, partition_columns: List[str]):
        super().__init__(target_table, partition_columns)
        self.wal_table = wal_table

    def write_with_wal(
        self,
        records: List[Dict[str, Any]],
        batch_id: str,
        connection
    ) -> int:
        """
        Write records using a WAL for crash recovery.
        Pattern:
        1. Write batch metadata to WAL
        2. Write records to staging
        3. Move to target
        4. Mark WAL entry as complete
        """
        # Step 1: Record intent in WAL
        self._write_wal_entry(batch_id, 'STARTED', len(records), connection)
        try:
            # Step 2: Write to staging
            staging_count = self._write_staging(records, batch_id, connection)
            self._write_wal_entry(batch_id, 'STAGED', staging_count, connection)
            # Step 3: Move to target (using MERGE for idempotency)
            target_count = self._merge_to_target(batch_id, connection)
            # Step 4: Mark complete
            self._write_wal_entry(batch_id, 'COMPLETED', target_count, connection)
            return target_count
        except Exception as e:
            self._write_wal_entry(batch_id, 'FAILED', 0, connection, str(e))
            raise

    def recover_incomplete_batches(self, connection) -> List[str]:
        """
        Find and recover batches that started but didn't complete.
        Called during pipeline startup.
        """
        incomplete_sql = f"""
            SELECT batch_id, status, record_count
            FROM {self.wal_table}
            WHERE status NOT IN ('COMPLETED', 'ABANDONED')
            ORDER BY created_at
        """
        incomplete = connection.execute(incomplete_sql).fetchall()
        recovered = []
        for batch_id, status, record_count in incomplete:
            if status == 'STAGED':
                # Resume from staging
                self._merge_to_target(batch_id, connection)
                self._write_wal_entry(batch_id, 'COMPLETED', record_count, connection)
                recovered.append(batch_id)
            elif status == 'STARTED':
                # Need to re-extract and reprocess.
                # Mark as abandoned; it will be picked up by normal processing.
                self._write_wal_entry(batch_id, 'ABANDONED', 0, connection)
        return recovered

Incremental Processing
Full refreshes are simple but don't scale. Incremental processing loads only new or changed data, dramatically reducing processing time and cost.
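The core of every incremental strategy is a persisted high-watermark: extract only rows modified after it, then advance it. A minimal sketch, with an in-memory list standing in for a source table:

```python
from datetime import datetime

def extract_incremental(rows, watermark):
    """Return rows modified after the watermark, plus the new watermark."""
    new_rows = [r for r in rows if r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in new_rows), default=watermark)
    return new_rows, new_watermark

source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1)},
    {"id": 2, "updated_at": datetime(2024, 1, 2)},
]
watermark = datetime(2024, 1, 1)

batch, watermark = extract_incremental(source, watermark)
print(len(batch))                # 1 row; watermark advances to Jan 2

source.append({"id": 3, "updated_at": datetime(2024, 1, 3)})
batch, watermark = extract_incremental(source, watermark)
print([r["id"] for r in batch])  # only the new row: [3]
```

In a real pipeline the watermark is persisted (e.g., in a state table or Airflow Variable) and only advanced after the load commits, so a failed run re-extracts the same window.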
Incremental Extraction Patterns
# incremental_extraction.py
"""
Patterns for incremental data extraction.
Each pattern handles different source system characteristics.
"""
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
import hashlib


@dataclass
class ExtractionState:
    """Tracks extraction progress for resumability."""
    source_table: str
    last_extracted_at: datetime
    high_watermark: Any  # Could be timestamp, ID, or version
    watermark_column: str
    rows_extracted: int
    checksum: Optional[str]


class IncrementalExtractor(ABC):
    """Base class for incremental extraction strategies."""

    @abstractmethod
    def get_incremental_predicate(self, state: ExtractionState) -> str:
        """Return SQL predicate for incremental extraction."""

    @abstractmethod
    def update_state(self, state: ExtractionState, extracted_data: List[Dict]) -> ExtractionState:
        """Update extraction state after successful extraction."""


class TimestampBasedExtractor(IncrementalExtractor):
    """
    Extract records based on modification timestamp.
    Requirements:
    - Source has a reliable updated_at column
    - Updates always modify the timestamp
    - Clock skew is bounded
    Considerations:
    - Add an overlap window to handle clock skew
    - May extract the same records multiple times (deduplicate on load)
    """

    def __init__(
        self,
        timestamp_column: str = 'updated_at',
        overlap_seconds: int = 300  # 5-minute overlap for clock skew
    ):
        self.timestamp_column = timestamp_column
        self.overlap_seconds = overlap_seconds

    def get_incremental_predicate(self, state: ExtractionState) -> str:
        # Subtract overlap to handle clock skew
        safe_watermark = state.high_watermark - timedelta(seconds=self.overlap_seconds)
        return f"{self.timestamp_column} >= '{safe_watermark.isoformat()}'"

    def update_state(
        self,
        state: ExtractionState,
        extracted_data: List[Dict]
    ) -> ExtractionState:
        if not extracted_data:
            return state
        # Find the new high watermark
        max_timestamp = max(row[self.timestamp_column] for row in extracted_data)
        return ExtractionState(
            source_table=state.source_table,
            last_extracted_at=datetime.utcnow(),
            high_watermark=max_timestamp,
            watermark_column=self.timestamp_column,
            rows_extracted=state.rows_extracted + len(extracted_data),
            checksum=None
        )


class CDCExtractor(IncrementalExtractor):
    """
    Extract using Change Data Capture (CDC).
    Captures inserts, updates, and deletes from the transaction log.
    The gold standard for incremental extraction when available.
    Sources:
    - Debezium (Kafka Connect)
    - AWS DMS
    - Fivetran / Airbyte
    - Native CDC (SQL Server, Oracle GoldenGate)
    """

    def __init__(self, cdc_table: str, lsn_column: str = '_cdc_lsn'):
        self.cdc_table = cdc_table
        self.lsn_column = lsn_column

    def get_incremental_predicate(self, state: ExtractionState) -> str:
        return f"{self.lsn_column} > '{state.high_watermark}'"

    def process_cdc_records(
        self,
        cdc_records: List[Dict]
    ) -> Tuple[List[Dict], List[str]]:
        """
        Process CDC records into upserts and deletes.
        Returns:
            Tuple of (records_to_upsert, keys_to_delete)
        """
        upserts = []
        deletes = []
        for record in cdc_records:
            operation = record.get('_cdc_operation', 'u')
            if operation == 'd':
                # Delete operation
                deletes.append(record['_pk'])
            else:
                # Insert or update: strip CDC metadata before upserting
                clean_record = {
                    k: v for k, v in record.items()
                    if not k.startswith('_cdc_')
                }
                upserts.append(clean_record)
        return upserts, deletes


class SnapshotDiffExtractor(IncrementalExtractor):
    """
    Extract by comparing full snapshots.
    Use when:
    - Source has no reliable timestamp
    - Source has no CDC capability
    - You need to detect deletes
    Approach:
    - Hash every row of the current snapshot
    - Compare with the previous snapshot's hashes
    - Extract only changed rows
    """

    def __init__(self, primary_key_columns: List[str], comparison_columns: List[str]):
        self.pk_columns = primary_key_columns
        self.comparison_columns = comparison_columns

    def compute_row_signature(self, row: Dict[str, Any]) -> str:
        """Compute a hash of the row for comparison."""
        values = [str(row.get(col, '')) for col in self.comparison_columns]
        return hashlib.md5('|'.join(values).encode()).hexdigest()

    def diff_snapshots(
        self,
        current_snapshot: List[Dict],
        previous_signatures: Dict[str, str]
    ) -> Tuple[List[Dict], List[Dict], List[str]]:
        """
        Compare snapshots to find changes.
        Returns:
            Tuple of (inserts, updates, deleted_keys)
        """
        current_by_key = {}
        current_signatures = {}
        for row in current_snapshot:
            key = tuple(row[col] for col in self.pk_columns)
            current_by_key[key] = row
            current_signatures[key] = self.compute_row_signature(row)

        inserts = []
        updates = []
        deletes = []
        # Find inserts and updates
        for key, row in current_by_key.items():
            key_str = str(key)
            current_sig = current_signatures[key]
            if key_str not in previous_signatures:
                inserts.append(row)
            elif previous_signatures[key_str] != current_sig:
                updates.append(row)
        # Find deletes
        current_keys = {str(k) for k in current_by_key}
        for prev_key in previous_signatures:
            if prev_key not in current_keys:
                deletes.append(prev_key)
        return inserts, updates, deletes

Airflow DAG for Incremental Pipeline
# dags/incremental_pipeline.py
"""
Production Airflow DAG demonstrating incremental ETL patterns.
Includes checkpointing, error handling, and observability.
"""
from datetime import datetime, timedelta
from typing import Dict, Any
import json

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.exceptions import AirflowException

# Custom modules (would be in separate files)
from plugins.extraction import TimestampBasedExtractor, ExtractionState
from plugins.loading import PartitionSwapWriter
from plugins.monitoring import PipelineMetrics, DataQualityChecker

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': True,  # Important for incremental pipelines
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='incremental_orders_pipeline',
    default_args=default_args,
    description='Incremental load of orders data with quality checks',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Enable backfill for historical data
    max_active_runs=1,  # Prevent parallel runs that could cause conflicts
    tags=['production', 'incremental', 'orders'],
) as dag:

    @task(task_id='get_extraction_state')
    def get_extraction_state(**context) -> Dict[str, Any]:
        """
        Retrieve or initialize extraction state.
        State is stored in Airflow Variables for persistence.
        """
        state_key = f"extraction_state_{dag.dag_id}"
        try:
            state_json = Variable.get(state_key)
            state = json.loads(state_json)
            # For backfills, override with the execution date
            execution_date = context['data_interval_start']
            if context['dag_run'].is_backfill:
                state['high_watermark'] = execution_date.isoformat()
        except KeyError:
            # Initialize state for the first run
            execution_date = context['data_interval_start']
            state = {
                'source_table': 'source_db.orders',
                'high_watermark': (execution_date - timedelta(days=1)).isoformat(),
                'watermark_column': 'updated_at',
                'rows_extracted': 0
            }
        return state

    @task(task_id='extract_incremental')
    def extract_incremental(state: Dict[str, Any], **context) -> Dict[str, Any]:
        """
        Extract records modified since the last run.
        Pushes extraction metrics to the monitoring system.
        """
        from plugins.connectors import SourceDatabaseConnector

        metrics = PipelineMetrics(context['dag_run'].run_id)
        extractor = TimestampBasedExtractor(timestamp_column='updated_at')

        # Build and execute the extraction query
        with SourceDatabaseConnector() as conn:
            # Rehydrate state (watermark is stored as an ISO string)
            extraction_state = ExtractionState(
                source_table=state['source_table'],
                last_extracted_at=datetime.utcnow(),
                high_watermark=datetime.fromisoformat(state['high_watermark']),
                watermark_column=state['watermark_column'],
                rows_extracted=state['rows_extracted'],
                checksum=None,
            )
            predicate = extractor.get_incremental_predicate(extraction_state)
            query = f"""
                SELECT *
                FROM {state['source_table']}
                WHERE {predicate}
                AND updated_at < '{context['data_interval_end'].isoformat()}'
                ORDER BY updated_at
            """
            metrics.start_timer('extraction')
            records = conn.execute(query).fetchall()
            metrics.stop_timer('extraction')
            metrics.record_count('extracted_rows', len(records))

        # Store extracted data for the next task.
        # In production, use an XCom backend on external storage for large datasets.
        return {
            'records': records,
            'count': len(records),
            'min_watermark': min(r['updated_at'] for r in records).isoformat() if records else None,
            'max_watermark': max(r['updated_at'] for r in records).isoformat() if records else None,
        }

    @task(task_id='transform')
    def transform(extraction_result: Dict[str, Any], **context) -> Dict[str, Any]:
        """
        Apply business transformations.
        Validates and enriches extracted records.
        """
        records = extraction_result['records']
        if not records:
            return {'records': [], 'count': 0}

        transformed = []
        errors = []
        for record in records:
            try:
                # Apply transformations
                transformed_record = {
                    'order_id': record['order_id'],
                    'customer_id': record['customer_id'],
                    'order_date': record['created_at'].date(),
                    'total_amount': float(record['total']),
                    'currency': record.get('currency', 'USD').upper(),
                    'status': record['status'].lower(),
                    'item_count': record.get('item_count', 0),
                    # Derived fields
                    'order_month': record['created_at'].strftime('%Y-%m'),
                    'is_high_value': float(record['total']) > 1000,
                    # Audit fields
                    '_extracted_at': context['data_interval_end'].isoformat(),
                    '_pipeline_run_id': context['dag_run'].run_id,
                }
                transformed.append(transformed_record)
            except Exception as e:
                errors.append({
                    'record_id': record.get('order_id'),
                    'error': str(e)
                })

        if errors:
            # Log errors but continue—don't fail the entire batch for bad records
            context['ti'].xcom_push(key='transform_errors', value=errors)

        return {'records': transformed, 'count': len(transformed)}

    @task(task_id='validate_quality')
    def validate_quality(transform_result: Dict[str, Any], **context) -> bool:
        """
        Run data quality checks before loading.
        Fails the pipeline if critical checks don't pass.
        """
        records = transform_result['records']
        if not records:
            return True  # Nothing to validate

        checker = DataQualityChecker()
        # Define quality expectations
        checks = [
            # Completeness checks
            checker.check_not_null(records, 'order_id'),
            checker.check_not_null(records, 'customer_id'),
            checker.check_not_null(records, 'total_amount'),
            # Validity checks
            checker.check_positive(records, 'total_amount'),
            checker.check_in_set(records, 'status',
                                 {'pending', 'completed', 'cancelled', 'refunded'}),
            # Uniqueness check
            checker.check_unique(records, 'order_id'),
            # Referential check (cached lookup)
            checker.check_foreign_key(records, 'customer_id',
                                      reference_table='dim_customers'),
        ]

        failed_checks = [c for c in checks if not c.passed]
        if failed_checks:
            critical_failures = [c for c in failed_checks if c.severity == 'critical']
            if critical_failures:
                raise AirflowException(
                    f"Critical data quality failures: {critical_failures}"
                )
            # Log warnings for non-critical failures
            for check in failed_checks:
                context['ti'].log.warning(f"Quality check warning: {check}")
        return True

    @task(task_id='load_to_warehouse')
    def load_to_warehouse(
        transform_result: Dict[str, Any],
        quality_passed: bool,
        **context
    ) -> Dict[str, Any]:
        """
        Idempotent load to the data warehouse.
        Uses partition swap for atomic updates.
        """
        if not quality_passed:
            raise AirflowException("Quality validation did not pass")

        records = transform_result['records']
        if not records:
            return {'loaded_count': 0}

        from plugins.connectors import WarehouseConnector

        writer = PartitionSwapWriter(
            target_table='warehouse.fact_orders',
            partition_columns=['order_month']
        )
        metrics = PipelineMetrics(context['dag_run'].run_id)

        with WarehouseConnector() as conn:
            metrics.start_timer('loading')
            # Group records by partition
            partitions = {}
            for record in records:
                partitions.setdefault(record['order_month'], []).append(record)

            total_loaded = 0
            for partition_value, partition_records in partitions.items():
                total_loaded += writer.write_partition(
                    records=partition_records,
                    partition_value=partition_value,
                    connection=conn
                )
            metrics.stop_timer('loading')
            metrics.record_count('loaded_rows', total_loaded)

        return {'loaded_count': total_loaded}

    @task(task_id='update_extraction_state')
    def update_extraction_state(
        extraction_result: Dict[str, Any],
        load_result: Dict[str, Any],
        **context
    ):
        """
        Persist extraction state for the next run.
        Only called after a successful load.
        """
        if extraction_result['max_watermark']:
            state_key = f"extraction_state_{dag.dag_id}"
            current_state = json.loads(Variable.get(state_key, '{}'))
            current_state['high_watermark'] = extraction_result['max_watermark']
            current_state['rows_extracted'] = (
                current_state.get('rows_extracted', 0) + extraction_result['count']
            )
            current_state['last_run'] = context['data_interval_end'].isoformat()
            Variable.set(state_key, json.dumps(current_state))

    # Define task dependencies
    state = get_extraction_state()
    extraction = extract_incremental(state)
    transformed = transform(extraction)
    validated = validate_quality(transformed)
    loaded = load_to_warehouse(transformed, validated)
    update_extraction_state(extraction, loaded)

Error Handling and Recovery
Errors are inevitable. The question is not whether they'll occur, but how gracefully your pipeline handles them.
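Before looking at the full machinery, the basic shape of transient-failure handling is a bounded retry with exponential backoff. A minimal sketch with a hypothetical flaky fetch function (delays shortened for illustration):

```python
import time

def retry(func, max_attempts=4, base_delay=0.01):
    """Retry transient failures with exponential backoff (base * 2^attempt)."""
    for attempt in range(max_attempts):
        try:
            return func()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # exhausted: surface the error
            time.sleep(base_delay * (2 ** attempt))

calls = {"n": 0}

def flaky_fetch():
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network blip")
    return "payload"

result = retry(flaky_fetch)
print(result, calls["n"])  # payload 3
```

Note that only `ConnectionError` is retried; anything else propagates immediately, which is the distinction between transient and fatal errors that the classes below formalize.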
Error Handling Hierarchy
# error_handling.py
"""
Comprehensive error handling patterns for production pipelines.
Implements retry strategies, dead letter queues, and recovery mechanisms.
"""
from enum import Enum
from typing import Callable, TypeVar, Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
import functools
import json
import random
import time
import traceback

T = TypeVar('T')


class ErrorSeverity(Enum):
    """Classification of error severity for handling decisions."""
    TRANSIENT = 'transient'      # Retry will likely succeed
    RECOVERABLE = 'recoverable'  # Can continue with partial data
    FATAL = 'fatal'              # Must stop processing


class RetryPolicy(Enum):
    """Standard retry policies."""
    NONE = 'none'
    LINEAR = 'linear'
    EXPONENTIAL = 'exponential'
    EXPONENTIAL_JITTER = 'exponential_jitter'


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_attempts: int = 3
    base_delay_seconds: float = 1.0
    max_delay_seconds: float = 300.0
    policy: RetryPolicy = RetryPolicy.EXPONENTIAL_JITTER
    retryable_exceptions: tuple = (ConnectionError, TimeoutError)


def with_retry(config: RetryConfig):
    """
    Decorator for retrying operations with configurable backoff.
    Usage:
        @with_retry(RetryConfig(max_attempts=5))
        def call_external_api():
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None
            for attempt in range(1, config.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except config.retryable_exceptions as e:
                    last_exception = e
                    if attempt == config.max_attempts:
                        break
                    delay = _calculate_delay(attempt, config)
                    # Log the retry attempt (use a real logger in production)
                    print(f"Attempt {attempt} failed: {e}. Retrying in {delay:.2f}s")
                    time.sleep(delay)
                # Non-retryable exceptions propagate unchanged
            raise last_exception
        return wrapper
    return decorator


def _calculate_delay(attempt: int, config: RetryConfig) -> float:
    """Calculate delay based on the retry policy."""
    if config.policy == RetryPolicy.NONE:
        return 0
    elif config.policy == RetryPolicy.LINEAR:
        delay = config.base_delay_seconds * attempt
    elif config.policy == RetryPolicy.EXPONENTIAL:
        delay = config.base_delay_seconds * (2 ** (attempt - 1))
    elif config.policy == RetryPolicy.EXPONENTIAL_JITTER:
        base_delay = config.base_delay_seconds * (2 ** (attempt - 1))
        delay = base_delay * (0.5 + random.random())  # 50-150% of base
    else:
        delay = config.base_delay_seconds
    return min(delay, config.max_delay_seconds)


@dataclass
class ProcessingError:
    """Structured representation of a processing error."""
    record_id: str
    error_type: str
    error_message: str
    severity: ErrorSeverity
    stack_trace: str
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    context: Dict[str, Any] = field(default_factory=dict)


class DeadLetterQueue:
    """
    Handles records that fail processing.
    Enables investigation and reprocessing without blocking the pipeline.
    """

    def __init__(self, queue_table: str, connection):
        self.queue_table = queue_table
        self.connection = connection

    def send(self, record: Dict[str, Any], error: ProcessingError):
        """Send a failed record to the DLQ for later investigation."""
        dlq_entry = {
            'record_id': error.record_id,
            'original_record': record,
            'error_type': error.error_type,
            'error_message': error.error_message,
            'severity': error.severity.value,
            'stack_trace': error.stack_trace,
            'context': error.context,
            'occurred_at': error.occurred_at,
            'status': 'pending',
            'retry_count': 0
        }
        # Serialize the entry; a real schema would use one column per field
        self.connection.execute(
            f"INSERT INTO {self.queue_table} (record_id, payload, status, retry_count) "
            f"VALUES (?, ?, 'pending', 0)",
            (error.record_id, json.dumps(dlq_entry, default=str))
        )

    def get_pending(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Retrieve pending records for reprocessing."""
        return self.connection.execute(
            f"""
            SELECT * FROM {self.queue_table}
            WHERE status = 'pending'
              AND retry_count < 3
            ORDER BY occurred_at
            LIMIT {limit}
            """
        ).fetchall()

    def mark_resolved(self, record_id: str, resolution: str):
        """Mark a DLQ entry as resolved."""
        self.connection.execute(
            f"""
            UPDATE {self.queue_table}
            SET status = 'resolved',
                resolution = ?,
                resolved_at = CURRENT_TIMESTAMP
            WHERE record_id = ?
            """,
            (resolution, record_id)
        )


class BatchProcessor:
    """
    Processes records in batches with error isolation.
    Failed records go to the DLQ; successful records continue.
    """

    def __init__(
        self,
        process_func: Callable[[Dict], Dict],
        dlq: DeadLetterQueue,
        error_threshold_pct: float = 10.0  # Fail the batch if >10% errors
    ):
        self.process_func = process_func
        self.dlq = dlq
        self.error_threshold_pct = error_threshold_pct

    def process_batch(
        self,
        records: List[Dict[str, Any]],
        context: Dict[str, Any]
    ) -> tuple[List[Dict], List[ProcessingError]]:
        """
        Process a batch with error isolation.
        Returns:
            Tuple of (successful_records, errors)
        """
        successful = []
        errors = []
        for record in records:
            try:
                successful.append(self.process_func(record))
            except Exception as e:
                error = ProcessingError(
                    record_id=str(record.get('id', 'unknown')),
                    error_type=type(e).__name__,
                    error_message=str(e),
                    severity=self._classify_error(e),
                    stack_trace=traceback.format_exc(),
                    context=context
                )
                errors.append(error)
                # Send to DLQ for later investigation
                self.dlq.send(record, error)

        # Check the error threshold
        error_pct = (len(errors) / len(records)) * 100 if records else 0
        if error_pct > self.error_threshold_pct:
            raise RuntimeError(
                f"Batch error rate {error_pct:.1f}% exceeds threshold "
                f"{self.error_threshold_pct}%"
            )
        return successful, errors

    def _classify_error(self, exception: Exception) -> ErrorSeverity:
        """Classify error severity based on exception type."""
        transient_errors = (
            ConnectionError, TimeoutError,
            # Add database-specific transient errors here
        )
        if isinstance(exception, transient_errors):
            return ErrorSeverity.TRANSIENT
        elif isinstance(exception, (ValueError, KeyError)):
            return ErrorSeverity.RECOVERABLE
        else:
            return ErrorSeverity.FATAL

Observability
You can't improve what you can't measure. Production pipelines require comprehensive observability.
Metrics, Logs, and Lineage
# observability.py
"""
Observability infrastructure for production ETL pipelines.
Covers metrics, structured logging, and data lineage.
"""
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from contextlib import contextmanager
import json
import time


class MetricType(Enum):
    COUNTER = 'counter'
    GAUGE = 'gauge'
    HISTOGRAM = 'histogram'
    TIMER = 'timer'


@dataclass
class PipelineMetrics:
    """
    Collects and exports pipeline metrics.
    Compatible with Prometheus, DataDog, and CloudWatch.
    """
    run_id: str
    pipeline_name: str = ''
    metrics: Dict[str, Any] = field(default_factory=dict)
    timers: Dict[str, float] = field(default_factory=dict)
    _timer_starts: Dict[str, float] = field(default_factory=dict)

    def record_count(self, name: str, value: int, tags: Dict[str, str] = None):
        """Record a count metric."""
        key = self._metric_key(name, tags)
        self.metrics[key] = {
            'type': MetricType.COUNTER.value,
            'value': value,
            'tags': tags or {},
            'timestamp': datetime.utcnow().isoformat()
        }

    def record_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """Record a gauge (point-in-time) metric."""
        key = self._metric_key(name, tags)
        self.metrics[key] = {
            'type': MetricType.GAUGE.value,
            'value': value,
            'tags': tags or {},
            'timestamp': datetime.utcnow().isoformat()
        }

    def start_timer(self, name: str):
        """Start a timer for duration tracking."""
        self._timer_starts[name] = time.time()

    def stop_timer(self, name: str) -> float:
        """Stop a timer and record its duration."""
        if name not in self._timer_starts:
            raise ValueError(f"Timer '{name}' was never started")
        duration = time.time() - self._timer_starts[name]
        self.timers[name] = duration
        self.metrics[f"timer.{name}"] = {
            'type': MetricType.TIMER.value,
            'value': duration,
            'timestamp': datetime.utcnow().isoformat()
        }
        del self._timer_starts[name]
        return duration

    @contextmanager
    def timed(self, name: str):
        """Context manager for timing code blocks."""
        self.start_timer(name)
        try:
            yield
        finally:
            self.stop_timer(name)

    def _metric_key(self, name: str, tags: Dict[str, str] = None) -> str:
        if not tags:
            return name
        tag_str = ','.join(f"{k}={v}" for k, v in sorted(tags.items()))
        return f"{name}[{tag_str}]"

    def export_prometheus(self) -> str:
        """Export metrics in Prometheus exposition format."""
        lines = []
        for key, metric in self.metrics.items():
            name = key.split('[')[0].replace('.', '_')
            tags = metric.get('tags', {})
            tag_str = ','.join(f'{k}="{v}"' for k, v in tags.items())
            tag_part = f'{{{tag_str}}}' if tag_str else ''
            lines.append(f'{name}{tag_part} {metric["value"]}')
        return '\n'.join(lines)

    def export_json(self) -> str:
        """Export metrics as JSON for CloudWatch/DataDog."""
        return json.dumps({
            'run_id': self.run_id,
            'pipeline': self.pipeline_name,
            'metrics': self.metrics,
            'timers': self.timers,
            'exported_at': datetime.utcnow().isoformat()
        })


class StructuredLogger:
    """
    Structured logging for pipeline observability.
    All log entries are JSON for easy parsing and querying.
    """

    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        self.base_context = {
            'pipeline': pipeline_name,
            'run_id': run_id
        }

    def _log(self, level: str, message: str, **kwargs):
        entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': level,
            'message': message,
            **self.base_context,
            **kwargs
        }
        # In production, send to logging infrastructure instead of stdout
        print(json.dumps(entry))

    def info(self, message: str, **kwargs):
        self._log('INFO', message, **kwargs)

    def warning(self, message: str, **kwargs):
        self._log('WARNING', message, **kwargs)

    def error(self, message: str, exception: Exception = None, **kwargs):
        if exception:
            kwargs['exception_type'] = type(exception).__name__
            kwargs['exception_message'] = str(exception)
        self._log('ERROR', message, **kwargs)

    def stage_start(self, stage_name: str, **kwargs):
        """Log the start of a pipeline stage."""
        self.info(f"Stage started: {stage_name}", stage=stage_name, event='stage_start', **kwargs)

    def stage_complete(self, stage_name: str, duration_seconds: float, records_processed: int, **kwargs):
        """Log the completion of a pipeline stage."""
        self.info(
            f"Stage completed: {stage_name}",
            stage=stage_name,
            event='stage_complete',
            duration_seconds=duration_seconds,
            records_processed=records_processed,
            **kwargs
        )


@dataclass
class LineageNode:
    """Represents a dataset in the lineage graph."""
    id: str
    name: str
    type: str  # 'source', 'transform', 'destination'
    schema: Optional[Dict] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class LineageEdge:
    """Represents a transformation between datasets."""
    source_id: str
    target_id: str
    transformation: str
    columns_mapped: Dict[str, List[str]] = field(default_factory=dict)  # target -> sources


class DataLineageTracker:
    """
    Tracks data lineage through the pipeline.
    Answers: Where did this data come from? What transformations were applied?
    """

    def __init__(self, pipeline_name: str, run_id: str):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        self.nodes: Dict[str, LineageNode] = {}
        self.edges: List[LineageEdge] = []

    def register_source(
        self,
        name: str,
        schema: Dict = None,
        **metadata
    ) -> str:
        """Register a source dataset."""
        node_id = f"source.{name}"
        self.nodes[node_id] = LineageNode(
            id=node_id,
            name=name,
            type='source',
            schema=schema,
            metadata=metadata
        )
        return node_id

    def register_transform(
        self,
        name: str,
        source_ids: List[str],
        column_mappings: Dict[str, List[str]] = None,
        **metadata
    ) -> str:
        """Register a transformation and its lineage."""
        node_id = f"transform.{name}"
        self.nodes[node_id] = LineageNode(
            id=node_id,
            name=name,
            type='transform',
            metadata=metadata
        )
        for source_id in source_ids:
            self.edges.append(LineageEdge(
                source_id=source_id,
                target_id=node_id,
                transformation=name,
                columns_mapped=column_mappings or {}
            ))
        return node_id

    def register_destination(
        self,
        name: str,
        source_id: str,
        schema: Dict = None,
        **metadata
    ) -> str:
        """Register a destination dataset."""
        node_id = f"destination.{name}"
self.nodes[node_id] = LineageNode(
id=node_id,
name=name,
type='destination',
schema=schema,
metadata=metadata
)
self.edges.append(LineageEdge(
source_id=source_id,
target_id=node_id,
transformation='load'
))
return node_id
def get_upstream(self, node_id: str) -> List[str]:
"""Get all upstream datasets for a node."""
upstream = []
direct_sources = [e.source_id for e in self.edges if e.target_id == node_id]
for source in direct_sources:
upstream.append(source)
upstream.extend(self.get_upstream(source))
return upstream
def export_openlineage(self) -> Dict:
"""Export lineage in OpenLineage format for integration with tools."""
return {
'producer': f'https://gemut.com/pipelines/{self.pipeline_name}',
'schemaURL': 'https://openlineage.io/spec/1-0-5/OpenLineage.json',
'run': {
'runId': self.run_id,
'facets': {
'pipeline': {'name': self.pipeline_name}
}
},
'inputs': [
{'namespace': 'production', 'name': n.name}
for n in self.nodes.values() if n.type == 'source'
],
'outputs': [
{'namespace': 'production', 'name': n.name}
for n in self.nodes.values() if n.type == 'destination'
]
        }

Testing Strategies
Untested pipelines are unreliable pipelines. Testing data pipelines requires multiple strategies.
Unit, Integration, and Data Quality Tests
# tests/test_pipeline.py
"""
Comprehensive testing patterns for ETL pipelines.
Covers unit tests, integration tests, and data quality tests.
"""
import pytest
from unittest.mock import Mock, patch
from datetime import datetime, date
from typing import List, Dict
# ============================================================================
# Unit Tests: Test individual functions in isolation
# ============================================================================
class TestTransformations:
"""Unit tests for transformation logic."""
def test_currency_normalization(self):
"""Test that currencies are properly normalized."""
from pipeline.transforms import normalize_currency
assert normalize_currency({'total': 100, 'currency': 'usd'}) == {
'total': 100, 'currency': 'USD'
}
assert normalize_currency({'total': 100, 'currency': None}) == {
'total': 100, 'currency': 'USD' # Default
}
def test_derived_fields(self):
"""Test derived field calculations."""
from pipeline.transforms import add_derived_fields
record = {
'order_id': '123',
'created_at': datetime(2024, 1, 15, 10, 30),
'total': 1500.00
}
result = add_derived_fields(record)
assert result['order_month'] == '2024-01'
assert result['is_high_value'] == True
assert result['order_hour'] == 10
def test_null_handling(self):
"""Test that null values are handled correctly."""
from pipeline.transforms import clean_record
record = {
'id': '123',
'name': None,
'email': '',
'value': 0
}
result = clean_record(record)
assert result['name'] is None # Preserved
assert result['email'] is None # Empty string -> None
assert result['value'] == 0 # Zero preserved
def test_type_coercion(self):
"""Test type coercion for various input formats."""
from pipeline.transforms import coerce_types
record = {
'amount': '123.45',
'count': '100',
'date': '2024-01-15',
'flag': 'true'
}
schema = {
'amount': float,
'count': int,
'date': date,
'flag': bool
}
result = coerce_types(record, schema)
assert result['amount'] == 123.45
assert result['count'] == 100
assert result['date'] == date(2024, 1, 15)
assert result['flag'] == True
class TestIdempotency:
"""Tests for idempotency guarantees."""
def test_record_hash_deterministic(self):
"""Record hash should be deterministic across runs."""
from pipeline.loading import IdempotentWriter
writer = IdempotentWriter('test_table', [])
record = {'id': '123', 'name': 'Test', 'value': 100}
hash1 = writer.compute_record_hash(record, ['id', 'name'])
hash2 = writer.compute_record_hash(record, ['id', 'name'])
assert hash1 == hash2
def test_record_hash_ignores_order(self):
"""Hash should be same regardless of dict key order."""
from pipeline.loading import IdempotentWriter
writer = IdempotentWriter('test_table', [])
record1 = {'id': '123', 'name': 'Test'}
record2 = {'name': 'Test', 'id': '123'} # Different order
hash1 = writer.compute_record_hash(record1, ['id', 'name'])
hash2 = writer.compute_record_hash(record2, ['id', 'name'])
assert hash1 == hash2
# ============================================================================
# Integration Tests: Test components working together
# ============================================================================
class TestExtractionIntegration:
"""Integration tests for extraction from real sources."""
@pytest.fixture
def test_database(self):
"""Set up test database with sample data."""
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute('''
CREATE TABLE orders (
order_id TEXT PRIMARY KEY,
customer_id TEXT,
total REAL,
updated_at TIMESTAMP
)
''')
# Insert test data
conn.executemany(
'INSERT INTO orders VALUES (?, ?, ?, ?)',
[
('ord-1', 'cust-1', 100.0, '2024-01-15 10:00:00'),
('ord-2', 'cust-2', 200.0, '2024-01-15 11:00:00'),
('ord-3', 'cust-1', 150.0, '2024-01-15 12:00:00'),
]
)
conn.commit()
yield conn
conn.close()
def test_incremental_extraction(self, test_database):
"""Test that incremental extraction respects watermark."""
from pipeline.extraction import TimestampBasedExtractor, ExtractionState
extractor = TimestampBasedExtractor(
timestamp_column='updated_at',
overlap_seconds=0 # No overlap for testing
)
# Initial state: extract from beginning
state = ExtractionState(
source_table='orders',
last_extracted_at=datetime(2024, 1, 1),
high_watermark=datetime(2024, 1, 15, 10, 30),
watermark_column='updated_at',
rows_extracted=0,
checksum=None
)
predicate = extractor.get_incremental_predicate(state)
cursor = test_database.cursor()
cursor.execute(f"SELECT * FROM orders WHERE {predicate}")
results = cursor.fetchall()
# Should get only records after 10:30
assert len(results) == 2
assert results[0][0] == 'ord-2' # 11:00
assert results[1][0] == 'ord-3' # 12:00
class TestLoadIntegration:
"""Integration tests for data loading."""
@pytest.fixture
def target_database(self):
"""Set up target database."""
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute('''
CREATE TABLE fact_orders (
order_id TEXT PRIMARY KEY,
customer_id TEXT,
total REAL,
order_month TEXT,
_record_hash TEXT
)
''')
conn.commit()
yield conn
conn.close()
def test_idempotent_upsert(self, target_database):
"""Test that upserts are idempotent."""
from pipeline.loading import IdempotentWriter
writer = IdempotentWriter('fact_orders', [])
records = [
{'order_id': 'ord-1', 'customer_id': 'cust-1', 'total': 100.0, 'order_month': '2024-01'}
]
# First write
count1 = writer.write_with_deduplication(
records, ['order_id'], target_database
)
# Second write with same data
count2 = writer.write_with_deduplication(
records, ['order_id'], target_database
)
# Verify only one record exists
cursor = target_database.cursor()
cursor.execute("SELECT COUNT(*) FROM fact_orders")
total = cursor.fetchone()[0]
assert total == 1
# ============================================================================
# Data Quality Tests: Validate data meets expectations
# ============================================================================
class TestDataQuality:
"""Data quality validation tests."""
@pytest.fixture
def sample_data(self) -> List[Dict]:
return [
            {'id': '1', 'email': '[email protected]', 'amount': 100.0, 'status': 'active'},
            {'id': '2', 'email': '[email protected]', 'amount': 200.0, 'status': 'active'},
            {'id': '3', 'email': '[email protected]', 'amount': 50.0, 'status': 'pending'},
]
def test_completeness(self, sample_data):
"""Test that required fields are present."""
from pipeline.quality import DataQualityChecker
checker = DataQualityChecker()
result = checker.check_not_null(sample_data, 'email')
assert result.passed == True
# Add record with null email
data_with_null = sample_data + [{'id': '4', 'email': None, 'amount': 100.0}]
result = checker.check_not_null(data_with_null, 'email')
assert result.passed == False
assert result.failure_count == 1
def test_uniqueness(self, sample_data):
"""Test that unique constraints are met."""
from pipeline.quality import DataQualityChecker
checker = DataQualityChecker()
result = checker.check_unique(sample_data, 'id')
assert result.passed == True
# Add duplicate
        data_with_dup = sample_data + [{'id': '1', 'email': '[email protected]'}]
result = checker.check_unique(data_with_dup, 'id')
assert result.passed == False
def test_valid_values(self, sample_data):
"""Test that values are within expected sets."""
from pipeline.quality import DataQualityChecker
checker = DataQualityChecker()
valid_statuses = {'active', 'pending', 'cancelled'}
result = checker.check_in_set(sample_data, 'status', valid_statuses)
assert result.passed == True
# Add invalid status
data_with_invalid = sample_data + [{'id': '4', 'status': 'unknown'}]
result = checker.check_in_set(data_with_invalid, 'status', valid_statuses)
assert result.passed == False
def test_statistical_outliers(self, sample_data):
"""Test for statistical outliers in numeric columns."""
from pipeline.quality import DataQualityChecker
checker = DataQualityChecker()
# Normal data should pass
result = checker.check_no_outliers(sample_data, 'amount', z_threshold=3)
assert result.passed == True
# Add extreme outlier
data_with_outlier = sample_data + [{'id': '4', 'amount': 1000000.0}]
result = checker.check_no_outliers(data_with_outlier, 'amount', z_threshold=3)
assert result.passed == False
# ============================================================================
# Contract Tests: Test interfaces between components
# ============================================================================
class TestSchemaContracts:
"""Tests that verify schema contracts between pipeline stages."""
def test_extraction_output_schema(self):
"""Verify extraction produces expected schema."""
from pipeline.extraction import extract_orders
from pipeline.schemas import EXTRACTED_ORDER_SCHEMA
# Run extraction
records = extract_orders(limit=10)
# Validate schema
for record in records:
for field, field_type in EXTRACTED_ORDER_SCHEMA.items():
assert field in record, f"Missing field: {field}"
if record[field] is not None:
assert isinstance(record[field], field_type), \
f"Field {field} has wrong type"
def test_transform_output_schema(self):
"""Verify transformation produces expected schema."""
from pipeline.transforms import transform_order
from pipeline.schemas import TRANSFORMED_ORDER_SCHEMA
input_record = {
'order_id': '123',
'customer_id': 'cust-1',
'created_at': datetime(2024, 1, 15),
'total': '100.00',
'currency': 'usd'
}
output = transform_order(input_record)
for field, field_type in TRANSFORMED_ORDER_SCHEMA.items():
            assert field in output, f"Missing field: {field}"

Key Takeaways
Building production-grade ETL pipelines requires attention to several critical areas:
- Idempotency is non-negotiable: Every operation must produce the same result when run multiple times. Use partition swaps, merge operations, or write-ahead logs.
- Design for failure: Assume every external system will fail. Implement retries with exponential backoff, dead letter queues for failed records, and comprehensive error classification.
- Observability from day one: Instrument pipelines with metrics, structured logging, and data lineage tracking. You can't debug or improve what you can't measure.
- Test at every level: Unit tests for transformation logic, integration tests for component interactions, and data quality tests for validation. Contract tests ensure stages remain compatible.
- Incremental over full refresh: The complexity investment in incremental processing pays dividends in processing time, cost, and system load.
- Checkpointing enables recovery: Persist extraction state, processing progress, and validation results. Recovery should be automatic after any failure.
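The failure-handling takeaway above can be sketched as a small retry helper. This is a minimal illustration, not code from the pipeline modules in this guide; the `transient_errors` tuple and the full-jitter strategy are assumptions you should tune for your own sources:

```python
import random
import time
from functools import wraps

def retry_with_backoff(max_attempts=5, base_delay=1.0, max_delay=60.0,
                       transient_errors=(ConnectionError, TimeoutError)):
    """Retry a flaky operation with exponential backoff and jitter.

    Only errors classified as transient are retried; anything else
    (e.g. a schema mismatch) fails fast so it surfaces immediately.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except transient_errors:
                    if attempt == max_attempts:
                        raise  # exhausted: let the caller dead-letter the batch
                    # Exponential backoff capped at max_delay, with full jitter
                    # to avoid thundering-herd retries against a recovering system
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    time.sleep(random.uniform(0, delay))
        return wrapper
    return decorator

@retry_with_backoff(max_attempts=3, base_delay=0.5)
def fetch_batch(source):
    return source.read()
```

Classifying errors up front (retryable vs. fatal) is what makes this safe: retrying a permanent error only delays the alert.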
The patterns in this guide represent years of collective learning from production failures. Apply them early, and your pipelines will serve you reliably for years to come.
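For the idempotency takeaway, the merge-based pattern can be sketched with SQLite's upsert syntax as a simplified stand-in for a warehouse `MERGE` statement; the table and column names here are illustrative:

```python
import sqlite3

def idempotent_load(conn, records):
    """Upsert records by primary key: replaying the same batch
    leaves the table unchanged (INSERT ... ON CONFLICT)."""
    conn.executemany(
        """
        INSERT INTO fact_orders (order_id, total)
        VALUES (:order_id, :total)
        ON CONFLICT(order_id) DO UPDATE SET total = excluded.total
        """,
        records,
    )
    conn.commit()

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE fact_orders (order_id TEXT PRIMARY KEY, total REAL)')
batch = [{'order_id': 'ord-1', 'total': 100.0},
         {'order_id': 'ord-2', 'total': 200.0}]
idempotent_load(conn, batch)
idempotent_load(conn, batch)  # replaying the batch is a no-op
```

The same shape works in most warehouses via `MERGE INTO` from a staging table; the key design choice is that the natural key, not insertion order, decides what the final row looks like.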
References
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
- Reis, J., & Housley, M. (2022). Fundamentals of Data Engineering. O'Reilly Media.
- Apache Airflow Documentation: Best Practices for DAGs
- dbt Labs: Data Testing Best Practices
- OpenLineage Specification: https://openlineage.io/
- Great Expectations Documentation: Data Quality Framework