MLOps: From Experimentation to Production
A comprehensive guide to operationalizing machine learning—covering ML pipelines, model versioning, feature stores, A/B testing, and the organizational practices that enable reliable ML systems
Most ML projects fail not because of bad models, but because of bad operations. The gap between a working notebook and a reliable production system is vast—requiring reproducibility, monitoring, and governance that data scientists rarely encounter in experimentation. This guide bridges that gap with practical patterns for production ML.
Prerequisites:
- Experience with machine learning model development
- Familiarity with Python and ML frameworks (scikit-learn, TensorFlow, PyTorch)
- Basic understanding of software engineering practices

The Production Gap
By one widely cited industry estimate, 87% of ML models never make it to production. The reasons are rarely technical; they're operational:
- Models can't be reproduced after the data scientist leaves
- No one knows which features were used or how they were computed
- Training data drifts from production data
- Model performance degrades silently
- Debugging production issues requires recreating months-old experiments
MLOps addresses these gaps by applying software engineering rigor to machine learning.
The MLOps Maturity Model
Level 0: Manual
- Models trained in notebooks
- Manual export and deployment
- No versioning or reproducibility
- "Works on my machine" problems

Level 1: ML Pipeline Automation
- Automated training pipelines
- Feature and model versioning
- Reproducible experiments
- Manual deployment trigger

Level 2: CI/CD for ML
- Automated testing (data, model, integration)
- Continuous training on new data
- Automated deployment with approval gates
- Model performance monitoring

Level 3: Full MLOps
- Feature stores for feature reuse
- Automated retraining based on drift
- A/B testing and gradual rollouts
- Full lineage and governance
ML Pipeline Architecture
End-to-End Pipeline
ML PIPELINE ARCHITECTURE

DATA LAYER
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   Raw    │──►│ Feature  │──►│ Training │──►│  Model   │
│   Data   │   │  Store   │   │   Data   │   │ Registry │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
                          │
                          ▼
TRAINING PIPELINE
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   Data   │──►│ Feature  │──►│  Model   │──►│  Model   │
│ Valida-  │   │ Engineer-│   │ Training │   │ Evalua-  │
│  tion    │   │   ing    │   │          │   │  tion    │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
                                                  │
                                                  ▼
                                     ┌──────────────────────────┐
                                     │      Model Registry      │
                                     │   (MLflow / Vertex AI)   │
                                     └──────────────────────────┘
                          │
                          ▼
SERVING PIPELINE
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│  Model   │──►│ Serving  │──►│   A/B    │──►│ Predic-  │
│ Loading  │   │  Infra   │   │ Testing  │   │ tion API │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
                          │
                          ▼
MONITORING LAYER
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   Data   │   │  Model   │   │  System  │   │ Business │
│  Drift   │   │   Perf   │   │ Metrics  │   │  Impact  │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
Training Pipeline Implementation
# training_pipeline.py
"""
Production ML training pipeline with MLflow integration.
Demonstrates reproducibility, versioning, and validation.
"""
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
import numpy as np
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score,
    f1_score, average_precision_score
)
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
import hashlib


@dataclass
class TrainingConfig:
    """Configuration for a training run."""
    experiment_name: str
    model_name: str
    target_column: str
    feature_columns: List[str]
    test_size: float = 0.2
    random_state: int = 42
    cv_folds: int = 5
    min_auc_threshold: float = 0.7


@dataclass
class TrainingResult:
    """Results from a training run."""
    run_id: str
    model_version: Optional[str]
    metrics: Dict[str, float]
    passed_validation: bool
    data_hash: str
    feature_importance: Dict[str, float]


class MLTrainingPipeline:
    """
    Production ML training pipeline.
    Handles data validation, training, evaluation, and registration.
    """

    def __init__(self, config: TrainingConfig):
        self.config = config
        self.client = MlflowClient()
        # Set up experiment
        mlflow.set_experiment(config.experiment_name)

    def run(
        self,
        data: pd.DataFrame,
        model_class,
        model_params: Dict[str, Any]
    ) -> TrainingResult:
        """Execute the full training pipeline."""
        with mlflow.start_run() as run:
            run_id = run.info.run_id

            # Step 1: Data validation
            self._validate_data(data)

            # Step 2: Compute data hash for reproducibility
            data_hash = self._compute_data_hash(data)
            mlflow.log_param("data_hash", data_hash)

            # Step 3: Prepare features and target
            X = data[self.config.feature_columns]
            y = data[self.config.target_column]
            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                test_size=self.config.test_size,
                random_state=self.config.random_state,
                stratify=y
            )

            # Log data statistics
            mlflow.log_metrics({
                "train_samples": len(X_train),
                "test_samples": len(X_test),
                "positive_rate_train": y_train.mean(),
                "positive_rate_test": y_test.mean(),
            })

            # Step 4: Log parameters
            mlflow.log_params(model_params)
            mlflow.log_params({
                "model_class": model_class.__name__,
                "feature_count": len(self.config.feature_columns),
            })

            # Step 5: Train model
            model = model_class(**model_params)
            model.fit(X_train, y_train)

            # Step 6: Evaluate
            metrics = self._evaluate_model(model, X_test, y_test)
            mlflow.log_metrics(metrics)

            # Step 7: Cross-validation for stability
            cv_scores = cross_val_score(
                model, X, y,
                cv=self.config.cv_folds,
                scoring='roc_auc'
            )
            mlflow.log_metrics({
                "cv_auc_mean": cv_scores.mean(),
                "cv_auc_std": cv_scores.std(),
            })

            # Step 8: Feature importance
            feature_importance = self._get_feature_importance(model, X.columns)
            mlflow.log_dict(feature_importance, "feature_importance.json")

            # Step 9: Validation gate
            passed_validation = self._validate_model(metrics, cv_scores)
            mlflow.log_param("passed_validation", passed_validation)

            # Step 10: Register model if valid
            model_version = None
            if passed_validation:
                # Log model artifact
                mlflow.sklearn.log_model(
                    model,
                    artifact_path="model",
                    registered_model_name=self.config.model_name,
                    signature=mlflow.models.infer_signature(
                        X_train, model.predict(X_train)
                    )
                )
                # Get version
                model_version = self._get_latest_version()
                # Tag as candidate
                self.client.set_model_version_tag(
                    self.config.model_name,
                    model_version,
                    "stage",
                    "candidate"
                )

            return TrainingResult(
                run_id=run_id,
                model_version=model_version,
                metrics=metrics,
                passed_validation=passed_validation,
                data_hash=data_hash,
                feature_importance=feature_importance
            )

    def _validate_data(self, data: pd.DataFrame) -> None:
        """Validate input data before training."""
        # Check for required columns
        missing_cols = set(self.config.feature_columns) - set(data.columns)
        if missing_cols:
            raise ValueError(f"Missing columns: {missing_cols}")
        if self.config.target_column not in data.columns:
            raise ValueError(f"Target column {self.config.target_column} not found")

        # Check for nulls in features
        null_counts = data[self.config.feature_columns].isnull().sum()
        if null_counts.any():
            raise ValueError(
                f"Null values in features: {null_counts[null_counts > 0].to_dict()}"
            )

        # Check target distribution
        target_rate = data[self.config.target_column].mean()
        if target_rate < 0.01 or target_rate > 0.99:
            raise ValueError(f"Target rate {target_rate:.3f} is extreme")

    def _compute_data_hash(self, data: pd.DataFrame) -> str:
        """Compute a hash of the training data for reproducibility."""
        data_str = data.to_json(orient='records', date_format='iso')
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]

    def _evaluate_model(
        self,
        model,
        X_test: pd.DataFrame,
        y_test: pd.Series
    ) -> Dict[str, float]:
        """Evaluate the model on the held-out test set."""
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
        return {
            "test_auc": roc_auc_score(y_test, y_prob),
            "test_precision": precision_score(y_test, y_pred),
            "test_recall": recall_score(y_test, y_pred),
            "test_f1": f1_score(y_test, y_pred),
            "test_avg_precision": average_precision_score(y_test, y_prob),
        }

    def _get_feature_importance(
        self,
        model,
        feature_names: List[str]
    ) -> Dict[str, float]:
        """Extract feature importance from the model."""
        if hasattr(model, 'feature_importances_'):
            importances = model.feature_importances_
        elif hasattr(model, 'coef_'):
            importances = np.abs(model.coef_[0])
        else:
            return {}
        return dict(sorted(
            zip(feature_names, importances.tolist()),
            key=lambda x: x[1],
            reverse=True
        ))

    def _validate_model(
        self,
        metrics: Dict[str, float],
        cv_scores: np.ndarray
    ) -> bool:
        """Apply validation gates for model quality."""
        # AUC threshold
        if metrics['test_auc'] < self.config.min_auc_threshold:
            return False
        # CV stability check
        if cv_scores.std() > 0.1:
            return False
        # Overfitting check (CV mean should be close to test AUC)
        if abs(cv_scores.mean() - metrics['test_auc']) > 0.05:
            return False
        return True

    def _get_latest_version(self) -> Optional[str]:
        """Get the latest registered model version."""
        versions = self.client.get_latest_versions(
            self.config.model_name,
            stages=["None"]
        )
        return versions[-1].version if versions else None
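The data hash logged in Step 2 is the linchpin of reproducibility: two runs are comparable only if they trained on identical data. A minimal standalone sketch of the same idea, using a hypothetical `data_fingerprint` helper that mirrors `_compute_data_hash` above:

```python
import hashlib

import pandas as pd

def data_fingerprint(df: pd.DataFrame) -> str:
    # Serialize deterministically, then hash: same rows -> same fingerprint.
    payload = df.to_json(orient="records", date_format="iso")
    return hashlib.sha256(payload.encode()).hexdigest()[:16]

df = pd.DataFrame({"x": [1.0, 2.0, 3.0], "label": [0, 1, 0]})

# An identical copy hashes to the same value; a single changed cell does not.
assert data_fingerprint(df) == data_fingerprint(df.copy())
assert data_fingerprint(df) != data_fingerprint(df.assign(x=[1.0, 2.0, 4.0]))
```

Note that this fingerprint is row-order sensitive; if ingestion order can vary between runs, sort the frame before hashing.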
Feature Store
Feature stores solve two critical problems: feature reuse and training-serving skew.
# feature_store.py
"""
Feature store implementation for ML feature management.
Ensures consistency between training and serving.
"""
import hashlib
import json
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass

import pandas as pd
import redis
from feast import FeatureStore, Entity, Field, FeatureView, FileSource
from feast.types import Float32, Int64, String


@dataclass
class FeatureDefinition:
    """Definition of a feature for the feature store."""
    name: str
    dtype: str
    description: str
    entity: str
    source_table: str
    transformation: Optional[str] = None
    ttl_days: int = 90


class ProductionFeatureStore:
    """
    Production-ready feature store.
    Provides a unified interface for feature registration, serving, and monitoring.
    """

    def __init__(self, repo_path: str, online_store_config: Optional[Dict] = None):
        self.store = FeatureStore(repo_path=repo_path)
        self.online_cache = (
            redis.Redis(**online_store_config) if online_store_config else None
        )

    def register_feature_view(
        self,
        name: str,
        entities: List[str],
        features: List[FeatureDefinition],
        source_path: str,
        ttl_days: int = 90
    ) -> FeatureView:
        """Register a new feature view."""
        # Define entities
        entity_objects = [
            Entity(
                name=entity,
                join_keys=[f"{entity}_id"],
                description=f"Unique identifier for {entity}"
            )
            for entity in entities
        ]
        # Define the feature schema
        feature_fields = [
            Field(
                name=f.name,
                dtype=self._get_feast_type(f.dtype),
            )
            for f in features
        ]
        # Define source
        source = FileSource(
            path=source_path,
            timestamp_field="event_timestamp",
            created_timestamp_column="created_timestamp",
        )
        # Create feature view
        feature_view = FeatureView(
            name=name,
            entities=entity_objects,
            ttl=timedelta(days=ttl_days),
            schema=feature_fields,
            source=source,
        )
        # Apply to feature store
        self.store.apply([*entity_objects, feature_view])
        return feature_view

    def get_training_data(
        self,
        entity_df: pd.DataFrame,
        feature_refs: List[str],
        timestamp_column: str = "event_timestamp"
    ) -> pd.DataFrame:
        """
        Get point-in-time correct training data.
        Ensures no data leakage by respecting event timestamps.
        """
        training_df = self.store.get_historical_features(
            entity_df=entity_df,
            features=feature_refs,
        ).to_df()
        return training_df

    def get_online_features(
        self,
        entity_keys: Dict[str, List[Any]],
        feature_refs: List[str]
    ) -> Dict[str, List[Any]]:
        """
        Get features for online inference.
        Low-latency serving from the online store.
        """
        # Check cache first
        cache_key = self._build_cache_key(entity_keys, feature_refs)
        if self.online_cache:
            cached = self.online_cache.get(cache_key)
            if cached:
                return json.loads(cached)

        # Fetch from Feast
        feature_vector = self.store.get_online_features(
            features=feature_refs,
            entity_rows=[
                {k: v[i] for k, v in entity_keys.items()}
                for i in range(len(list(entity_keys.values())[0]))
            ]
        ).to_dict()

        # Cache result
        if self.online_cache:
            self.online_cache.setex(
                cache_key,
                timedelta(minutes=5),
                json.dumps(feature_vector)
            )
        return feature_vector

    def compute_feature_statistics(
        self,
        feature_view_name: str,
        start_date: datetime,
        end_date: datetime
    ) -> Dict[str, Dict[str, float]]:
        """
        Compute statistics for feature monitoring.
        Used for drift detection.
        """
        # Get feature data
        feature_data = self._get_feature_data(
            feature_view_name,
            start_date,
            end_date
        )
        stats = {}
        for column in feature_data.columns:
            if feature_data[column].dtype in ['float64', 'int64']:
                stats[column] = {
                    'mean': feature_data[column].mean(),
                    'std': feature_data[column].std(),
                    'min': feature_data[column].min(),
                    'max': feature_data[column].max(),
                    'null_rate': feature_data[column].isnull().mean(),
                    'p25': feature_data[column].quantile(0.25),
                    'p50': feature_data[column].quantile(0.50),
                    'p75': feature_data[column].quantile(0.75),
                }
        return stats

    def _get_feature_data(
        self,
        feature_view_name: str,
        start_date: datetime,
        end_date: datetime
    ) -> pd.DataFrame:
        """Load offline feature data for a view within a time window."""
        # Assumes a file-backed (parquet) source; swap in your offline
        # store's query mechanism as appropriate.
        view = self.store.get_feature_view(feature_view_name)
        data = pd.read_parquet(view.batch_source.path)
        mask = (
            (data["event_timestamp"] >= start_date)
            & (data["event_timestamp"] <= end_date)
        )
        return data.loc[mask]

    def _get_feast_type(self, dtype: str):
        """Map Python type names to Feast types."""
        type_map = {
            'float': Float32,
            'int': Int64,
            'string': String,
        }
        return type_map.get(dtype, String)

    def _build_cache_key(
        self,
        entity_keys: Dict[str, List[Any]],
        feature_refs: List[str]
    ) -> str:
        """Build a cache key for online features."""
        key_str = json.dumps({
            'entities': entity_keys,
            'features': sorted(feature_refs)
        }, sort_keys=True)
        return f"features:{hashlib.md5(key_str.encode()).hexdigest()}"
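What `get_historical_features` does under the hood is a point-in-time join: each training label is matched with the latest feature values known at or before that label's event time, never after. A rough sketch of those semantics in plain pandas (illustrative only; Feast handles this at scale):

```python
import pandas as pd

# Label events: we want feature values as they existed at each event time.
labels = pd.DataFrame({
    "customer_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-05", "2024-01-20"]),
})

# Feature snapshots written at different times.
snapshots = pd.DataFrame({
    "customer_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-10"]),
    "total_orders": [3, 7],
})

# merge_asof picks, per label, the most recent snapshot at or before the event.
training = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    snapshots.sort_values("event_timestamp"),
    on="event_timestamp",
    by="customer_id",
)
print(training["total_orders"].tolist())  # [3, 7]
```

The Jan 5 label sees `total_orders=3` and never the Jan 10 update; joining naively on `customer_id` alone would leak future information into training.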
Model Serving
# model_serving.py
"""
Production model serving with A/B testing and monitoring.
"""
import hashlib
import time
import uuid
from typing import Dict, Any, Optional
from datetime import datetime

import pandas as pd
import mlflow.pyfunc
from fastapi import FastAPI
from pydantic import BaseModel
from prometheus_client import Counter, Histogram

# Metrics
PREDICTION_COUNTER = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model_name', 'model_version']
)
PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency',
    ['model_name']
)

app = FastAPI(title="ML Model Serving API")


class PredictionRequest(BaseModel):
    """Request schema for predictions."""
    features: Dict[str, Any]
    request_id: Optional[str] = None


class PredictionResponse(BaseModel):
    """Response schema for predictions."""
    prediction: float
    probability: Optional[float] = None
    model_version: str
    request_id: str
    latency_ms: float


class ABTestConfig(BaseModel):
    """A/B test configuration."""
    control_version: str
    treatment_version: str
    treatment_percentage: float  # 0-100


class ModelServer:
    """
    Production model server with A/B testing support.
    """

    def __init__(self, model_name: str, feature_store: 'ProductionFeatureStore'):
        self.model_name = model_name
        self.feature_store = feature_store
        self.models: Dict[str, Any] = {}
        self.ab_config: Optional[ABTestConfig] = None

    def load_model(self, version: str) -> None:
        """Load a model version into memory."""
        model_uri = f"models:/{self.model_name}/{version}"
        self.models[version] = mlflow.pyfunc.load_model(model_uri)

    def set_ab_test(self, config: ABTestConfig) -> None:
        """Configure an A/B test between model versions."""
        # Ensure both versions are loaded
        for version in [config.control_version, config.treatment_version]:
            if version not in self.models:
                self.load_model(version)
        self.ab_config = config

    def predict(self, request: PredictionRequest) -> PredictionResponse:
        """Make a prediction with A/B test routing."""
        start_time = time.time()
        request_id = request.request_id or str(uuid.uuid4())

        # Select model version (A/B test or default)
        version = self._select_model_version(request_id)
        model = self.models[version]

        # Enrich features from the feature store if needed
        features = self._enrich_features(request.features)

        # Make prediction
        with PREDICTION_LATENCY.labels(model_name=self.model_name).time():
            prediction = model.predict(pd.DataFrame([features]))[0]

        # Get probability if the underlying model exposes it
        probability = None
        if hasattr(model, 'predict_proba'):
            probability = float(model.predict_proba(pd.DataFrame([features]))[0, 1])

        latency_ms = (time.time() - start_time) * 1000

        # Record metrics
        PREDICTION_COUNTER.labels(
            model_name=self.model_name,
            model_version=version
        ).inc()

        # Log for analysis
        self._log_prediction(
            request_id=request_id,
            version=version,
            features=features,
            prediction=prediction,
            probability=probability,
            latency_ms=latency_ms
        )

        return PredictionResponse(
            prediction=float(prediction),
            probability=probability,
            model_version=version,
            request_id=request_id,
            latency_ms=latency_ms
        )

    def _select_model_version(self, request_id: str) -> str:
        """Select a model version, honoring any active A/B test."""
        if not self.ab_config:
            # No A/B test - use the latest loaded version
            return max(self.models.keys())
        # Deterministic routing based on request_id
        hash_value = int(hashlib.md5(request_id.encode()).hexdigest(), 16)
        percentage = hash_value % 100
        if percentage < self.ab_config.treatment_percentage:
            return self.ab_config.treatment_version
        return self.ab_config.control_version

    def _enrich_features(self, features: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich request features with feature store data."""
        # Get entity keys from request
        entity_keys = {
            'customer_id': [features.get('customer_id')]
        }
        # Fetch online features
        if 'customer_id' in features:
            store_features = self.feature_store.get_online_features(
                entity_keys=entity_keys,
                feature_refs=[
                    "customer_features:total_orders",
                    "customer_features:lifetime_value",
                    "customer_features:days_since_last_order",
                ]
            )
            features.update({
                k: v[0] for k, v in store_features.items()
                if k != 'customer_id'
            })
        return features

    def _log_prediction(
        self,
        request_id: str,
        version: str,
        features: Dict,
        prediction: float,
        probability: Optional[float],
        latency_ms: float
    ) -> None:
        """Log the prediction for later analysis."""
        # In production, send to Kafka or a logging system
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'request_id': request_id,
            'model_name': self.model_name,
            'model_version': version,
            'features': features,
            'prediction': prediction,
            'probability': probability,
            'latency_ms': latency_ms
        }
        # logger.info(json.dumps(log_entry))
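The deterministic routing in `_select_model_version` is worth isolating: hashing the request ID into a bucket guarantees that a given request always lands in the same arm, while the realized traffic split converges to the configured percentage. A standalone sketch of that routing logic:

```python
import hashlib

def route(request_id: str, treatment_percentage: float) -> str:
    # Hash the request id into a stable bucket in [0, 100).
    bucket = int(hashlib.md5(request_id.encode()).hexdigest(), 16) % 100
    return "treatment" if bucket < treatment_percentage else "control"

# Stable: the same request always lands in the same arm.
assert route("req-42", 10.0) == route("req-42", 10.0)

# The realized split approaches the configured percentage.
share = sum(route(f"req-{i}", 10.0) == "treatment" for i in range(10_000)) / 10_000
print(round(share, 2))  # close to 0.10
```

Stability matters for analysis: a user who retries a request should not flip between model versions, or the treatment and control populations become contaminated.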
Model Monitoring
# model_monitoring.py
"""
Production model monitoring for drift detection and performance tracking.
"""
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import numpy as np
import pandas as pd
from scipy import stats


@dataclass
class DriftAlert:
    """Alert for detected drift."""
    drift_type: str  # 'data', 'prediction', 'performance'
    feature_name: Optional[str]
    severity: str  # 'low', 'medium', 'high'
    current_value: float
    baseline_value: float
    drift_score: float
    detected_at: datetime
    recommended_action: str


class ModelMonitor:
    """
    Monitors ML models in production for drift and performance degradation.
    """

    def __init__(
        self,
        model_name: str,
        baseline_start: datetime,
        baseline_end: datetime
    ):
        self.model_name = model_name
        self.baseline_start = baseline_start
        self.baseline_end = baseline_end
        self.baseline_stats: Dict[str, Dict] = {}

    def compute_baseline(self, prediction_logs: pd.DataFrame) -> None:
        """Compute baseline statistics from historical predictions."""
        baseline_data = prediction_logs[
            (prediction_logs['timestamp'] >= self.baseline_start) &
            (prediction_logs['timestamp'] <= self.baseline_end)
        ]
        # Feature statistics
        for col in baseline_data.columns:
            if col.startswith('feature_') and baseline_data[col].dtype in ['float64', 'int64']:
                self.baseline_stats[col] = {
                    'mean': baseline_data[col].mean(),
                    'std': baseline_data[col].std(),
                    'min': baseline_data[col].min(),
                    'max': baseline_data[col].max(),
                    'distribution': baseline_data[col].values
                }
        # Prediction statistics
        self.baseline_stats['prediction'] = {
            'mean': baseline_data['prediction'].mean(),
            'std': baseline_data['prediction'].std(),
            'distribution': baseline_data['prediction'].values
        }

    def detect_data_drift(
        self,
        current_data: pd.DataFrame,
        threshold: float = 0.05
    ) -> List[DriftAlert]:
        """Detect drift in input features using statistical tests."""
        alerts = []
        for feature_name, baseline in self.baseline_stats.items():
            if not feature_name.startswith('feature_'):
                continue
            if feature_name not in current_data.columns:
                continue
            current_values = current_data[feature_name].dropna().values
            if len(current_values) < 30:
                continue
            # Kolmogorov-Smirnov test
            statistic, p_value = stats.ks_2samp(
                baseline['distribution'],
                current_values
            )
            if p_value < threshold:
                severity = self._calculate_severity(statistic)
                alerts.append(DriftAlert(
                    drift_type='data',
                    feature_name=feature_name,
                    severity=severity,
                    current_value=np.mean(current_values),
                    baseline_value=baseline['mean'],
                    drift_score=statistic,
                    detected_at=datetime.utcnow(),
                    recommended_action=self._get_drift_recommendation(severity)
                ))
        return alerts

    def detect_prediction_drift(
        self,
        current_predictions: np.ndarray,
        threshold: float = 0.05
    ) -> Optional[DriftAlert]:
        """Detect drift in model predictions."""
        baseline = self.baseline_stats.get('prediction')
        if not baseline:
            return None
        # KS test on predictions
        statistic, p_value = stats.ks_2samp(
            baseline['distribution'],
            current_predictions
        )
        if p_value < threshold:
            severity = self._calculate_severity(statistic)
            return DriftAlert(
                drift_type='prediction',
                feature_name=None,
                severity=severity,
                current_value=np.mean(current_predictions),
                baseline_value=baseline['mean'],
                drift_score=statistic,
                detected_at=datetime.utcnow(),
                recommended_action="Investigate model predictions and consider retraining"
            )
        return None

    def detect_performance_degradation(
        self,
        actuals: np.ndarray,
        predictions: np.ndarray,
        baseline_auc: float,
        threshold_pct: float = 5.0
    ) -> Optional[DriftAlert]:
        """Detect degradation in model performance metrics."""
        from sklearn.metrics import roc_auc_score

        if len(actuals) < 100:
            return None
        current_auc = roc_auc_score(actuals, predictions)
        degradation_pct = ((baseline_auc - current_auc) / baseline_auc) * 100
        if degradation_pct > threshold_pct:
            severity = 'high' if degradation_pct > 10 else 'medium'
            return DriftAlert(
                drift_type='performance',
                feature_name=None,
                severity=severity,
                current_value=current_auc,
                baseline_value=baseline_auc,
                drift_score=degradation_pct,
                detected_at=datetime.utcnow(),
                recommended_action="Model performance has degraded. Trigger retraining pipeline."
            )
        return None

    def _calculate_severity(self, drift_score: float) -> str:
        """Calculate severity based on the drift score."""
        if drift_score > 0.3:
            return 'high'
        elif drift_score > 0.15:
            return 'medium'
        return 'low'

    def _get_drift_recommendation(self, severity: str) -> str:
        """Get the recommended action based on severity."""
        recommendations = {
            'low': "Monitor closely, no immediate action required",
            'medium': "Investigate data source and consider retraining",
            'high': "Immediate investigation required, consider rolling back to previous model"
        }
        return recommendations.get(severity, "Unknown severity")
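The KS test at the heart of `detect_data_drift` compares the empirical distributions of two samples; a small p-value means they are unlikely to come from the same distribution. A quick synthetic check (illustrative data and thresholds only):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
baseline = rng.normal(loc=0.0, scale=1.0, size=5_000)   # training-time feature
drifted = rng.normal(loc=0.5, scale=1.0, size=5_000)    # mean shifted in production
stable = rng.normal(loc=0.0, scale=1.0, size=5_000)     # same distribution

_, p_drifted = stats.ks_2samp(baseline, drifted)
_, p_stable = stats.ks_2samp(baseline, stable)

# The shifted sample yields a vastly smaller p-value than the stable one.
assert p_drifted < 1e-6
assert p_drifted < p_stable
```

With large samples the KS test flags even tiny, practically irrelevant shifts, which is why the monitor above grades severity by the KS statistic itself rather than by the p-value alone.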
Key Takeaways
- Start simple, evolve complexity: begin with Level 1 (automated pipelines); don't jump straight to Level 3.
- Reproducibility is non-negotiable: version data, features, models, and code. Every training run should be reproducible.
- Feature stores are worth the investment: they eliminate training-serving skew and accelerate model development through feature reuse.
- Monitor everything: data drift, prediction drift, and business metrics. Models degrade silently.
- A/B test before full rollout: offline metrics don't tell the whole story; real-world performance often differs.
- Treat ML as software: use CI/CD, testing, code review, and production-grade infrastructure.
References
- Sculley, D. et al. (2015). "Hidden Technical Debt in Machine Learning Systems." NIPS.
- Google Cloud. (2023). "MLOps: Continuous Delivery and Automation Pipelines in Machine Learning."
- MLflow Documentation: https://mlflow.org/docs/latest/index.html
- Feast Feature Store: https://docs.feast.dev/
- Evidently AI: ML Monitoring and Testing
- Huyen, C. (2022). Designing Machine Learning Systems. O'Reilly Media.

