
MLOps: From Experimentation to Production

A comprehensive guide to operationalizing machine learning—covering ML pipelines, model versioning, feature stores, A/B testing, and the organizational practices that enable reliable ML systems

Tags: MLOps, Machine Learning, MLflow, Feature Store, Model Serving, A/B Testing
TL;DR

Most ML projects fail not because of bad models, but because of bad operations. The gap between a working notebook and a reliable production system is vast—requiring reproducibility, monitoring, and governance that data scientists rarely encounter in experimentation. This guide bridges that gap with practical patterns for production ML.

Prerequisites
  • Experience with machine learning model development
  • Familiarity with Python and ML frameworks (scikit-learn, TensorFlow, PyTorch)
  • Basic understanding of software engineering practices

The Production Gap

An often-cited industry estimate holds that 87% of ML models never make it to production. The reasons are rarely technical; they are operational:

  • Models can't be reproduced after the data scientist leaves
  • No one knows which features were used or how they were computed
  • Training data drifts from production data
  • Model performance degrades silently
  • Debugging production issues requires recreating months-old experiments

MLOps addresses these gaps by applying software engineering rigor to machine learning.

The MLOps Maturity Model

┌──────────────────────────────────────────────────────────────────────────────┐
│                       MLOPS MATURITY LEVELS                                   │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Level 0: Manual                                                             │
│  ──────────────────                                                          │
│  • Models trained in notebooks                                               │
│  • Manual export and deployment                                              │
│  • No versioning or reproducibility                                          │
│  • "Works on my machine" problems                                            │
│                                                                              │
│  Level 1: ML Pipeline Automation                                             │
│  ──────────────────────────────────                                          │
│  • Automated training pipelines                                              │
│  • Feature and model versioning                                              │
│  • Reproducible experiments                                                  │
│  • Manual deployment trigger                                                 │
│                                                                              │
│  Level 2: CI/CD for ML                                                       │
│  ──────────────────────                                                      │
│  • Automated testing (data, model, integration)                              │
│  • Continuous training on new data                                           │
│  • Automated deployment with approval gates                                  │
│  • Model performance monitoring                                              │
│                                                                              │
│  Level 3: Full MLOps                                                         │
│  ──────────────────────                                                      │
│  • Feature stores for feature reuse                                          │
│  • Automated retraining based on drift                                       │
│  • A/B testing and gradual rollouts                                          │
│  • Full lineage and governance                                               │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

ML Pipeline Architecture

End-to-End Pipeline

┌──────────────────────────────────────────────────────────────────────────────┐
│                        ML PIPELINE ARCHITECTURE                               │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                       DATA LAYER                                     │    │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │    │
│  │  │  Raw     │  │ Feature  │  │ Training │  │  Model   │            │    │
│  │  │  Data    │─►│  Store   │─►│  Data    │─►│ Registry │            │    │
│  │  │          │  │          │  │          │  │          │            │    │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                    │                                         │
│                                    ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                     TRAINING PIPELINE                                │    │
│  │                                                                      │    │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │    │
│  │  │  Data    │  │ Feature  │  │  Model   │  │  Model   │            │    │
│  │  │  Valid-  │─►│ Engineer-│─►│ Training │─►│ Evaluat- │            │    │
│  │  │  ation   │  │  ing     │  │          │  │  ion     │            │    │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │    │
│  │                                                │                     │    │
│  │                                                ▼                     │    │
│  │                              ┌──────────────────────────┐           │    │
│  │                              │    Model Registry        │           │    │
│  │                              │    (MLflow / Vertex AI)  │           │    │
│  │                              └──────────────────────────┘           │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                    │                                         │
│                                    ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                     SERVING PIPELINE                                 │    │
│  │                                                                      │    │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │    │
│  │  │  Model   │  │  Serving │  │   A/B    │  │ Predict- │            │    │
│  │  │  Loading │─►│  Infra   │─►│  Testing │─►│  ion     │            │    │
│  │  │          │  │          │  │          │  │  API     │            │    │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                    │                                         │
│                                    ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                     MONITORING LAYER                                 │    │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │    │
│  │  │  Data    │  │  Model   │  │  System  │  │ Business │            │    │
│  │  │  Drift   │  │  Perf    │  │  Metrics │  │  Impact  │            │    │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

Training Pipeline Implementation

# training_pipeline.py
"""
Production ML training pipeline with MLflow integration.
Demonstrates reproducibility, versioning, and validation.
"""

import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
import numpy as np
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score,
    f1_score, average_precision_score
)
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from datetime import datetime
import hashlib
import json

@dataclass
class TrainingConfig:
    """Configuration for training run."""
    experiment_name: str
    model_name: str
    target_column: str
    feature_columns: List[str]
    test_size: float = 0.2
    random_state: int = 42
    cv_folds: int = 5
    min_auc_threshold: float = 0.7


@dataclass
class TrainingResult:
    """Results from a training run."""
    run_id: str
    model_version: Optional[str]
    metrics: Dict[str, float]
    passed_validation: bool
    data_hash: str
    feature_importance: Dict[str, float]


class MLTrainingPipeline:
    """
    Production ML training pipeline.
    Handles data validation, training, evaluation, and registration.
    """

    def __init__(self, config: TrainingConfig):
        self.config = config
        self.client = MlflowClient()

        # Set up experiment
        mlflow.set_experiment(config.experiment_name)

    def run(
        self,
        data: pd.DataFrame,
        model_class,
        model_params: Dict[str, Any]
    ) -> TrainingResult:
        """Execute full training pipeline."""

        with mlflow.start_run() as run:
            run_id = run.info.run_id

            # Step 1: Data validation
            self._validate_data(data)

            # Step 2: Compute data hash for reproducibility
            data_hash = self._compute_data_hash(data)
            mlflow.log_param("data_hash", data_hash)

            # Step 3: Prepare features and target
            X = data[self.config.feature_columns]
            y = data[self.config.target_column]

            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                test_size=self.config.test_size,
                random_state=self.config.random_state,
                stratify=y
            )

            # Log data statistics
            mlflow.log_metrics({
                "train_samples": len(X_train),
                "test_samples": len(X_test),
                "positive_rate_train": y_train.mean(),
                "positive_rate_test": y_test.mean(),
            })

            # Step 4: Log parameters
            mlflow.log_params(model_params)
            mlflow.log_params({
                "model_class": model_class.__name__,
                "feature_count": len(self.config.feature_columns),
            })

            # Step 5: Train model
            model = model_class(**model_params)
            model.fit(X_train, y_train)

            # Step 6: Evaluate
            metrics = self._evaluate_model(model, X_test, y_test)
            mlflow.log_metrics(metrics)

            # Step 7: Cross-validation for stability
            cv_scores = cross_val_score(
                model, X, y,
                cv=self.config.cv_folds,
                scoring='roc_auc'
            )
            mlflow.log_metrics({
                "cv_auc_mean": cv_scores.mean(),
                "cv_auc_std": cv_scores.std(),
            })

            # Step 8: Feature importance
            feature_importance = self._get_feature_importance(model, X.columns)
            mlflow.log_dict(feature_importance, "feature_importance.json")

            # Step 9: Validation gate
            passed_validation = self._validate_model(metrics, cv_scores)
            mlflow.log_param("passed_validation", passed_validation)

            # Step 10: Register model if valid
            model_version = None
            if passed_validation:
                # Log model artifact
                mlflow.sklearn.log_model(
                    model,
                    artifact_path="model",
                    registered_model_name=self.config.model_name,
                    signature=mlflow.models.infer_signature(X_train, model.predict(X_train))
                )

                # Get version
                model_version = self._get_latest_version()

                # Tag as candidate
                self.client.set_model_version_tag(
                    self.config.model_name,
                    model_version,
                    "stage",
                    "candidate"
                )

            return TrainingResult(
                run_id=run_id,
                model_version=model_version,
                metrics=metrics,
                passed_validation=passed_validation,
                data_hash=data_hash,
                feature_importance=feature_importance
            )

    def _validate_data(self, data: pd.DataFrame) -> None:
        """Validate input data before training."""
        # Check for required columns
        missing_cols = set(self.config.feature_columns) - set(data.columns)
        if missing_cols:
            raise ValueError(f"Missing columns: {missing_cols}")

        if self.config.target_column not in data.columns:
            raise ValueError(f"Target column {self.config.target_column} not found")

        # Check for nulls in features
        null_counts = data[self.config.feature_columns].isnull().sum()
        if null_counts.any():
            raise ValueError(f"Null values in features: {null_counts[null_counts > 0].to_dict()}")

        # Check target distribution
        target_rate = data[self.config.target_column].mean()
        if target_rate < 0.01 or target_rate > 0.99:
            raise ValueError(f"Target rate {target_rate:.3f} is extreme")

    def _compute_data_hash(self, data: pd.DataFrame) -> str:
        """Compute hash of training data for reproducibility."""
        data_str = data.to_json(orient='records', date_format='iso')
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]

    def _evaluate_model(
        self,
        model,
        X_test: pd.DataFrame,
        y_test: pd.Series
    ) -> Dict[str, float]:
        """Evaluate model on test set."""
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]

        return {
            "test_auc": roc_auc_score(y_test, y_prob),
            "test_precision": precision_score(y_test, y_pred),
            "test_recall": recall_score(y_test, y_pred),
            "test_f1": f1_score(y_test, y_pred),
            "test_avg_precision": average_precision_score(y_test, y_prob),
        }

    def _get_feature_importance(
        self,
        model,
        feature_names: List[str]
    ) -> Dict[str, float]:
        """Extract feature importance from model."""
        if hasattr(model, 'feature_importances_'):
            importances = model.feature_importances_
        elif hasattr(model, 'coef_'):
            importances = np.abs(model.coef_[0])
        else:
            return {}

        return dict(sorted(
            zip(feature_names, importances.tolist()),
            key=lambda x: x[1],
            reverse=True
        ))

    def _validate_model(
        self,
        metrics: Dict[str, float],
        cv_scores: np.ndarray
    ) -> bool:
        """Apply validation gates for model quality."""
        # AUC threshold
        if metrics['test_auc'] < self.config.min_auc_threshold:
            return False

        # CV stability check
        if cv_scores.std() > 0.1:
            return False

        # Overfitting check (CV mean should be close to test)
        if abs(cv_scores.mean() - metrics['test_auc']) > 0.05:
            return False

        return True

    def _get_latest_version(self) -> Optional[str]:
        """Get latest registered model version."""
        versions = self.client.get_latest_versions(
            self.config.model_name,
            stages=["None"]
        )
        return versions[-1].version if versions else None
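
The `_compute_data_hash` step above is what makes a run reproducible: if the hash logged with a model matches the hash of a candidate dataset, the training data is identical. A minimal standard-library sketch of the same idea (the record layout here is made up for illustration):

```python
import hashlib
import json

def compute_data_hash(records: list[dict]) -> str:
    """Hash a dataset so identical data always yields an identical fingerprint."""
    # sort_keys makes the serialization independent of dict insertion order
    data_str = json.dumps(records, sort_keys=True)
    return hashlib.sha256(data_str.encode()).hexdigest()[:16]

rows = [{"age": 34, "orders": 12, "churned": 0},
        {"age": 51, "orders": 3, "churned": 1}]

h1 = compute_data_hash(rows)
h2 = compute_data_hash([dict(r) for r in rows])  # same data, new objects
h3 = compute_data_hash(rows + [{"age": 19, "orders": 1, "churned": 0}])

print(h1 == h2)  # True: identical data, identical hash
print(h1 == h3)  # False: any change produces a different hash
```

Logging this fingerprint as an MLflow parameter lets you answer "was this model trained on the same data?" months later without storing the data itself.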

Feature Store

Feature stores solve two critical problems: feature reuse across teams, and training-serving skew (the features computed offline for training diverging from the features computed online at inference time).
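
The skew problem is easiest to see in the point-in-time join: for each training label, we must use the feature value as it was at label time, never a later one, or the model trains on information it will not have in production. A toy standard-library sketch (the event data is hypothetical):

```python
def point_in_time_value(events, as_of):
    """Return the latest feature value at or before `as_of` (None if none exists)."""
    eligible = [(ts, value) for ts, value in events if ts <= as_of]
    return max(eligible)[1] if eligible else None

# (timestamp, total_orders) feature events for one customer
events = [(1, 3), (5, 4), (9, 7)]

print(point_in_time_value(events, as_of=6))  # 4: the value known at time 6
print(point_in_time_value(events, as_of=9))  # 7
print(point_in_time_value(events, as_of=0))  # None: feature did not exist yet
```

Feast's `get_historical_features` performs exactly this join at scale, which is why the training pipeline fetches data through it rather than joining raw tables by hand.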

# feature_store.py
"""
Feature store implementation for ML feature management.
Ensures consistency between training and serving.
"""

from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import hashlib
import json

import pandas as pd
import redis
# Note: this targets the pre-0.20 Feast API (Feature, features=);
# newer releases use Field and the schema= parameter instead.
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from feast.types import Float32, Int64, String

@dataclass
class FeatureDefinition:
    """Definition of a feature for the feature store."""
    name: str
    dtype: str
    description: str
    entity: str
    source_table: str
    transformation: Optional[str] = None
    ttl_days: int = 90


class ProductionFeatureStore:
    """
    Production-ready feature store.
    Provides unified interface for feature registration, serving, and monitoring.
    """

    def __init__(self, repo_path: str, online_store_config: Dict = None):
        self.store = FeatureStore(repo_path=repo_path)
        self.online_cache = redis.Redis(**online_store_config) if online_store_config else None

    def register_feature_view(
        self,
        name: str,
        entities: List[str],
        features: List[FeatureDefinition],
        source_path: str,
        ttl_days: int = 90
    ) -> FeatureView:
        """Register a new feature view."""

        # Define entities
        entity_objects = [
            Entity(
                name=entity,
                join_keys=[f"{entity}_id"],
                description=f"Unique identifier for {entity}"
            )
            for entity in entities
        ]

        # Define features
        feature_objects = [
            Feature(
                name=f.name,
                dtype=self._get_feast_type(f.dtype),
            )
            for f in features
        ]

        # Define source
        source = FileSource(
            path=source_path,
            timestamp_field="event_timestamp",
            created_timestamp_column="created_timestamp",
        )

        # Create feature view
        feature_view = FeatureView(
            name=name,
            entities=entity_objects,
            ttl=timedelta(days=ttl_days),
            features=feature_objects,
            source=source,
        )

        # Apply to feature store
        self.store.apply([*entity_objects, feature_view])

        return feature_view

    def get_training_data(
        self,
        entity_df: pd.DataFrame,
        feature_refs: List[str],
        timestamp_column: str = "event_timestamp"
    ) -> pd.DataFrame:
        """
        Get point-in-time correct training data.
        Ensures no data leakage by respecting event timestamps.
        """
        training_df = self.store.get_historical_features(
            entity_df=entity_df,
            features=feature_refs,
        ).to_df()

        return training_df

    def get_online_features(
        self,
        entity_keys: Dict[str, List[Any]],
        feature_refs: List[str]
    ) -> Dict[str, List[Any]]:
        """
        Get features for online inference.
        Low-latency serving from online store.
        """
        # Check cache first
        cache_key = self._build_cache_key(entity_keys, feature_refs)
        if self.online_cache:
            cached = self.online_cache.get(cache_key)
            if cached:
                return json.loads(cached)

        # Fetch from Feast
        feature_vector = self.store.get_online_features(
            features=feature_refs,
            entity_rows=[
                {k: v[i] for k, v in entity_keys.items()}
                for i in range(len(list(entity_keys.values())[0]))
            ]
        ).to_dict()

        # Cache result
        if self.online_cache:
            self.online_cache.setex(
                cache_key,
                timedelta(minutes=5),
                json.dumps(feature_vector)
            )

        return feature_vector

    def compute_feature_statistics(
        self,
        feature_view_name: str,
        start_date: datetime,
        end_date: datetime
    ) -> Dict[str, Dict[str, float]]:
        """
        Compute statistics for feature monitoring.
        Used for drift detection.
        """
        # Get feature data (_get_feature_data is assumed to read the feature
        # view's offline source for the given window; not shown here)
        feature_data = self._get_feature_data(
            feature_view_name,
            start_date,
            end_date
        )

        stats = {}
        for column in feature_data.columns:
            if feature_data[column].dtype in ['float64', 'int64']:
                stats[column] = {
                    'mean': feature_data[column].mean(),
                    'std': feature_data[column].std(),
                    'min': feature_data[column].min(),
                    'max': feature_data[column].max(),
                    'null_rate': feature_data[column].isnull().mean(),
                    'p25': feature_data[column].quantile(0.25),
                    'p50': feature_data[column].quantile(0.50),
                    'p75': feature_data[column].quantile(0.75),
                }

        return stats

    def _get_feast_type(self, dtype: str):
        """Map Python types to Feast types."""
        type_map = {
            'float': Float32,
            'int': Int64,
            'string': String,
        }
        return type_map.get(dtype, String)

    def _build_cache_key(
        self,
        entity_keys: Dict[str, List[Any]],
        feature_refs: List[str]
    ) -> str:
        """Build cache key for online features."""
        key_str = json.dumps({
            'entities': entity_keys,
            'features': sorted(feature_refs)
        }, sort_keys=True)
        return f"features:{hashlib.md5(key_str.encode()).hexdigest()}"
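
`compute_feature_statistics` above leans on pandas; the same per-feature summary can be sketched with the standard library's `statistics` module (the feature values below are made up):

```python
import statistics

def feature_stats(values: list[float]) -> dict:
    """Summary statistics stored as a drift-detection baseline."""
    # method="inclusive" interpolates between data points, like pandas quantile
    q1, q2, q3 = statistics.quantiles(values, n=4, method="inclusive")
    return {
        "mean": statistics.fmean(values),
        "std": statistics.stdev(values),
        "min": min(values),
        "max": max(values),
        "p25": q1, "p50": q2, "p75": q3,
    }

stats = feature_stats([1.0, 2.0, 3.0, 4.0, 5.0])
print(stats["mean"], stats["p25"], stats["p50"], stats["p75"])  # 3.0 2.0 3.0 4.0
```

Persisting these baselines per feature view is what makes the drift checks in the monitoring section cheap: comparing two small summary dicts instead of two raw datasets.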

Model Serving

# model_serving.py
"""
Production model serving with A/B testing and monitoring.
"""

from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from datetime import datetime
import hashlib
import time
import uuid

import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.pyfunc
from prometheus_client import Counter, Histogram

# Metrics
PREDICTION_COUNTER = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model_name', 'model_version']
)
PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency',
    ['model_name']
)

app = FastAPI(title="ML Model Serving API")


class PredictionRequest(BaseModel):
    """Request schema for predictions."""
    features: Dict[str, Any]
    request_id: Optional[str] = None


class PredictionResponse(BaseModel):
    """Response schema for predictions."""
    prediction: float
    probability: Optional[float] = None
    model_version: str
    request_id: str
    latency_ms: float


class ABTestConfig(BaseModel):
    """A/B test configuration."""
    control_version: str
    treatment_version: str
    treatment_percentage: float  # 0-100


class ModelServer:
    """
    Production model server with A/B testing support.
    """

    def __init__(self, model_name: str, feature_store: 'ProductionFeatureStore'):
        self.model_name = model_name
        self.feature_store = feature_store
        self.models: Dict[str, Any] = {}
        self.ab_config: Optional[ABTestConfig] = None

    def load_model(self, version: str) -> None:
        """Load a model version into memory."""
        model_uri = f"models:/{self.model_name}/{version}"
        self.models[version] = mlflow.pyfunc.load_model(model_uri)

    def set_ab_test(self, config: ABTestConfig) -> None:
        """Configure A/B test between model versions."""
        # Ensure both versions are loaded
        for version in [config.control_version, config.treatment_version]:
            if version not in self.models:
                self.load_model(version)

        self.ab_config = config

    def predict(self, request: PredictionRequest) -> PredictionResponse:
        """Make prediction with A/B test routing."""
        import time
        import uuid

        start_time = time.time()
        request_id = request.request_id or str(uuid.uuid4())

        # Select model version (A/B test or default)
        version = self._select_model_version(request_id)
        model = self.models[version]

        # Enrich features from feature store if needed
        features = self._enrich_features(request.features)

        # Make prediction
        with PREDICTION_LATENCY.labels(model_name=self.model_name).time():
            prediction = model.predict(pd.DataFrame([features]))[0]

        # Get probability if available. Note: mlflow.pyfunc wrappers expose only
        # predict(); predict_proba is available when a native flavor is loaded.
        probability = None
        if hasattr(model, 'predict_proba'):
            probability = float(model.predict_proba(pd.DataFrame([features]))[0, 1])

        latency_ms = (time.time() - start_time) * 1000

        # Record metrics
        PREDICTION_COUNTER.labels(
            model_name=self.model_name,
            model_version=version
        ).inc()

        # Log for analysis
        self._log_prediction(
            request_id=request_id,
            version=version,
            features=features,
            prediction=prediction,
            probability=probability,
            latency_ms=latency_ms
        )

        return PredictionResponse(
            prediction=float(prediction),
            probability=probability,
            model_version=version,
            request_id=request_id,
            latency_ms=latency_ms
        )

    def _select_model_version(self, request_id: str) -> str:
        """Select model version for A/B test."""
        if not self.ab_config:
            # No A/B test - use the latest loaded version
            # (compare numerically so that version "10" beats "9")
            return max(self.models.keys(), key=int)

        # Deterministic routing based on request_id
        hash_value = int(hashlib.md5(request_id.encode()).hexdigest(), 16)
        percentage = (hash_value % 100)

        if percentage < self.ab_config.treatment_percentage:
            return self.ab_config.treatment_version
        else:
            return self.ab_config.control_version

    def _enrich_features(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich request features with feature store data."""
        # Get entity keys from request
        entity_keys = {
            'customer_id': [features.get('customer_id')]
        }

        # Fetch online features
        if 'customer_id' in features:
            store_features = self.feature_store.get_online_features(
                entity_keys=entity_keys,
                feature_refs=[
                    "customer_features:total_orders",
                    "customer_features:lifetime_value",
                    "customer_features:days_since_last_order",
                ]
            )
            features.update({
                k: v[0] for k, v in store_features.items()
                if k != 'customer_id'
            })

        return features

    def _log_prediction(
        self,
        request_id: str,
        version: str,
        features: Dict,
        prediction: float,
        probability: Optional[float],
        latency_ms: float
    ) -> None:
        """Log prediction for later analysis."""
        # In production, send to Kafka or logging system
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'request_id': request_id,
            'model_name': self.model_name,
            'model_version': version,
            'features': features,
            'prediction': prediction,
            'probability': probability,
            'latency_ms': latency_ms
        }
        # logger.info(json.dumps(log_entry))
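
The `_select_model_version` routing above hashes the request ID so that a given caller always lands in the same test arm, which keeps A/B metrics clean. Isolated, the trick looks like this (the 20% split and version labels are examples):

```python
import hashlib

def route(request_id: str, treatment_pct: float,
          control: str = "1", treatment: str = "2") -> str:
    """Deterministically assign a request to control or treatment."""
    # md5 gives a stable, roughly uniform bucket in [0, 100)
    bucket = int(hashlib.md5(request_id.encode()).hexdigest(), 16) % 100
    return treatment if bucket < treatment_pct else control

print(route("req-42", 20) == route("req-42", 20))  # True: routing is sticky

# Over many requests, the observed split approximates the configured percentage
n = 10_000
share = sum(route(f"req-{i}", 20) == "2" for i in range(n)) / n
```

Because the assignment is a pure function of the request ID, replaying logged requests reproduces the exact arm each one saw, with no routing state to store.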

Model Monitoring

# model_monitoring.py
"""
Production model monitoring for drift detection and performance tracking.
"""

from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from scipy import stats

@dataclass
class DriftAlert:
    """Alert for detected drift."""
    drift_type: str  # 'data', 'prediction', 'performance'
    feature_name: Optional[str]
    severity: str  # 'low', 'medium', 'high'
    current_value: float
    baseline_value: float
    drift_score: float
    detected_at: datetime
    recommended_action: str


class ModelMonitor:
    """
    Monitors ML models in production for drift and performance degradation.
    """

    def __init__(
        self,
        model_name: str,
        baseline_start: datetime,
        baseline_end: datetime
    ):
        self.model_name = model_name
        self.baseline_start = baseline_start
        self.baseline_end = baseline_end
        self.baseline_stats: Dict[str, Dict] = {}

    def compute_baseline(self, prediction_logs: pd.DataFrame) -> None:
        """Compute baseline statistics from historical predictions."""
        baseline_data = prediction_logs[
            (prediction_logs['timestamp'] >= self.baseline_start) &
            (prediction_logs['timestamp'] <= self.baseline_end)
        ]

        # Feature statistics
        for col in baseline_data.columns:
            if col.startswith('feature_') and baseline_data[col].dtype in ['float64', 'int64']:
                self.baseline_stats[col] = {
                    'mean': baseline_data[col].mean(),
                    'std': baseline_data[col].std(),
                    'min': baseline_data[col].min(),
                    'max': baseline_data[col].max(),
                    'distribution': baseline_data[col].values
                }

        # Prediction statistics
        self.baseline_stats['prediction'] = {
            'mean': baseline_data['prediction'].mean(),
            'std': baseline_data['prediction'].std(),
            'distribution': baseline_data['prediction'].values
        }

    def detect_data_drift(
        self,
        current_data: pd.DataFrame,
        threshold: float = 0.05
    ) -> List[DriftAlert]:
        """Detect drift in input features using statistical tests."""
        alerts = []

        for feature_name, baseline in self.baseline_stats.items():
            if not feature_name.startswith('feature_'):
                continue

            if feature_name not in current_data.columns:
                continue

            current_values = current_data[feature_name].dropna().values

            if len(current_values) < 30:
                continue

            # Kolmogorov-Smirnov test
            statistic, p_value = stats.ks_2samp(
                baseline['distribution'],
                current_values
            )

            if p_value < threshold:
                severity = self._calculate_severity(statistic)

                alerts.append(DriftAlert(
                    drift_type='data',
                    feature_name=feature_name,
                    severity=severity,
                    current_value=np.mean(current_values),
                    baseline_value=baseline['mean'],
                    drift_score=statistic,
                    detected_at=datetime.utcnow(),
                    recommended_action=self._get_drift_recommendation(severity)
                ))

        return alerts

    def detect_prediction_drift(
        self,
        current_predictions: np.ndarray,
        threshold: float = 0.05
    ) -> Optional[DriftAlert]:
        """Detect drift in model predictions."""
        baseline = self.baseline_stats.get('prediction')
        if not baseline:
            return None

        # KS test on predictions
        statistic, p_value = stats.ks_2samp(
            baseline['distribution'],
            current_predictions
        )

        if p_value < threshold:
            severity = self._calculate_severity(statistic)

            return DriftAlert(
                drift_type='prediction',
                feature_name=None,
                severity=severity,
                current_value=np.mean(current_predictions),
                baseline_value=baseline['mean'],
                drift_score=statistic,
                detected_at=datetime.utcnow(),
                recommended_action="Investigate model predictions and consider retraining"
            )

        return None

    def detect_performance_degradation(
        self,
        actuals: np.ndarray,
        predictions: np.ndarray,
        baseline_auc: float,
        threshold_pct: float = 5.0
    ) -> Optional[DriftAlert]:
        """Detect degradation in model performance metrics."""
        from sklearn.metrics import roc_auc_score

        # Require enough labeled examples for a stable AUC estimate
        if len(actuals) < 100:
            return None

        current_auc = roc_auc_score(actuals, predictions)
        degradation_pct = ((baseline_auc - current_auc) / baseline_auc) * 100

        if degradation_pct > threshold_pct:
            severity = 'high' if degradation_pct > 10 else 'medium'

            return DriftAlert(
                drift_type='performance',
                feature_name=None,
                severity=severity,
                current_value=current_auc,
                baseline_value=baseline_auc,
                drift_score=degradation_pct,
                detected_at=datetime.utcnow(),
                recommended_action="Model performance has degraded. Trigger retraining pipeline."
            )

        return None

    def _calculate_severity(self, drift_score: float) -> str:
        """Calculate severity based on drift score."""
        if drift_score > 0.3:
            return 'high'
        elif drift_score > 0.15:
            return 'medium'
        return 'low'

    def _get_drift_recommendation(self, severity: str) -> str:
        """Get recommended action based on severity."""
        recommendations = {
            'low': "Monitor closely, no immediate action required",
            'medium': "Investigate data source and consider retraining",
            'high': "Immediate investigation required, consider rolling back to previous model"
        }
        return recommendations.get(severity, "Unknown severity")

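To see the detection logic end to end, here is a minimal, self-contained sketch of the same KS-based check on synthetic data. `check_drift` is a hypothetical standalone helper that mirrors `detect_data_drift` and `_calculate_severity` above; only NumPy and SciPy are assumed.

```python
import numpy as np
from scipy import stats

def check_drift(baseline: np.ndarray, current: np.ndarray, threshold: float = 0.05):
    """Two-sample KS test; returns (drifted, statistic, severity)."""
    statistic, p_value = stats.ks_2samp(baseline, current)
    if p_value >= threshold:
        return False, statistic, None
    # Same severity bands as _calculate_severity above
    severity = 'high' if statistic > 0.3 else 'medium' if statistic > 0.15 else 'low'
    return True, statistic, severity

rng = np.random.default_rng(42)
baseline = rng.normal(0.0, 1.0, 5000)   # feature values at training time
current = rng.normal(1.0, 1.0, 1000)    # production values with a mean shift
drifted, stat, severity = check_drift(baseline, current)
```

A one-standard-deviation mean shift yields a KS statistic of roughly 0.38, which lands in the 'high' severity band, so this sample would trigger an alert recommending immediate investigation.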
Key Takeaways

  1. Start simple, evolve complexity: Begin with Level 1 (automated pipelines), don't jump to Level 3.

  2. Reproducibility is non-negotiable: Version data, features, models, and code. Every training run should be reproducible.

  3. Feature stores are worth the investment: They eliminate training-serving skew and accelerate model development through feature reuse.

  4. Monitor everything: Data drift, prediction drift, and business metrics. Models degrade silently.

  5. A/B test before full rollout: Offline metrics don't tell the whole story. Real-world performance often differs.

  6. Treat ML as software: Use CI/CD, testing, code review, and production-grade infrastructure.

  7. Invest in governance early: Lineage tracking, versioning, and access control become critical as ML adoption grows.
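Takeaway 5 can be made concrete with a simple significance check on experiment results. The sketch below applies a two-proportion z-test to hypothetical conversion counts from a control model and a candidate model; this is one common analysis choice for binary outcomes, not the only valid one, and the counts are purely illustrative.

```python
import math

def two_proportion_z(conv_a: int, n_a: int, conv_b: int, n_b: int):
    """Return (z, two-sided p-value) for H0: equal conversion rates."""
    p_a, p_b = conv_a / n_a, conv_b / n_b
    pooled = (conv_a + conv_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    p_value = math.erfc(abs(z) / math.sqrt(2))  # two-sided normal tail
    return z, p_value

# Control converts 500/10,000; candidate converts 590/10,000
z, p = two_proportion_z(500, 10_000, 590, 10_000)
```

Here the candidate's lift is statistically significant at the usual 0.05 level, but in practice you would also check guardrail metrics (latency, revenue, fairness) before promoting the model.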

References

  1. Sculley, D. et al. (2015). "Hidden Technical Debt in Machine Learning Systems." NIPS.
  2. Google Cloud. (2023). "MLOps: Continuous Delivery and Automation Pipelines in Machine Learning."
  3. MLflow Documentation: https://mlflow.org/docs/latest/index.html
  4. Feast Feature Store: https://docs.feast.dev/
  5. Evidently AI: ML Monitoring and Testing
  6. Huyen, C. (2022). Designing Machine Learning Systems. O'Reilly Media.

Gemut Analytics Team
Data Engineering Experts