Data Quality at Scale: Building a Comprehensive Framework
A practitioner's guide to implementing data quality across the data lifecycle—from prevention and detection to resolution and governance
Poor data quality costs organizations an average of $12.9 million annually, according to Gartner. Yet most teams treat quality as an afterthought—adding tests only after problems surface. This guide presents a proactive framework covering the six dimensions of data quality, automated testing with Great Expectations and dbt, anomaly detection, and operational practices for maintaining quality at scale.
Prerequisites
- Experience with SQL and data warehousing concepts
- Familiarity with Python programming
- Basic understanding of data pipelines

The Cost of Poor Data Quality
According to Gartner, poor data quality costs organizations an average of $12.9 million annually. Beyond direct costs, bad data erodes trust: analysts second-guess reports, executives question dashboards, and data initiatives stall as stakeholders lose confidence.
Yet most organizations approach data quality reactively—adding tests only after problems manifest in production. This guide presents a proactive framework that treats quality as a first-class concern throughout the data lifecycle.
The Six Dimensions of Data Quality
Data quality isn't monolithic. The Data Management Association (DAMA) identifies six distinct dimensions, each requiring different detection and remediation strategies:
┌────────────────────────────────────────────────────────────────────┐
│                   SIX DIMENSIONS OF DATA QUALITY                   │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│  ACCURACY                            COMPLETENESS                  │
│  ────────                            ────────────                  │
│  Data correctly represents reality   All required data is present  │
│  • Validation against source         • Null checks                 │
│  • Cross-reference verification      • Missing record detection    │
│  • Sampling and audit                • Coverage analysis           │
│                                                                    │
│  CONSISTENCY                         TIMELINESS                    │
│  ───────────                         ──────────                    │
│  Data is coherent across systems     Data is current enough        │
│  • Cross-system reconciliation       • Freshness SLAs              │
│  • Referential integrity             • Latency monitoring          │
│  • Business rule validation          • Staleness alerts            │
│                                                                    │
│  VALIDITY                            UNIQUENESS                    │
│  ────────                            ──────────                    │
│  Data conforms to defined formats    No unintended duplicates      │
│  • Schema conformance                • Primary key validation      │
│  • Range and constraint checks       • Fuzzy duplicate detection   │
│  • Pattern matching                  • Entity resolution           │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
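Before diving into tooling, note that several of these dimensions reduce to simple ratio metrics. As a standalone illustration (not tied to any library in this guide, with made-up field names), completeness and uniqueness can be measured directly on a batch of records:

```python
# Minimal sketch: measuring two of the six dimensions (completeness and
# uniqueness) on a batch of records. Field names are illustrative.
from typing import Any, Dict, List


def completeness(records: List[Dict[str, Any]], field: str) -> float:
    """Fraction of records where `field` is present and non-null."""
    if not records:
        return 1.0
    present = sum(1 for r in records if r.get(field) is not None)
    return present / len(records)


def uniqueness(records: List[Dict[str, Any]], field: str) -> float:
    """Fraction of non-null values for `field` that are distinct."""
    values = [r.get(field) for r in records if r.get(field) is not None]
    return len(set(values)) / len(values) if values else 1.0


batch = [
    {"id": "A1", "email": "[email protected]"},
    {"id": "A2", "email": None},
    {"id": "A2", "email": "[email protected]"},
]
print(completeness(batch, "email"))  # 2 of 3 records have an email
print(uniqueness(batch, "id"))       # "A2" appears twice
```

Accuracy and consistency are harder: they require an external reference (a source system, a sibling table) rather than a single-table computation, which is why they get their own reconciliation tooling later in this guide.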
Quality Across the Data Lifecycle
Different stages of the data lifecycle require different quality approaches:
Prevention: Quality at Ingestion
The cheapest quality issue is one that never enters your system. Strong ingestion-time validation catches problems at the source.
# ingestion_quality.py
"""
Data quality validation at ingestion time.
Prevents bad data from entering the data lake.
"""
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List


class ValidationSeverity(Enum):
    """Severity levels for validation failures."""
    WARNING = 'warning'    # Log but allow
    ERROR = 'error'        # Reject record
    CRITICAL = 'critical'  # Halt ingestion


@dataclass
class ValidationResult:
    """Result of a validation check."""
    passed: bool
    check_name: str
    severity: ValidationSeverity
    message: str = ""
    failed_records: List[str] = field(default_factory=list)
    failure_count: int = 0
    details: Dict[str, Any] = field(default_factory=dict)


class SchemaValidator:
    """
    Validates incoming data against an expected schema.
    First line of defense against structural issues.
    """

    def __init__(self, schema: Dict[str, Any]):
        """
        Initialize with a JSON Schema specification.

        Example schema:
            {
                "type": "object",
                "required": ["id", "timestamp", "value"],
                "properties": {
                    "id": {"type": "string", "pattern": "^[A-Z]{2}\\d{6}$"},
                    "timestamp": {"type": "string", "format": "date-time"},
                    "value": {"type": "number", "minimum": 0},
                    "status": {"type": "string", "enum": ["active", "inactive"]}
                }
            }
        """
        self.schema = schema
        self.required_fields = schema.get('required', [])
        self.properties = schema.get('properties', {})

    def validate_record(self, record: Dict[str, Any]) -> List[ValidationResult]:
        """Validate a single record against the schema."""
        results = []

        # Check required fields
        for field_name in self.required_fields:
            if field_name not in record or record[field_name] is None:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"required_field_{field_name}",
                    severity=ValidationSeverity.ERROR,
                    message=f"Missing required field: {field_name}"
                ))

        # Validate each property
        for field_name, field_schema in self.properties.items():
            if field_name not in record:
                continue
            value = record[field_name]
            if value is None and field_name not in self.required_fields:
                continue  # Optional null values are OK

            # Type validation
            expected_type = field_schema.get('type')
            if not self._check_type(value, expected_type):
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"type_{field_name}",
                    severity=ValidationSeverity.ERROR,
                    message=(
                        f"Field {field_name} has wrong type. "
                        f"Expected {expected_type}, got {type(value).__name__}"
                    )
                ))
                continue

            # Pattern validation
            if 'pattern' in field_schema and isinstance(value, str):
                if not re.match(field_schema['pattern'], value):
                    results.append(ValidationResult(
                        passed=False,
                        check_name=f"pattern_{field_name}",
                        severity=ValidationSeverity.ERROR,
                        message=f"Field {field_name} does not match pattern {field_schema['pattern']}"
                    ))

            # Enum validation
            if 'enum' in field_schema and value not in field_schema['enum']:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"enum_{field_name}",
                    severity=ValidationSeverity.ERROR,
                    message=f"Field {field_name} value '{value}' not in allowed values: {field_schema['enum']}"
                ))

            # Range validation
            if 'minimum' in field_schema and value < field_schema['minimum']:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"minimum_{field_name}",
                    severity=ValidationSeverity.WARNING,
                    message=f"Field {field_name} value {value} below minimum {field_schema['minimum']}"
                ))
            if 'maximum' in field_schema and value > field_schema['maximum']:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"maximum_{field_name}",
                    severity=ValidationSeverity.WARNING,
                    message=f"Field {field_name} value {value} above maximum {field_schema['maximum']}"
                ))

        if not results:
            results.append(ValidationResult(
                passed=True,
                check_name="schema_validation",
                severity=ValidationSeverity.WARNING,
                message="All schema validations passed"
            ))
        return results

    def _check_type(self, value: Any, expected_type: str) -> bool:
        """Check whether a value matches the expected JSON Schema type."""
        type_mapping = {
            'string': str,
            'number': (int, float),
            'integer': int,
            'boolean': bool,
            'array': list,
            'object': dict
        }
        if expected_type not in type_mapping:
            return True  # Unknown type, allow
        return isinstance(value, type_mapping[expected_type])


class BusinessRuleValidator:
    """
    Validates data against business rules.
    Catches semantic issues that schema validation misses.
    """

    def __init__(self, rules: List[Dict[str, Any]]):
        """
        Initialize with business rule definitions.

        Example rules:
            [
                {
                    "name": "order_total_matches_items",
                    "expression": "total == sum(items.price * items.quantity)",
                    "severity": "error"
                },
                {
                    "name": "future_date_check",
                    "expression": "ship_date >= order_date",
                    "severity": "error"
                }
            ]
        """
        self.rules = rules

    def validate_record(self, record: Dict[str, Any]) -> List[ValidationResult]:
        """Apply all business rules to a record."""
        results = []
        for rule in self.rules:
            try:
                passed = self._evaluate_rule(record, rule['expression'])
                severity = ValidationSeverity[rule.get('severity', 'warning').upper()]
                results.append(ValidationResult(
                    passed=passed,
                    check_name=rule['name'],
                    severity=severity,
                    message=f"Business rule '{rule['name']}' {'passed' if passed else 'failed'}"
                ))
            except Exception as e:
                results.append(ValidationResult(
                    passed=False,
                    check_name=rule['name'],
                    severity=ValidationSeverity.WARNING,
                    message=f"Could not evaluate rule '{rule['name']}': {e}"
                ))
        return results

    def _evaluate_rule(self, record: Dict[str, Any], expression: str) -> bool:
        """
        Evaluate a rule expression against a record.

        Deliberately simple implementation. In production, use a proper
        safe expression parser such as simpleeval rather than eval().
        """
        local_vars = {**record}

        # Handle common comparison patterns (check '>=' before '==',
        # since '>=' would otherwise be mis-split)
        if '>=' in expression:
            left, right = expression.split('>=')
            left_val = local_vars.get(left.strip())
            right_val = local_vars.get(right.strip())
            if left_val is None or right_val is None:
                return False
            return left_val >= right_val
        if '==' in expression:
            left, right = expression.split('==')
            left_val = local_vars.get(left.strip())
            right_val = local_vars.get(right.strip())
            return left_val == right_val
        return True  # Unknown expression format


class IngestionQualityGate:
    """
    Quality gate for data ingestion.
    Coordinates validation and decides accept/reject.
    """

    def __init__(
        self,
        schema_validator: SchemaValidator,
        business_validator: BusinessRuleValidator,
        error_threshold_pct: float = 5.0
    ):
        self.schema_validator = schema_validator
        self.business_validator = business_validator
        self.error_threshold_pct = error_threshold_pct

    def process_batch(
        self,
        records: List[Dict[str, Any]]
    ) -> tuple[List[Dict], List[Dict], Dict[str, Any]]:
        """
        Process a batch of records through the quality gate.

        Returns:
            Tuple of (accepted_records, rejected_records, summary)
        """
        accepted = []
        rejected = []
        validation_summary = {
            'total_records': len(records),
            'accepted': 0,
            'rejected': 0,
            'warnings': 0,
            'checks_failed': {}
        }

        for record in records:
            # Run all validations
            schema_results = self.schema_validator.validate_record(record)
            business_results = self.business_validator.validate_record(record)
            all_results = schema_results + business_results

            # Classify results
            has_error = any(
                not r.passed and r.severity == ValidationSeverity.ERROR
                for r in all_results
            )
            has_critical = any(
                not r.passed and r.severity == ValidationSeverity.CRITICAL
                for r in all_results
            )
            has_warning = any(
                not r.passed and r.severity == ValidationSeverity.WARNING
                for r in all_results
            )

            if has_critical:
                # Critical failure: halt ingestion
                failures = [r for r in all_results
                            if r.severity == ValidationSeverity.CRITICAL]
                raise RuntimeError(f"Critical validation failure: {failures}")

            if has_error:
                rejected.append({
                    'record': record,
                    'validation_errors': [r for r in all_results if not r.passed]
                })
                validation_summary['rejected'] += 1
                # Track which checks failed
                for result in all_results:
                    if not result.passed:
                        validation_summary['checks_failed'].setdefault(
                            result.check_name, 0
                        )
                        validation_summary['checks_failed'][result.check_name] += 1
            else:
                accepted.append(record)
                validation_summary['accepted'] += 1

            if has_warning:
                validation_summary['warnings'] += 1

        # Check error threshold
        error_pct = (validation_summary['rejected'] / len(records) * 100) if records else 0
        if error_pct > self.error_threshold_pct:
            raise RuntimeError(
                f"Batch error rate {error_pct:.1f}% exceeds threshold {self.error_threshold_pct}%"
            )

        return accepted, rejected, validation_summary
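The essential decision the gate makes is worth seeing in isolation: reject individual records that fail, then fail the whole batch if the reject rate crosses a threshold. A condensed, standalone sketch of that logic (deliberately not reusing the classes above; only a required-field check stands in for the full validator stack):

```python
from typing import Any, Dict, List, Tuple


def gate_batch(
    records: List[Dict[str, Any]],
    required: List[str],
    error_threshold_pct: float = 5.0,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Reject records missing required fields; raise if the batch-level
    reject rate exceeds the threshold (mirrors IngestionQualityGate)."""
    accepted: List[Dict[str, Any]] = []
    rejected: List[Dict[str, Any]] = []
    for record in records:
        missing = [f for f in required if record.get(f) is None]
        (rejected if missing else accepted).append(record)
    error_pct = 100.0 * len(rejected) / len(records) if records else 0.0
    if error_pct > error_threshold_pct:
        raise RuntimeError(
            f"Batch error rate {error_pct:.1f}% exceeds {error_threshold_pct}%"
        )
    return accepted, rejected


good, bad = gate_batch(
    [{"id": "1", "value": 10}, {"id": "2", "value": None}],
    required=["id", "value"],
    error_threshold_pct=60.0,  # high threshold so this demo batch passes
)
print(len(good), len(bad))  # 1 1
```

The two-level design matters: per-record rejection quarantines isolated bad data for later triage, while the batch threshold distinguishes "a few bad records" from "the upstream feed is broken," which warrants halting rather than silently dropping half the data.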
Detection: Testing in Transformation
The transformation layer is where most data issues surface. Use dbt tests and Great Expectations for comprehensive coverage.
dbt Tests
# models/staging/stg_orders.yml
version: 2

models:
  - name: stg_orders
    description: "Staged orders from source system"
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Customer who placed the order"
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: order_status
        description: "Current status of the order"
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
      - name: order_total
        description: "Total order amount in USD"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_date
        description: "Date order was placed"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "order_date <= current_date()"
    tests:
      # Table-level tests
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_line_id
      - dbt_utils.recency:
          datepart: day
          field: updated_at
          interval: 1
      - row_count_check:
          min_count: 1000
          max_count: 10000000
-- tests/generic/row_count_check.sql
{% test row_count_check(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select *
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}
-- tests/singular/assert_order_items_match_total.sql
-- Custom test: order total should equal the sum of its line items
with order_totals as (
    select
        o.order_id,
        o.order_total as header_total,
        sum(oi.line_total) as calculated_total
    from {{ ref('stg_orders') }} o
    left join {{ ref('stg_order_items') }} oi on o.order_id = oi.order_id
    group by 1, 2
)

select *
from order_totals
where abs(header_total - calculated_total) > 0.01
Great Expectations
# expectations/orders_expectations.py
"""
Great Expectations suite for order data validation.
More sophisticated tests than dbt provides.
"""
import great_expectations as gx
from great_expectations.core import ExpectationConfiguration


def create_orders_suite(context: gx.DataContext) -> gx.ExpectationSuite:
    """Create a comprehensive expectation suite for orders."""
    suite = context.create_expectation_suite(
        expectation_suite_name="orders_quality_suite",
        overwrite_existing=True
    )

    # ========================================================================
    # Schema / Structure Expectations
    # ========================================================================

    # Expect specific columns to exist
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_table_columns_to_match_set",
            kwargs={
                "column_set": [
                    "order_id", "customer_id", "order_date", "order_total",
                    "order_status", "shipping_address", "created_at", "updated_at"
                ],
                "exact_match": False  # Allow additional columns
            }
        )
    )

    # ========================================================================
    # Completeness Expectations
    # ========================================================================
    required_columns = ["order_id", "customer_id", "order_date", "order_total"]
    for col in required_columns:
        suite.add_expectation(
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": col}
            )
        )

    # Completeness rate for optional columns
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={
                "column": "shipping_address",
                "mostly": 0.95  # Allow up to 5% null
            }
        )
    )

    # ========================================================================
    # Validity Expectations
    # ========================================================================

    # Order ID format
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_match_regex",
            kwargs={
                "column": "order_id",
                "regex": r"^ORD-\d{8}-[A-Z]{4}$"
            }
        )
    )

    # Order status values
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_in_set",
            kwargs={
                "column": "order_status",
                "value_set": ["pending", "confirmed", "shipped", "delivered", "cancelled"]
            }
        )
    )

    # Order total range
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 0,
                "max_value": 1000000,
                "mostly": 0.999  # Allow rare exceptions
            }
        )
    )

    # ========================================================================
    # Uniqueness Expectations
    # ========================================================================
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_unique",
            kwargs={"column": "order_id"}
        )
    )

    # ========================================================================
    # Statistical Expectations
    # ========================================================================

    # Row count in expected range
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_table_row_count_to_be_between",
            kwargs={
                "min_value": 10000,
                "max_value": 10000000
            }
        )
    )

    # Order total distribution
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_mean_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 50,
                "max_value": 500
            }
        )
    )
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_median_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 30,
                "max_value": 300
            }
        )
    )

    # Standard deviation check (detect unusual variance)
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_stdev_to_be_between",
            kwargs={
                "column": "order_total",
                "min_value": 10,
                "max_value": 1000
            }
        )
    )

    # ========================================================================
    # Timeliness Expectations
    # ========================================================================

    # Data freshness
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_max_to_be_between",
            kwargs={
                "column": "updated_at",
                "min_value": {"$PARAMETER": "yesterday"},
                "max_value": {"$PARAMETER": "now"},
                "parse_strings_as_datetimes": True
            }
        )
    )

    return suite


def run_validation(context: gx.DataContext, batch_request) -> dict:
    """Run validation and return results."""
    checkpoint_config = {
        "name": "orders_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "validations": [
            {
                "batch_request": batch_request,
                "expectation_suite_name": "orders_quality_suite"
            }
        ],
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"}
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"}
            },
            {
                "name": "send_slack_notification",
                "action": {
                    "class_name": "SlackNotificationAction",
                    "slack_webhook": "${SLACK_WEBHOOK}",
                    "notify_on": "failure"
                }
            }
        ]
    }
    checkpoint = context.add_or_update_checkpoint(**checkpoint_config)
    result = checkpoint.run()
    return {
        "success": result.success,
        "statistics": result.run_results,
        "data_docs_url": context.get_docs_sites_urls()[0]
    }
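The `mostly` parameter used above is what makes these expectations practical on real data: rather than failing on a single bad value, an expectation passes as long as at least the given fraction of values satisfy it. The semantics reduce to a pass-rate comparison, which a standalone sketch makes explicit:

```python
from typing import Any, Callable, Iterable


def check_mostly(
    values: Iterable[Any],
    predicate: Callable[[Any], bool],
    mostly: float,
) -> bool:
    """Pass if at least a `mostly` fraction of values satisfy the predicate:
    the same tolerance idea behind Great Expectations' `mostly` kwarg."""
    values = list(values)
    if not values:
        return True  # vacuously true on an empty column
    passing = sum(1 for v in values if predicate(v))
    return passing / len(values) >= mostly

totals = [120, 80, 45, 999999, 60]  # one outlier out of five
print(check_mostly(totals, lambda v: 0 <= v <= 1000, mostly=0.75))  # True: 4/5 pass
print(check_mostly(totals, lambda v: 0 <= v <= 1000, mostly=0.9))   # False: 0.8 < 0.9
```

Choosing `mostly` is a quality-policy decision: 0.999 on `order_total` above tolerates a handful of legitimate outliers per million rows, while 0.95 on `shipping_address` encodes a known, accepted completeness gap.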
Anomaly Detection
Static thresholds miss gradual drift. Anomaly detection catches issues that evolve over time.
# anomaly_detection.py
"""
Automated anomaly detection for data quality metrics.
Catches drift that static thresholds miss.
"""
import numpy as np
import pandas as pd
from scipy import stats
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum


class AnomalyType(Enum):
    """Types of anomalies we detect."""
    VOLUME = 'volume'              # Row count anomalies
    DISTRIBUTION = 'distribution'  # Value distribution shift
    FRESHNESS = 'freshness'        # Data staleness
    SCHEMA = 'schema'              # Schema changes
    NULL_RATE = 'null_rate'        # Null percentage changes


@dataclass
class Anomaly:
    """A detected anomaly."""
    type: AnomalyType
    metric_name: str
    current_value: float
    expected_value: float
    deviation_score: float  # Z-score or similar
    detected_at: datetime
    severity: str  # 'low', 'medium', 'high', 'critical'
    context: Dict[str, Any]


class StatisticalAnomalyDetector:
    """
    Detects anomalies using statistical methods.
    Maintains rolling baselines for comparison.
    """

    def __init__(
        self,
        baseline_window_days: int = 30,
        z_score_threshold: float = 3.0,
        min_data_points: int = 7
    ):
        self.baseline_window = timedelta(days=baseline_window_days)
        self.z_threshold = z_score_threshold
        self.min_data_points = min_data_points

    def detect_volume_anomaly(
        self,
        current_count: int,
        historical_counts: List[Tuple[datetime, int]]
    ) -> Optional[Anomaly]:
        """
        Detect whether the current row count is anomalous.
        Uses a rolling z-score over the baseline window
        (see SeasonalAnomalyDetector for seasonality-aware checks).
        """
        if len(historical_counts) < self.min_data_points:
            return None

        # Extract values within the baseline window
        cutoff = datetime.utcnow() - self.baseline_window
        recent_counts = [
            count for dt, count in historical_counts
            if dt >= cutoff
        ]
        if len(recent_counts) < self.min_data_points:
            return None

        mean = np.mean(recent_counts)
        std = np.std(recent_counts)
        if std == 0:
            return None  # No variance, can't detect anomaly

        z_score = (current_count - mean) / std
        if abs(z_score) > self.z_threshold:
            severity = self._calculate_severity(abs(z_score))
            return Anomaly(
                type=AnomalyType.VOLUME,
                metric_name='row_count',
                current_value=current_count,
                expected_value=mean,
                deviation_score=z_score,
                detected_at=datetime.utcnow(),
                severity=severity,
                context={
                    'baseline_mean': mean,
                    'baseline_std': std,
                    'baseline_size': len(recent_counts),
                    'direction': 'high' if z_score > 0 else 'low'
                }
            )
        return None

    def detect_null_rate_anomaly(
        self,
        column_name: str,
        current_null_pct: float,
        historical_null_pcts: List[Tuple[datetime, float]]
    ) -> Optional[Anomaly]:
        """
        Detect unusual changes in null percentage.
        Important for catching upstream data issues.
        """
        if len(historical_null_pcts) < self.min_data_points:
            return None

        cutoff = datetime.utcnow() - self.baseline_window
        recent_pcts = [
            pct for dt, pct in historical_null_pcts
            if dt >= cutoff
        ]
        if len(recent_pcts) < self.min_data_points:
            return None

        mean = np.mean(recent_pcts)
        std = np.std(recent_pcts)

        # Special handling for normally-zero null rates
        if mean < 0.01 and current_null_pct > 0.05:
            # A column that's usually complete now has nulls
            return Anomaly(
                type=AnomalyType.NULL_RATE,
                metric_name=f'{column_name}_null_pct',
                current_value=current_null_pct,
                expected_value=mean,
                deviation_score=float('inf'),
                detected_at=datetime.utcnow(),
                severity='high',
                context={
                    'column': column_name,
                    'baseline_mean': mean,
                    'message': f'Column {column_name} unexpectedly has {current_null_pct:.1%} nulls'
                }
            )

        if std > 0:
            z_score = (current_null_pct - mean) / std
            if abs(z_score) > self.z_threshold:
                return Anomaly(
                    type=AnomalyType.NULL_RATE,
                    metric_name=f'{column_name}_null_pct',
                    current_value=current_null_pct,
                    expected_value=mean,
                    deviation_score=z_score,
                    detected_at=datetime.utcnow(),
                    severity=self._calculate_severity(abs(z_score)),
                    context={
                        'column': column_name,
                        'baseline_mean': mean,
                        'baseline_std': std
                    }
                )
        return None

    def detect_distribution_shift(
        self,
        column_name: str,
        current_values: np.ndarray,
        baseline_values: np.ndarray,
        test: str = 'ks'  # 'ks' for Kolmogorov-Smirnov, 'mw' for Mann-Whitney
    ) -> Optional[Anomaly]:
        """
        Detect distribution shift using two-sample statistical tests.
        Catches gradual drift in numeric columns.
        """
        if len(current_values) < 30 or len(baseline_values) < 30:
            return None  # Not enough data for a reliable test

        if test == 'ks':
            statistic, p_value = stats.ks_2samp(current_values, baseline_values)
        elif test == 'mw':
            statistic, p_value = stats.mannwhitneyu(
                current_values, baseline_values,
                alternative='two-sided'
            )
        else:
            raise ValueError(f"Unknown test: {test}")

        # Flag shifts significant at p < 0.01 (tighten further if you run
        # many columns in parallel, to control the false-positive rate)
        significance_threshold = 0.01
        if p_value < significance_threshold:
            # Use effect size (Cohen's d) to grade severity
            current_mean = np.mean(current_values)
            baseline_mean = np.mean(baseline_values)
            pooled_std = np.sqrt(
                (np.var(current_values) + np.var(baseline_values)) / 2
            )
            cohens_d = abs(current_mean - baseline_mean) / pooled_std if pooled_std > 0 else 0
            severity = 'low' if cohens_d < 0.5 else ('medium' if cohens_d < 0.8 else 'high')

            return Anomaly(
                type=AnomalyType.DISTRIBUTION,
                metric_name=f'{column_name}_distribution',
                current_value=current_mean,
                expected_value=baseline_mean,
                deviation_score=statistic,
                detected_at=datetime.utcnow(),
                severity=severity,
                context={
                    'column': column_name,
                    'test': test,
                    'p_value': p_value,
                    'effect_size': cohens_d,
                    'current_median': np.median(current_values),
                    'baseline_median': np.median(baseline_values)
                }
            )
        return None

    def _calculate_severity(self, z_score: float) -> str:
        """Map an absolute z-score to a severity level."""
        if z_score > 6:
            return 'critical'
        elif z_score > 4:
            return 'high'
        elif z_score > 3:
            return 'medium'
        return 'low'


class SeasonalAnomalyDetector:
    """
    Detects anomalies while accounting for seasonal patterns.
    Essential for data with day-of-week or time-of-day cycles.
    """

    def __init__(self, seasonality_period: int = 7):
        """
        Initialize with a seasonality period.

        Args:
            seasonality_period: Number of time units in one cycle
                (7 for weekly, 24 for daily, etc.)
        """
        self.period = seasonality_period

    def detect_with_seasonality(
        self,
        metric_name: str,
        current_value: float,
        current_period_position: int,   # e.g., day of week (0-6)
        historical_data: pd.DataFrame   # columns: timestamp, value, period_position
    ) -> Optional[Anomaly]:
        """
        Detect anomalies while accounting for seasonality.
        Compares to the same period position in previous cycles.
        """
        # Filter to the same period position
        same_period_data = historical_data[
            historical_data['period_position'] == current_period_position
        ]['value']

        if len(same_period_data) < 4:  # Need at least 4 same-period observations
            return None

        mean = same_period_data.mean()
        std = same_period_data.std()
        if std == 0:
            return None

        z_score = (current_value - mean) / std
        if abs(z_score) > 3:
            return Anomaly(
                type=AnomalyType.VOLUME,
                metric_name=metric_name,
                current_value=current_value,
                expected_value=mean,
                deviation_score=z_score,
                detected_at=datetime.utcnow(),
                severity='medium' if abs(z_score) < 4 else 'high',
                context={
                    'period_position': current_period_position,
                    'same_period_mean': mean,
                    'same_period_std': std,
                    'observations': len(same_period_data)
                }
            )
        return None
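At the core of the volume detector is a single z-score against a rolling baseline. A standalone sketch with synthetic daily row counts shows the calculation in isolation (standard library only; the baseline numbers are made up):

```python
import statistics


def volume_z_score(current: float, history: list) -> float:
    """Z-score of the current value against a historical baseline,
    the same calculation StatisticalAnomalyDetector applies
    (population std, matching numpy's default np.std)."""
    mean = statistics.fmean(history)
    std = statistics.pstdev(history)
    if std == 0:
        raise ValueError("no variance in baseline")
    return (current - mean) / std


history = [1000, 1020, 980, 1010, 990, 1005, 995]  # stable daily row counts
z = volume_z_score(400, history)
print(round(z, 1), abs(z) > 3.0)  # far below baseline: anomalous
```

The 3-sigma default catches abrupt breaks like this one; the distribution-shift test above exists precisely because slow drift never produces a large single-day z-score.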
Data Contracts
Quality is a shared responsibility. Data contracts formalize expectations between producers and consumers.
# contracts/orders_contract.yaml
# Data contract for the orders domain
contract:
  name: "orders"
  version: "2.1.0"
  owner: "order-management-team"
  description: "Contract for order data from the order management system"

  # SLA definitions
  sla:
    freshness:
      max_delay_minutes: 15
      measurement: "max(updated_at) compared to current_time"
    availability:
      uptime_percent: 99.9
    support:
      response_time_hours: 4
      escalation_contact: "[email protected]"

  # Schema specification
  schema:
    fields:
      - name: order_id
        type: string
        description: "Unique order identifier"
        constraints:
          - not_null
          - unique
          - pattern: "^ORD-\\d{8}-[A-Z]{4}$"
      - name: customer_id
        type: string
        description: "Reference to customer who placed order"
        constraints:
          - not_null
          - foreign_key:
              table: customers
              column: customer_id
      - name: order_date
        type: timestamp
        description: "When the order was placed"
        constraints:
          - not_null
          - range:
              min: "2020-01-01"
              max: "current_date + 1 day"
      - name: order_total
        type: decimal(10,2)
        description: "Total order amount in USD"
        constraints:
          - not_null
          - range:
              min: 0.00
              max: 1000000.00
      - name: order_status
        type: string
        description: "Current order status"
        constraints:
          - not_null
          - enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]
      - name: shipping_address
        type: string
        description: "Delivery address"
        constraints:
          - completeness: 0.95  # 95% must be non-null

  # Quality expectations
  quality:
    expectations:
      - name: "row_count_sanity"
        type: row_count_between
        min: 10000
        max: 10000000
      - name: "order_total_distribution"
        type: column_mean_between
        column: order_total
        min: 50
        max: 500
      - name: "freshness_check"
        type: max_column_value_fresh
        column: updated_at
        max_age_minutes: 60
      - name: "duplicate_check"
        type: column_unique
        column: order_id

  # Semantic rules
  semantics:
    - name: "order_total_matches_items"
      description: "Order total should equal sum of line items"
      expression: "abs(order_total - sum(line_items.total)) < 0.01"
      severity: error
    - name: "ship_date_after_order"
      description: "Ship date must be on or after order date"
      expression: "ship_date >= order_date"
      severity: error

  # Evolution policy
  evolution:
    breaking_changes:
      notification_days: 30
      approval_required: true
      approvers: ["data-platform-lead", "analytics-lead"]
    non_breaking_changes:
      notification_days: 7
      approval_required: false

  # Lineage
  lineage:
    source:
      system: "order-management-service"
      database: "orders_db"
      table: "orders"
    consumers:
      - name: "analytics-warehouse"
        usage: "Business intelligence and reporting"
        contact: "[email protected]"
      - name: "recommendation-service"
        usage: "Order-based recommendations"
        contact: "[email protected]"
# contract_validator.py
"""
Validates data against contract specifications.
Integrates with CI/CD for contract enforcement.
"""
import yaml
from typing import List
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ContractViolation:
    """Represents a contract violation."""
    contract_name: str
    contract_version: str
    rule_name: str
    severity: str
    message: str
    detected_at: datetime


class ContractValidator:
    """Validates data against data contracts."""

    def __init__(self, contract_path: str):
        with open(contract_path) as f:
            self.contract = yaml.safe_load(f)
        self.name = self.contract['contract']['name']
        self.version = self.contract['contract']['version']

    def validate_schema(self, df) -> List[ContractViolation]:
        """Validate a dataframe against the contract schema."""
        violations = []
        schema = self.contract['contract']['schema']

        for field_spec in schema['fields']:
            field_name = field_spec['name']

            # Check the field exists
            if field_name not in df.columns:
                violations.append(ContractViolation(
                    contract_name=self.name,
                    contract_version=self.version,
                    rule_name=f"field_exists_{field_name}",
                    severity="error",
                    message=f"Required field '{field_name}' not found in data",
                    detected_at=datetime.utcnow()
                ))
                continue

            # Check constraints
            for constraint in field_spec.get('constraints', []):
                if constraint == 'not_null':
                    null_count = df[field_name].isnull().sum()
                    if null_count > 0:
                        violations.append(ContractViolation(
                            contract_name=self.name,
                            contract_version=self.version,
                            rule_name=f"not_null_{field_name}",
                            severity="error",
                            message=f"Field '{field_name}' has {null_count} null values",
                            detected_at=datetime.utcnow()
                        ))
                elif constraint == 'unique':
                    dup_count = df[field_name].duplicated().sum()
                    if dup_count > 0:
                        violations.append(ContractViolation(
                            contract_name=self.name,
                            contract_version=self.version,
                            rule_name=f"unique_{field_name}",
                            severity="error",
                            message=f"Field '{field_name}' has {dup_count} duplicate values",
                            detected_at=datetime.utcnow()
                        ))
                elif isinstance(constraint, dict) and 'completeness' in constraint:
                    required_pct = constraint['completeness']
                    actual_pct = 1 - (df[field_name].isnull().sum() / len(df))
                    if actual_pct < required_pct:
                        violations.append(ContractViolation(
                            contract_name=self.name,
                            contract_version=self.version,
                            rule_name=f"completeness_{field_name}",
                            severity="warning",
                            message=(
                                f"Field '{field_name}' completeness {actual_pct:.1%} "
                                f"below required {required_pct:.1%}"
                            ),
                            detected_at=datetime.utcnow()
                        ))
        return violations

    def validate_sla(self, max_timestamp: datetime) -> List[ContractViolation]:
        """Validate data freshness against the contract SLA."""
        violations = []
        sla = self.contract['contract']['sla']
        max_delay = sla['freshness']['max_delay_minutes']

        age_minutes = (datetime.utcnow() - max_timestamp).total_seconds() / 60
        if age_minutes > max_delay:
            violations.append(ContractViolation(
                contract_name=self.name,
                contract_version=self.version,
                rule_name="freshness_sla",
                severity="error",
                message=f"Data is {age_minutes:.0f} minutes old, exceeds SLA of {max_delay} minutes",
                detected_at=datetime.utcnow()
            ))
        return violations
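The validator's constraint loop is easy to exercise without a YAML file or a dataframe: the string constraints (`not_null`, `unique`) reduce to counting over a column of values. A standalone sketch of the same checks (plain records, illustrative field names):

```python
from typing import Any, Dict, List


def check_constraints(
    records: List[Dict[str, Any]],
    field: str,
    constraints: List[str],
) -> List[str]:
    """Human-readable violations for the contract's simple string
    constraints ('not_null', 'unique'), mirroring ContractValidator."""
    violations = []
    values = [r.get(field) for r in records]
    if 'not_null' in constraints:
        nulls = sum(1 for v in values if v is None)
        if nulls:
            violations.append(f"{field}: {nulls} null values")
    if 'unique' in constraints:
        non_null = [v for v in values if v is not None]
        dups = len(non_null) - len(set(non_null))
        if dups:
            violations.append(f"{field}: {dups} duplicate values")
    return violations


records = [{"order_id": "ORD-1"}, {"order_id": "ORD-1"}, {"order_id": None}]
print(check_constraints(records, "order_id", ["not_null", "unique"]))
```

Running this in CI against a sample of producer output is the enforcement half of the contract: the YAML states the promise, the validator makes breaking it a failed build rather than a downstream incident.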
Operational Data Quality
Quality isn't just testing—it's an ongoing operational practice.
Quality Metrics Dashboard
-- quality_metrics.sql
-- Aggregated quality metrics for operational monitoring
-- Daily quality summary
CREATE OR REPLACE VIEW data_quality.daily_summary AS
SELECT
date_trunc('day', check_time) as check_date,
dataset_name,
check_category,
COUNT(*) as total_checks,
SUM(CASE WHEN passed THEN 1 ELSE 0 END) as passed_checks,
SUM(CASE WHEN NOT passed THEN 1 ELSE 0 END) as failed_checks,
ROUND(
100.0 * SUM(CASE WHEN passed THEN 1 ELSE 0 END) / COUNT(*),
2
) as pass_rate_pct,
AVG(execution_time_ms) as avg_check_time_ms
FROM data_quality.check_results
WHERE check_time >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1, 2, 3
ORDER BY 1 DESC, 4 ASC;
-- Quality trend analysis
CREATE OR REPLACE VIEW data_quality.trend_analysis AS
WITH daily_metrics AS (
SELECT
date_trunc('day', check_time) as check_date,
dataset_name,
ROUND(
100.0 * SUM(CASE WHEN passed THEN 1 ELSE 0 END) / COUNT(*),
2
) as pass_rate
FROM data_quality.check_results
WHERE check_time >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY 1, 2
),
with_lag AS (
SELECT
*,
LAG(pass_rate, 7) OVER (
PARTITION BY dataset_name
ORDER BY check_date
) as pass_rate_7d_ago,
LAG(pass_rate, 30) OVER (
PARTITION BY dataset_name
ORDER BY check_date
) as pass_rate_30d_ago
FROM daily_metrics
)
SELECT
check_date,
dataset_name,
pass_rate,
pass_rate - pass_rate_7d_ago as change_7d,
pass_rate - pass_rate_30d_ago as change_30d,
CASE
WHEN pass_rate >= 99 THEN 'excellent'
WHEN pass_rate >= 95 THEN 'good'
WHEN pass_rate >= 90 THEN 'acceptable'
ELSE 'needs_attention'
END as quality_tier
FROM with_lag
WHERE check_date >= CURRENT_DATE - INTERVAL '30 days';
-- Failed checks detail for investigation
CREATE OR REPLACE VIEW data_quality.failed_checks_detail AS
SELECT
check_time,
dataset_name,
check_name,
check_category,
failure_message,
affected_rows,
sample_failing_values,
remediation_suggestion
FROM data_quality.check_results
WHERE NOT passed
AND check_time >= CURRENT_DATE - INTERVAL '7 days'
ORDER BY check_time DESC;
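The quality-tier cutoffs in the trend view are useful application-side too (for example, when rendering the dashboard or routing alerts). A minimal sketch mirroring the SQL CASE expression above; `quality_tier` is an illustrative helper name.

```python
def quality_tier(pass_rate: float) -> str:
    """Classify a daily pass rate using the same cutoffs as the SQL view:
    >= 99 excellent, >= 95 good, >= 90 acceptable, else needs_attention."""
    if pass_rate >= 99:
        return "excellent"
    if pass_rate >= 95:
        return "good"
    if pass_rate >= 90:
        return "acceptable"
    return "needs_attention"
```

Keeping the thresholds in one shared config (rather than duplicated in SQL and Python) avoids the two classifications drifting apart.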
Alerting Configuration
# alerts/quality_alerts.yaml
# Alerting rules for data quality monitoring
alerts:
- name: "critical_quality_failure"
description: "Critical data quality check failed"
condition: |
quality_check.severity == 'critical'
AND quality_check.passed == false
channels:
- type: pagerduty
service: data-platform-critical
- type: slack
channel: "#data-incidents"
runbook_url: "https://runbooks.company.com/quality/critical-failure"
- name: "freshness_sla_breach"
description: "Data freshness SLA violated"
condition: |
data_age_minutes > sla_max_delay_minutes
channels:
- type: slack
channel: "#data-alerts"
- type: email
to: "[email protected]"
runbook_url: "https://runbooks.company.com/quality/freshness-breach"
- name: "quality_degradation"
description: "Significant quality score degradation"
condition: |
current_pass_rate < (historical_pass_rate - 5)
AND current_pass_rate < 95
channels:
- type: slack
channel: "#data-alerts"
runbook_url: "https://runbooks.company.com/quality/degradation"
- name: "anomaly_detected"
description: "Statistical anomaly detected in data"
condition: |
anomaly.severity IN ('high', 'critical')
channels:
- type: slack
channel: "#data-anomalies"
runbook_url: "https://runbooks.company.com/quality/anomaly-investigation"
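The `quality_degradation` rule deliberately combines two guards: a relative drop (more than 5 points below the historical rate) and an absolute floor (below 95%), so a dataset that dips from 99.9% to 99.0% does not page anyone. A hedged sketch of how an alert evaluator might apply that condition; `should_alert_degradation` is an illustrative name, not part of any alerting tool's API.

```python
def should_alert_degradation(current_pass_rate: float,
                             historical_pass_rate: float) -> bool:
    """Mirror the quality_degradation condition above: fire only when the
    pass rate dropped more than 5 points AND sits below 95%."""
    return (current_pass_rate < historical_pass_rate - 5
            and current_pass_rate < 95)
```

Requiring both guards keeps the alert quiet for healthy-but-noisy datasets while still catching genuine regressions.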
Key Takeaways
- Quality is a lifecycle concern: Embed checks at ingestion, transformation, and consumption stages. Prevention is cheaper than cure.
- Cover all six dimensions: Accuracy, completeness, consistency, timeliness, validity, and uniqueness each require different testing strategies.
- Combine static and statistical tests: Schema validation catches structural issues; anomaly detection catches drift that static thresholds miss.
- Establish data contracts: Formalize expectations between producers and consumers. Make quality a shared responsibility.
- Operationalize quality: Dashboards, alerts, and runbooks transform quality from a development concern to an operational practice.
- Automate ruthlessly: Manual quality checks don't scale. Every check should run automatically on every data load.
References
- DAMA International. (2017). DAMA-DMBOK: Data Management Body of Knowledge (2nd ed.)
- Redman, T. C. (2001). Data Quality: The Field Guide. Digital Press.
- Great Expectations Documentation: https://docs.greatexpectations.io/
- dbt Tests Documentation: https://docs.getdbt.com/docs/build/tests
- Soda Documentation: https://docs.soda.io/
- Monte Carlo Data Observability: https://www.montecarlodata.com/