
Data Quality at Scale: Building a Comprehensive Framework

A practitioner's guide to implementing data quality across the data lifecycle—from prevention and detection to resolution and governance

Data Quality, Great Expectations, dbt, Data Governance, Testing, Observability
TL;DR

Data quality issues cost organizations an average of $12.9 million annually. Yet most teams treat quality as an afterthought—adding tests only after problems surface. This guide presents a proactive framework covering the six dimensions of data quality, automated testing with Great Expectations and dbt, anomaly detection, and operational practices for maintaining quality at scale.

Prerequisites
  • Experience with SQL and data warehousing concepts
  • Familiarity with Python programming
  • Basic understanding of data pipelines

The Cost of Poor Data Quality

According to Gartner, poor data quality costs organizations an average of $12.9 million annually. Beyond direct costs, bad data erodes trust: analysts second-guess reports, executives question dashboards, and data initiatives stall as stakeholders lose confidence.

Yet most organizations approach data quality reactively—adding tests only after problems manifest in production. This guide presents a proactive framework that treats quality as a first-class concern throughout the data lifecycle.

The Six Dimensions of Data Quality

Data quality isn't monolithic. The Data Management Association (DAMA) identifies six distinct dimensions, each requiring different detection and remediation strategies:

┌────────────────────────────────────────────────────────────────────────────┐
│                    SIX DIMENSIONS OF DATA QUALITY                          │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  ACCURACY                                 COMPLETENESS                     │
│  ────────                                 ────────────                     │
│  Data correctly represents reality        All required data is present    │
│  • Validation against source              • Null checks                   │
│  • Cross-reference verification           • Missing record detection      │
│  • Sampling and audit                     • Coverage analysis             │
│                                                                            │
│  CONSISTENCY                              TIMELINESS                       │
│  ───────────                              ──────────                       │
│  Data is coherent across systems          Data is current enough          │
│  • Cross-system reconciliation            • Freshness SLAs                │
│  • Referential integrity                  • Latency monitoring            │
│  • Business rule validation               • Staleness alerts              │
│                                                                            │
│  VALIDITY                                 UNIQUENESS                       │
│  ────────                                 ──────────                       │
│  Data conforms to defined formats         No unintended duplicates        │
│  • Schema conformance                     • Primary key validation        │
│  • Range and constraint checks            • Fuzzy duplicate detection     │
│  • Pattern matching                       • Entity resolution             │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
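As a toy illustration, two of these dimensions—completeness and uniqueness—reduce to simple ratios. The records and field names below are hypothetical:

```python
# Toy batch with one missing email and one duplicated id (hypothetical fields)
rows = [
    {"id": "A1", "email": "x@example.com"},
    {"id": "A2", "email": None},
    {"id": "A2", "email": "y@example.com"},  # duplicate id
]

# Completeness: fraction of records with the required field populated
completeness = sum(r["email"] is not None for r in rows) / len(rows)

# Uniqueness: fraction of distinct primary keys
ids = [r["id"] for r in rows]
uniqueness = len(set(ids)) / len(ids)
```

Both metrics come out to 2/3 here; production checks compute the same ratios over full tables and compare them against thresholds.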

Quality Across the Data Lifecycle

Different stages of the data lifecycle require different quality approaches:

Prevention: Quality at Ingestion

The cheapest quality issue is one that never enters your system. Strong ingestion-time validation catches problems at the source.

# ingestion_quality.py
"""
Data quality validation at ingestion time.
Prevents bad data from entering the data lake.
"""

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import re

class ValidationSeverity(Enum):
    """Severity levels for validation failures."""
    WARNING = 'warning'      # Log but allow
    ERROR = 'error'          # Reject record
    CRITICAL = 'critical'    # Halt ingestion


@dataclass
class ValidationResult:
    """Result of a validation check."""
    passed: bool
    check_name: str
    severity: ValidationSeverity
    message: str = ""
    failed_records: List[str] = field(default_factory=list)
    failure_count: int = 0
    details: Dict[str, Any] = field(default_factory=dict)


class SchemaValidator:
    """
    Validates incoming data against expected schema.
    First line of defense against structural issues.
    """

    def __init__(self, schema: Dict[str, Any]):
        """
        Initialize with JSON Schema specification.

        Example schema:
        {
            "type": "object",
            "required": ["id", "timestamp", "value"],
            "properties": {
                "id": {"type": "string", "pattern": "^[A-Z]{2}\\d{6}$"},
                "timestamp": {"type": "string", "format": "date-time"},
                "value": {"type": "number", "minimum": 0},
                "status": {"type": "string", "enum": ["active", "inactive"]}
            }
        }
        """
        self.schema = schema
        self.required_fields = schema.get('required', [])
        self.properties = schema.get('properties', {})

    def validate_record(self, record: Dict[str, Any]) -> List[ValidationResult]:
        """Validate a single record against schema."""
        results = []

        # Check required fields
        for field_name in self.required_fields:
            if field_name not in record or record[field_name] is None:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"required_field_{field_name}",
                    severity=ValidationSeverity.ERROR,
                    message=f"Missing required field: {field_name}"
                ))

        # Validate each property
        for field_name, field_schema in self.properties.items():
            if field_name not in record:
                continue

            value = record[field_name]
            if value is None and field_name not in self.required_fields:
                continue  # Optional null values are OK

            # Type validation
            expected_type = field_schema.get('type')
            if not self._check_type(value, expected_type):
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"type_{field_name}",
                    severity=ValidationSeverity.ERROR,
                    message=f"Field {field_name} has wrong type. Expected {expected_type}, got {type(value).__name__}"
                ))
                continue

            # Pattern validation
            if 'pattern' in field_schema and isinstance(value, str):
                if not re.match(field_schema['pattern'], value):
                    results.append(ValidationResult(
                        passed=False,
                        check_name=f"pattern_{field_name}",
                        severity=ValidationSeverity.ERROR,
                        message=f"Field {field_name} does not match pattern {field_schema['pattern']}"
                    ))

            # Enum validation
            if 'enum' in field_schema:
                if value not in field_schema['enum']:
                    results.append(ValidationResult(
                        passed=False,
                        check_name=f"enum_{field_name}",
                        severity=ValidationSeverity.ERROR,
                        message=f"Field {field_name} value '{value}' not in allowed values: {field_schema['enum']}"
                    ))

            # Range validation
            if 'minimum' in field_schema and value < field_schema['minimum']:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"minimum_{field_name}",
                    severity=ValidationSeverity.WARNING,
                    message=f"Field {field_name} value {value} below minimum {field_schema['minimum']}"
                ))

            if 'maximum' in field_schema and value > field_schema['maximum']:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"maximum_{field_name}",
                    severity=ValidationSeverity.WARNING,
                    message=f"Field {field_name} value {value} above maximum {field_schema['maximum']}"
                ))

        if not results:
            results.append(ValidationResult(
                passed=True,
                check_name="schema_validation",
                severity=ValidationSeverity.WARNING,
                message="All schema validations passed"
            ))

        return results

    def _check_type(self, value: Any, expected_type: str) -> bool:
        """Check if value matches expected JSON Schema type."""
        type_mapping = {
            'string': str,
            'number': (int, float),
            'integer': int,
            'boolean': bool,
            'array': list,
            'object': dict
        }

        if expected_type not in type_mapping:
            return True  # Unknown type, allow

        # bool is a subclass of int in Python, so exclude it from numeric types
        if isinstance(value, bool) and expected_type in ('number', 'integer'):
            return False

        return isinstance(value, type_mapping[expected_type])


class BusinessRuleValidator:
    """
    Validates data against business rules.
    Catches semantic issues that schema validation misses.
    """

    def __init__(self, rules: List[Dict[str, Any]]):
        """
        Initialize with business rule definitions.

        Example rules:
        [
            {
                "name": "order_total_matches_items",
                "expression": "total == sum(items.price * items.quantity)",
                "severity": "error"
            },
            {
                "name": "future_date_check",
                "expression": "ship_date >= order_date",
                "severity": "error"
            }
        ]
        """
        self.rules = rules

    def validate_record(self, record: Dict[str, Any]) -> List[ValidationResult]:
        """Apply all business rules to a record."""
        results = []

        for rule in self.rules:
            try:
                passed = self._evaluate_rule(record, rule['expression'])
                severity = ValidationSeverity[rule.get('severity', 'warning').upper()]

                results.append(ValidationResult(
                    passed=passed,
                    check_name=rule['name'],
                    severity=severity,
                    message=f"Business rule '{rule['name']}' {'passed' if passed else 'failed'}"
                ))

            except Exception as e:
                results.append(ValidationResult(
                    passed=False,
                    check_name=rule['name'],
                    severity=ValidationSeverity.WARNING,
                    message=f"Could not evaluate rule '{rule['name']}': {str(e)}"
                ))

        return results

    def _evaluate_rule(self, record: Dict[str, Any], expression: str) -> bool:
        """
        Evaluate a rule expression against a record.
        Uses a safe expression evaluator.
        """
        # Simple implementation - in production, use a proper expression parser
        # like simpleeval or ast.literal_eval with safety checks

        local_vars = {**record}

        # Handle common patterns (check '>=' before '==' since both contain '=')
        if '>=' in expression:
            left, right = expression.split('>=')
            left_val = local_vars.get(left.strip())
            right_val = local_vars.get(right.strip())
            # Explicit None checks: 0 and '' are valid values to compare
            if left_val is None or right_val is None:
                return False
            return left_val >= right_val

        if '==' in expression:
            left, right = expression.split('==')
            left_val = local_vars.get(left.strip())
            right_val = local_vars.get(right.strip())
            return left_val == right_val

        return True  # Unknown expression format


class IngestionQualityGate:
    """
    Quality gate for data ingestion.
    Coordinates validation and decides accept/reject.
    """

    def __init__(
        self,
        schema_validator: SchemaValidator,
        business_validator: BusinessRuleValidator,
        error_threshold_pct: float = 5.0
    ):
        self.schema_validator = schema_validator
        self.business_validator = business_validator
        self.error_threshold_pct = error_threshold_pct

    def process_batch(
        self,
        records: List[Dict[str, Any]]
    ) -> tuple[List[Dict], List[Dict], Dict[str, Any]]:
        """
        Process a batch of records through quality gate.

        Returns:
            Tuple of (accepted_records, rejected_records, summary)
        """
        accepted = []
        rejected = []
        validation_summary = {
            'total_records': len(records),
            'accepted': 0,
            'rejected': 0,
            'warnings': 0,
            'checks_failed': {}
        }

        for record in records:
            # Run all validations
            schema_results = self.schema_validator.validate_record(record)
            business_results = self.business_validator.validate_record(record)

            all_results = schema_results + business_results

            # Classify results
            has_error = any(
                not r.passed and r.severity == ValidationSeverity.ERROR
                for r in all_results
            )
            has_critical = any(
                not r.passed and r.severity == ValidationSeverity.CRITICAL
                for r in all_results
            )
            has_warning = any(
                not r.passed and r.severity == ValidationSeverity.WARNING
                for r in all_results
            )

            if has_critical:
                # Critical failure - halt ingestion
                raise Exception(
                    f"Critical validation failure: {[r for r in all_results if r.severity == ValidationSeverity.CRITICAL]}"
                )

            if has_error:
                rejected.append({
                    'record': record,
                    'validation_errors': [r for r in all_results if not r.passed]
                })
                validation_summary['rejected'] += 1

                # Track which checks failed
                for result in all_results:
                    if not result.passed:
                        validation_summary['checks_failed'].setdefault(
                            result.check_name, 0
                        )
                        validation_summary['checks_failed'][result.check_name] += 1
            else:
                accepted.append(record)
                validation_summary['accepted'] += 1

            if has_warning:
                validation_summary['warnings'] += 1

        # Check error threshold
        error_pct = (validation_summary['rejected'] / len(records) * 100) if records else 0
        if error_pct > self.error_threshold_pct:
            raise Exception(
                f"Batch error rate {error_pct:.1f}% exceeds threshold {self.error_threshold_pct}%"
            )

        return accepted, rejected, validation_summary
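The string-splitting in `_evaluate_rule` is deliberately simplistic. A safer direction, sketched below with the standard-library `ast` module, parses the expression and dispatches on the comparison operator. `safe_compare` is an illustrative name, and a vetted library such as `simpleeval` remains the better production choice:

```python
import ast
import operator

# Map AST comparison nodes to their operator functions
_OPS = {ast.GtE: operator.ge, ast.LtE: operator.le, ast.Eq: operator.eq}

def safe_compare(record: dict, expression: str) -> bool:
    """Evaluate a single comparison like 'ship_date >= order_date' safely."""
    tree = ast.parse(expression, mode="eval").body
    if not isinstance(tree, ast.Compare) or len(tree.ops) != 1:
        raise ValueError("only single comparisons are supported")

    def resolve(node: ast.expr):
        # Bare names look up record fields; anything else must be a literal
        if isinstance(node, ast.Name):
            return record[node.id]
        return ast.literal_eval(node)

    op = _OPS.get(type(tree.ops[0]))
    if op is None:
        raise ValueError("unsupported comparison operator")
    return op(resolve(tree.left), resolve(tree.comparators[0]))
```

For example, `safe_compare({'ship_date': 7, 'order_date': 3}, 'ship_date >= order_date')` returns `True`, and an unknown field name raises `KeyError` instead of silently passing.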

Detection: Testing in Transformation

The transformation layer is where most data issues surface. Use dbt tests and Great Expectations for comprehensive coverage.

dbt Tests

# models/staging/stg_orders.yml
version: 2

models:
  - name: stg_orders
    description: "Staged orders from source system"
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null

      - name: customer_id
        description: "Customer who placed the order"
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id

      - name: order_status
        description: "Current status of the order"
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']

      - name: order_total
        description: "Total order amount in USD"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000

      - name: order_date
        description: "Date order was placed"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "order_date <= current_date()"

    tests:
      # Table-level tests
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_line_id

      - dbt_utils.recency:
          datepart: day
          field: updated_at
          interval: 1

      - row_count_check:
          min_count: 1000
          max_count: 10000000

-- tests/generic/row_count_check.sql
{% test row_count_check(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select *
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- tests/singular/assert_order_items_match_total.sql
-- Custom test: Order total should equal sum of line items
with order_totals as (
    select
        o.order_id,
        o.order_total as header_total,
        sum(oi.line_total) as calculated_total
    from {{ ref('stg_orders') }} o
    left join {{ ref('stg_order_items') }} oi on o.order_id = oi.order_id
    group by 1, 2
)

select *
from order_totals
where abs(header_total - calculated_total) > 0.01

Great Expectations

# expectations/orders_expectations.py
"""
Great Expectations suite for order data validation.
More sophisticated tests than dbt provides.
"""

import great_expectations as gx
from great_expectations.core import ExpectationConfiguration

def create_orders_suite(context: gx.DataContext) -> gx.ExpectationSuite:
    """Create comprehensive expectation suite for orders."""

    suite = context.create_expectation_suite(
        expectation_suite_name="orders_quality_suite",
        overwrite_existing=True
    )

    # ========================================================================
    # Schema / Structure Expectations
    # ========================================================================

    # Expect specific columns to exist
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_table_columns_to_match_set",
            kwargs={
                "column_set": [
                    "order_id", "customer_id", "order_date", "order_total",
                    "order_status", "shipping_address", "created_at", "updated_at"
                ],
                "exact_match": False  # Allow additional columns
            }
        )
    )

    # ========================================================================
    # Completeness Expectations
    # ========================================================================

    required_columns = ["order_id", "customer_id", "order_date", "order_total"]
    for col in required_columns:
        suite.add_expectation(
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": col}
            )
        )

    # Completeness rate for optional columns
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={
                "column": "shipping_address",
                "mostly": 0.95  # Allow up to 5% null
            }
        )
    )

    # ========================================================================
    # Validity Expectations
    # ========================================================================

    # Order ID format
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_match_regex",
            kwargs={
                "column": "order_id",
                "regex": r"^ORD-\d{8}-[A-Z]{4}$"
            }
        )
    )

    # Order status values
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_in_set",
            kwargs={
                "column": "order_status",
                "value_set": ["pending", "confirmed", "shipped", "delivered", "cancelled"]
            }
        )
    )

    # Order total range
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 0,
                "max_value": 1000000,
                "mostly": 0.999  # Allow rare exceptions
            }
        )
    )

    # ========================================================================
    # Uniqueness Expectations
    # ========================================================================

    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_unique",
            kwargs={"column": "order_id"}
        )
    )

    # ========================================================================
    # Statistical Expectations
    # ========================================================================

    # Row count in expected range
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_table_row_count_to_be_between",
            kwargs={
                "min_value": 10000,
                "max_value": 10000000
            }
        )
    )

    # Order total distribution
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_mean_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 50,
                "max_value": 500
            }
        )
    )

    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_median_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 30,
                "max_value": 300
            }
        )
    )

    # Standard deviation check (detect unusual variance)
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_stdev_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 10,
                "max_value": 1000
            }
        )
    )

    # ========================================================================
    # Timeliness Expectations
    # ========================================================================

    # Data freshness
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_max_to_be_between",
            kwargs={
                "column": "updated_at",
                "min_value": {"$PARAMETER": "yesterday"},
                "max_value": {"$PARAMETER": "now"},
                "parse_strings_as_datetimes": True
            }
        )
    )

    return suite


def run_validation(context: gx.DataContext, batch_request) -> dict:
    """Run validation and return results."""

    checkpoint_config = {
        "name": "orders_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "validations": [
            {
                "batch_request": batch_request,
                "expectation_suite_name": "orders_quality_suite"
            }
        ],
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"}
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"}
            },
            {
                "name": "send_slack_notification",
                "action": {
                    "class_name": "SlackNotificationAction",
                    "slack_webhook": "${SLACK_WEBHOOK}",
                    "notify_on": "failure"
                }
            }
        ]
    }

    checkpoint = context.add_or_update_checkpoint(**checkpoint_config)
    result = checkpoint.run()

    return {
        "success": result.success,
        "statistics": result.run_results,
        "data_docs_url": context.get_docs_sites_urls()[0]
    }
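Several expectations above rely on the `mostly` parameter, whose semantics are simple enough to sketch in plain Python: the expectation passes when at least that fraction of values satisfies the condition. This standalone snippet uses toy data, not Great Expectations itself:

```python
# Toy column: 2 of 10 values are null (80% populated)
values = [10, 20, None, 30, 40, 50, None, 60, 70, 80]

populated = sum(v is not None for v in values) / len(values)

# mostly=0.75 would pass (0.80 >= 0.75); mostly=0.95 would fail
passes_75 = populated >= 0.75
passes_95 = populated >= 0.95
```

This is why `mostly` thresholds belong on expectations with known, tolerable exception rates, while hard invariants like primary-key uniqueness should stay strict.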

Anomaly Detection

Static thresholds miss gradual drift. Anomaly detection catches issues that evolve over time.

# anomaly_detection.py
"""
Automated anomaly detection for data quality metrics.
Catches drift that static thresholds miss.
"""

import numpy as np
import pandas as pd
from scipy import stats
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum

class AnomalyType(Enum):
    """Types of anomalies we detect."""
    VOLUME = 'volume'           # Row count anomalies
    DISTRIBUTION = 'distribution'  # Value distribution shift
    FRESHNESS = 'freshness'     # Data staleness
    SCHEMA = 'schema'           # Schema changes
    NULL_RATE = 'null_rate'     # Null percentage changes


@dataclass
class Anomaly:
    """Detected anomaly."""
    type: AnomalyType
    metric_name: str
    current_value: float
    expected_value: float
    deviation_score: float  # Z-score or similar
    detected_at: datetime
    severity: str  # 'low', 'medium', 'high', 'critical'
    context: Dict[str, Any]


class StatisticalAnomalyDetector:
    """
    Detects anomalies using statistical methods.
    Maintains rolling baselines for comparison.
    """

    def __init__(
        self,
        baseline_window_days: int = 30,
        z_score_threshold: float = 3.0,
        min_data_points: int = 7
    ):
        self.baseline_window = timedelta(days=baseline_window_days)
        self.z_threshold = z_score_threshold
        self.min_data_points = min_data_points

    def detect_volume_anomaly(
        self,
        current_count: int,
        historical_counts: List[Tuple[datetime, int]]
    ) -> Optional[Anomaly]:
        """
        Detect if current row count is anomalous.
        Uses rolling z-score with seasonality awareness.
        """
        if len(historical_counts) < self.min_data_points:
            return None

        # Extract values within baseline window
        cutoff = datetime.utcnow() - self.baseline_window
        recent_counts = [
            count for dt, count in historical_counts
            if dt >= cutoff
        ]

        if len(recent_counts) < self.min_data_points:
            return None

        mean = np.mean(recent_counts)
        std = np.std(recent_counts)

        if std == 0:
            return None  # No variance, can't detect anomaly

        z_score = (current_count - mean) / std

        if abs(z_score) > self.z_threshold:
            severity = self._calculate_severity(abs(z_score))

            return Anomaly(
                type=AnomalyType.VOLUME,
                metric_name='row_count',
                current_value=current_count,
                expected_value=mean,
                deviation_score=z_score,
                detected_at=datetime.utcnow(),
                severity=severity,
                context={
                    'baseline_mean': mean,
                    'baseline_std': std,
                    'baseline_size': len(recent_counts),
                    'direction': 'high' if z_score > 0 else 'low'
                }
            )

        return None

    def detect_null_rate_anomaly(
        self,
        column_name: str,
        current_null_pct: float,
        historical_null_pcts: List[Tuple[datetime, float]]
    ) -> Optional[Anomaly]:
        """
        Detect unusual changes in null percentage.
        Important for catching upstream data issues.
        """
        if len(historical_null_pcts) < self.min_data_points:
            return None

        cutoff = datetime.utcnow() - self.baseline_window
        recent_pcts = [
            pct for dt, pct in historical_null_pcts
            if dt >= cutoff
        ]

        if len(recent_pcts) < self.min_data_points:
            return None

        mean = np.mean(recent_pcts)
        std = np.std(recent_pcts)

        # Special handling for normally-zero null rates
        if mean < 0.01 and current_null_pct > 0.05:
            # Column that's usually complete now has nulls
            return Anomaly(
                type=AnomalyType.NULL_RATE,
                metric_name=f'{column_name}_null_pct',
                current_value=current_null_pct,
                expected_value=mean,
                deviation_score=float('inf'),
                detected_at=datetime.utcnow(),
                severity='high',
                context={
                    'column': column_name,
                    'baseline_mean': mean,
                    'message': f'Column {column_name} unexpectedly has {current_null_pct:.1%} nulls'
                }
            )

        if std > 0:
            z_score = (current_null_pct - mean) / std

            if abs(z_score) > self.z_threshold:
                return Anomaly(
                    type=AnomalyType.NULL_RATE,
                    metric_name=f'{column_name}_null_pct',
                    current_value=current_null_pct,
                    expected_value=mean,
                    deviation_score=z_score,
                    detected_at=datetime.utcnow(),
                    severity=self._calculate_severity(abs(z_score)),
                    context={
                        'column': column_name,
                        'baseline_mean': mean,
                        'baseline_std': std
                    }
                )

        return None

    def detect_distribution_shift(
        self,
        column_name: str,
        current_values: np.ndarray,
        baseline_values: np.ndarray,
        test: str = 'ks'  # 'ks' for Kolmogorov-Smirnov, 'mw' for Mann-Whitney
    ) -> Optional[Anomaly]:
        """
        Detect distribution shift using statistical tests.
        Catches gradual drift in numeric columns.
        """
        if len(current_values) < 30 or len(baseline_values) < 30:
            return None  # Not enough data for reliable test

        if test == 'ks':
            statistic, p_value = stats.ks_2samp(current_values, baseline_values)
        elif test == 'mw':
            statistic, p_value = stats.mannwhitneyu(
                current_values, baseline_values,
                alternative='two-sided'
            )
        else:
            raise ValueError(f"Unknown test: {test}")

        # Conservative significance threshold; apply a Bonferroni correction
        # on top of this when running the test across many columns
        significance_threshold = 0.01

        if p_value < significance_threshold:
            # Calculate effect size for severity
            current_mean = np.mean(current_values)
            baseline_mean = np.mean(baseline_values)
            pooled_std = np.sqrt(
                (np.var(current_values) + np.var(baseline_values)) / 2
            )

            if pooled_std > 0:
                cohens_d = abs(current_mean - baseline_mean) / pooled_std
            else:
                cohens_d = 0

            severity = 'low' if cohens_d < 0.5 else ('medium' if cohens_d < 0.8 else 'high')

            return Anomaly(
                type=AnomalyType.DISTRIBUTION,
                metric_name=f'{column_name}_distribution',
                current_value=current_mean,
                expected_value=baseline_mean,
                deviation_score=statistic,
                detected_at=datetime.utcnow(),
                severity=severity,
                context={
                    'column': column_name,
                    'test': test,
                    'p_value': p_value,
                    'effect_size': cohens_d,
                    'current_median': np.median(current_values),
                    'baseline_median': np.median(baseline_values)
                }
            )

        return None

    def _calculate_severity(self, z_score: float) -> str:
        """Map z-score to severity level."""
        if z_score > 6:
            return 'critical'
        elif z_score > 4:
            return 'high'
        elif z_score > 3:
            return 'medium'
        return 'low'


class SeasonalAnomalyDetector:
    """
    Detects anomalies accounting for seasonal patterns.
    Essential for data with day-of-week or time-of-day patterns.
    """

    def __init__(self, seasonality_period: int = 7):
        """
        Initialize with seasonality period.

        Args:
            seasonality_period: Number of time units in one cycle
                                (7 for daily data with a weekly cycle,
                                 24 for hourly data with a daily cycle)
        """
        self.period = seasonality_period

    def detect_with_seasonality(
        self,
        metric_name: str,
        current_value: float,
        current_period_position: int,  # e.g., day of week (0-6)
        historical_data: pd.DataFrame  # columns: timestamp, value, period_position
    ) -> Optional[Anomaly]:
        """
        Detect anomalies while accounting for seasonality.
        Compares to same period in previous cycles.
        """
        # Filter to same period position
        same_period_data = historical_data[
            historical_data['period_position'] == current_period_position
        ]['value']

        if len(same_period_data) < 4:  # Need at least 4 same-period observations
            return None

        mean = same_period_data.mean()
        std = same_period_data.std()

        if std == 0:
            return None

        z_score = (current_value - mean) / std

        if abs(z_score) > 3:
            return Anomaly(
                type=AnomalyType.VOLUME,
                metric_name=metric_name,
                current_value=current_value,
                expected_value=mean,
                deviation_score=z_score,
                detected_at=datetime.utcnow(),
                severity='medium' if abs(z_score) < 4 else 'high',
                context={
                    'period_position': current_period_position,
                    'same_period_mean': mean,
                    'same_period_std': std,
                    'observations': len(same_period_data)
                }
            )

        return None
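
The same-period comparison can be exercised in isolation. Here is a minimal standalone sketch of the logic `detect_with_seasonality` applies, run against synthetic weekly data (the dates, counts, and weekend dip are all made up for illustration):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)

# Synthetic daily row counts with a weekly pattern: weekends run ~50% lower
days = pd.date_range("2024-01-01", periods=84, freq="D")  # 12 full weeks
base = np.where(days.dayofweek >= 5, 5_000, 10_000)
history = pd.DataFrame({
    "value": base + rng.normal(0, 200, len(days)),
    "period_position": days.dayofweek,
})

# A Saturday arrives with an unusually low count
current_value, position = 3_000, 5  # 5 = Saturday (Monday = 0)

# Compare only against previous Saturdays, not the overall mean
same_period = history.loc[history["period_position"] == position, "value"]
z = (current_value - same_period.mean()) / same_period.std()
print(z < -3)  # anomalous even against the weekend baseline
```

A naive detector comparing against the all-days mean (~8,600) would also flag normal weekends; restricting the baseline to the same period position is what keeps the weekly dip from generating noise.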

Data Contracts

Quality is a shared responsibility. Data contracts formalize expectations between producers and consumers.

# contracts/orders_contract.yaml
# Data contract for the orders domain

contract:
  name: "orders"
  version: "2.1.0"
  owner: "order-management-team"
  description: "Contract for order data from the order management system"

  # SLA definitions
  sla:
    freshness:
      max_delay_minutes: 15
      measurement: "max(updated_at) compared to current_time"
    availability:
      uptime_percent: 99.9
    support:
      response_time_hours: 4
      escalation_contact: "[email protected]"

  # Schema specification
  schema:
    fields:
      - name: order_id
        type: string
        description: "Unique order identifier"
        constraints:
          - not_null
          - unique
          - pattern: "^ORD-\\d{8}-[A-Z]{4}$"

      - name: customer_id
        type: string
        description: "Reference to customer who placed order"
        constraints:
          - not_null
          - foreign_key:
              table: customers
              column: customer_id

      - name: order_date
        type: timestamp
        description: "When the order was placed"
        constraints:
          - not_null
          - range:
              min: "2020-01-01"
              max: "current_date + 1 day"

      - name: order_total
        type: decimal(10,2)
        description: "Total order amount in USD"
        constraints:
          - not_null
          - range:
              min: 0.00
              max: 1000000.00

      - name: order_status
        type: string
        description: "Current order status"
        constraints:
          - not_null
          - enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

      - name: shipping_address
        type: string
        description: "Delivery address"
        constraints:
          - completeness: 0.95  # 95% must be non-null

  # Quality expectations
  quality:
    expectations:
      - name: "row_count_sanity"
        type: row_count_between
        min: 10000
        max: 10000000

      - name: "order_total_distribution"
        type: column_mean_between
        column: order_total
        min: 50
        max: 500

      - name: "freshness_check"
        type: max_column_value_fresh
        column: updated_at
        max_age_minutes: 60

      - name: "duplicate_check"
        type: column_unique
        column: order_id

  # Semantic rules
  semantics:
    - name: "order_total_matches_items"
      description: "Order total should equal sum of line items"
      expression: "abs(order_total - sum(line_items.total)) < 0.01"
      severity: error

    - name: "ship_date_after_order"
      description: "Ship date must be on or after order date"
      expression: "ship_date >= order_date"
      severity: error

  # Evolution policy
  evolution:
    breaking_changes:
      notification_days: 30
      approval_required: true
      approvers: ["data-platform-lead", "analytics-lead"]

    non_breaking_changes:
      notification_days: 7
      approval_required: false

  # Lineage
  lineage:
    source:
      system: "order-management-service"
      database: "orders_db"
      table: "orders"

    consumers:
      - name: "analytics-warehouse"
        usage: "Business intelligence and reporting"
        contact: "[email protected]"

      - name: "recommendation-service"
        usage: "Order-based recommendations"
        contact: "[email protected]"
Contracts are only useful when enforced. The validator below loads the YAML specification and checks incoming data against it, so violations can fail a pipeline run or block a deploy in CI/CD:

# contract_validator.py
"""
Validates data against contract specifications.
Integrates with CI/CD for contract enforcement.
"""

import yaml
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ContractViolation:
    """Represents a contract violation."""
    contract_name: str
    contract_version: str
    rule_name: str
    severity: str
    message: str
    detected_at: datetime


class ContractValidator:
    """Validates data against data contracts."""

    def __init__(self, contract_path: str):
        with open(contract_path) as f:
            self.contract = yaml.safe_load(f)

        self.name = self.contract['contract']['name']
        self.version = self.contract['contract']['version']

    def validate_schema(self, df) -> List[ContractViolation]:
        """Validate dataframe against contract schema."""
        violations = []
        schema = self.contract['contract']['schema']

        for field_spec in schema['fields']:
            field_name = field_spec['name']

            # Check field exists
            if field_name not in df.columns:
                violations.append(ContractViolation(
                    contract_name=self.name,
                    contract_version=self.version,
                    rule_name=f"field_exists_{field_name}",
                    severity="error",
                    message=f"Required field '{field_name}' not found in data",
                    detected_at=datetime.utcnow()
                ))
                continue

            # Check constraints
            for constraint in field_spec.get('constraints', []):
                if constraint == 'not_null':
                    null_count = df[field_name].isnull().sum()
                    if null_count > 0:
                        violations.append(ContractViolation(
                            contract_name=self.name,
                            contract_version=self.version,
                            rule_name=f"not_null_{field_name}",
                            severity="error",
                            message=f"Field '{field_name}' has {null_count} null values",
                            detected_at=datetime.utcnow()
                        ))

                elif constraint == 'unique':
                    dup_count = df[field_name].duplicated().sum()
                    if dup_count > 0:
                        violations.append(ContractViolation(
                            contract_name=self.name,
                            contract_version=self.version,
                            rule_name=f"unique_{field_name}",
                            severity="error",
                            message=f"Field '{field_name}' has {dup_count} duplicate values",
                            detected_at=datetime.utcnow()
                        ))

                elif isinstance(constraint, dict):
                    if 'completeness' in constraint:
                        required_pct = constraint['completeness']
                        actual_pct = 1 - (df[field_name].isnull().sum() / len(df))
                        if actual_pct < required_pct:
                            violations.append(ContractViolation(
                                contract_name=self.name,
                                contract_version=self.version,
                                rule_name=f"completeness_{field_name}",
                                severity="warning",
                                message=f"Field '{field_name}' completeness {actual_pct:.1%} below required {required_pct:.1%}",
                                detected_at=datetime.utcnow()
                            ))

        return violations

    def validate_sla(self, max_timestamp: datetime) -> List[ContractViolation]:
        """Validate data freshness against SLA."""
        violations = []
        sla = self.contract['contract']['sla']

        max_delay = sla['freshness']['max_delay_minutes']
        age_minutes = (datetime.utcnow() - max_timestamp).total_seconds() / 60

        if age_minutes > max_delay:
            violations.append(ContractViolation(
                contract_name=self.name,
                contract_version=self.version,
                rule_name="freshness_sla",
                severity="error",
                message=f"Data is {age_minutes:.0f} minutes old, exceeds SLA of {max_delay} minutes",
                detected_at=datetime.utcnow()
            ))

        return violations
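
Outside the class, the same constraint checks can be spot-checked in a few lines of pandas. The frame below is hypothetical (the IDs and totals are made up), exercising the contract's `not_null`, `unique`, `pattern`, and `range` rules:

```python
import pandas as pd

df = pd.DataFrame({
    "order_id": ["ORD-20240101-ABCD", "ORD-20240101-ABCD", None],
    "order_total": [19.99, -5.00, 42.00],
})

violations = []
# not_null constraint
if (nulls := int(df["order_id"].isnull().sum())):
    violations.append(f"order_id: {nulls} null values")
# unique constraint
if (dups := int(df["order_id"].duplicated().sum())):
    violations.append(f"order_id: {dups} duplicate values")
# pattern constraint from the contract: ^ORD-\d{8}-[A-Z]{4}$
ids = df["order_id"].dropna()
if (bad := int((~ids.str.match(r"^ORD-\d{8}-[A-Z]{4}$")).sum())):
    violations.append(f"order_id: {bad} malformed values")
# range constraint: 0.00 <= order_total <= 1,000,000.00
if (oob := int(((df["order_total"] < 0) | (df["order_total"] > 1_000_000)).sum())):
    violations.append(f"order_total: {oob} out of range")

print(violations)
```

Running this flags the null ID, the duplicate ID, and the negative total, which is exactly the list of `ContractViolation` messages the validator would emit for the same frame.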

Operational Data Quality

Quality isn't just testing—it's an ongoing operational practice.

Quality Metrics Dashboard

-- quality_metrics.sql
-- Aggregated quality metrics for operational monitoring

-- Daily quality summary
CREATE OR REPLACE VIEW data_quality.daily_summary AS
SELECT
    date_trunc('day', check_time) as check_date,
    dataset_name,
    check_category,
    COUNT(*) as total_checks,
    SUM(CASE WHEN passed THEN 1 ELSE 0 END) as passed_checks,
    SUM(CASE WHEN NOT passed THEN 1 ELSE 0 END) as failed_checks,
    ROUND(
        100.0 * SUM(CASE WHEN passed THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) as pass_rate_pct,
    AVG(execution_time_ms) as avg_check_time_ms
FROM data_quality.check_results
WHERE check_time >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1, 2, 3
ORDER BY check_date DESC, pass_rate_pct ASC;  -- worst pass rates first

-- Quality trend analysis
CREATE OR REPLACE VIEW data_quality.trend_analysis AS
WITH daily_metrics AS (
    SELECT
        date_trunc('day', check_time) as check_date,
        dataset_name,
        ROUND(
            100.0 * SUM(CASE WHEN passed THEN 1 ELSE 0 END) / COUNT(*),
            2
        ) as pass_rate
    FROM data_quality.check_results
    WHERE check_time >= CURRENT_DATE - INTERVAL '90 days'
    GROUP BY 1, 2
),
with_lag AS (
    SELECT
        *,
        LAG(pass_rate, 7) OVER (
            PARTITION BY dataset_name
            ORDER BY check_date
        ) as pass_rate_7d_ago,
        LAG(pass_rate, 30) OVER (
            PARTITION BY dataset_name
            ORDER BY check_date
        ) as pass_rate_30d_ago
    FROM daily_metrics
)
SELECT
    check_date,
    dataset_name,
    pass_rate,
    pass_rate - pass_rate_7d_ago as change_7d,
    pass_rate - pass_rate_30d_ago as change_30d,
    CASE
        WHEN pass_rate >= 99 THEN 'excellent'
        WHEN pass_rate >= 95 THEN 'good'
        WHEN pass_rate >= 90 THEN 'acceptable'
        ELSE 'needs_attention'
    END as quality_tier
FROM with_lag
WHERE check_date >= CURRENT_DATE - INTERVAL '30 days';

-- Failed checks detail for investigation
CREATE OR REPLACE VIEW data_quality.failed_checks_detail AS
SELECT
    check_time,
    dataset_name,
    check_name,
    check_category,
    failure_message,
    affected_rows,
    sample_failing_values,
    remediation_suggestion
FROM data_quality.check_results
WHERE NOT passed
    AND check_time >= CURRENT_DATE - INTERVAL '7 days'
ORDER BY check_time DESC;

Alerting Configuration

# alerts/quality_alerts.yaml
# Alerting rules for data quality monitoring

alerts:
  - name: "critical_quality_failure"
    description: "Critical data quality check failed"
    condition: |
      quality_check.severity == 'critical'
      AND quality_check.passed == false
    channels:
      - type: pagerduty
        service: data-platform-critical
      - type: slack
        channel: "#data-incidents"
    runbook_url: "https://runbooks.company.com/quality/critical-failure"

  - name: "freshness_sla_breach"
    description: "Data freshness SLA violated"
    condition: |
      data_age_minutes > sla_max_delay_minutes
    channels:
      - type: slack
        channel: "#data-alerts"
      - type: email
        to: "[email protected]"
    runbook_url: "https://runbooks.company.com/quality/freshness-breach"

  - name: "quality_degradation"
    description: "Significant quality score degradation"
    condition: |
      current_pass_rate < (historical_pass_rate - 5)
      AND current_pass_rate < 95
    channels:
      - type: slack
        channel: "#data-alerts"
    runbook_url: "https://runbooks.company.com/quality/degradation"

  - name: "anomaly_detected"
    description: "Statistical anomaly detected in data"
    condition: |
      anomaly.severity IN ('high', 'critical')
    channels:
      - type: slack
        channel: "#data-anomalies"
    runbook_url: "https://runbooks.company.com/quality/anomaly-investigation"
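
The `quality_degradation` condition above is just a boolean over two metrics. A sketch of the evaluation (the function name is ours; the thresholds come straight from the YAML rule):

```python
def should_alert_degradation(current_pass_rate: float,
                             historical_pass_rate: float) -> bool:
    # Mirrors the YAML condition: dropped more than 5 points AND now below 95%
    return (current_pass_rate < historical_pass_rate - 5) and current_pass_rate < 95


print(should_alert_degradation(92.0, 99.0))  # True: 7-point drop, below 95
print(should_alert_degradation(96.0, 99.5))  # False: still above 95
```

Requiring both conditions avoids paging on a dataset that dips from 100% to 96%, while still catching a genuine slide below the acceptable floor.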

Key Takeaways

  1. Quality is a lifecycle concern: Embed checks at ingestion, transformation, and consumption stages. Prevention is cheaper than cure.

  2. Cover all six dimensions: Accuracy, completeness, consistency, timeliness, validity, and uniqueness each require different testing strategies.

  3. Combine static and statistical tests: Schema validation catches structural issues; anomaly detection catches drift that static thresholds miss.

  4. Establish data contracts: Formalize expectations between producers and consumers. Make quality a shared responsibility.

  5. Operationalize quality: Dashboards, alerts, and runbooks transform quality from a development concern to an operational practice.

  6. Automate ruthlessly: Manual quality checks don't scale. Every check should run automatically on every data load.

References

  1. DAMA International. (2017). DAMA-DMBOK: Data Management Body of Knowledge (2nd ed.). Technics Publications.
  2. Redman, T. C. (2001). Data Quality: The Field Guide. Digital Press.
  3. Great Expectations Documentation: https://docs.greatexpectations.io/
  4. dbt Tests Documentation: https://docs.getdbt.com/docs/build/tests
  5. Soda Documentation: https://docs.soda.io/
  6. Monte Carlo Data Observability: https://www.montecarlodata.com/

Gemut Analytics Team
Data Engineering Experts