Telecom Analytics Platform
Enterprise-scale analytics platform processing 50B+ daily events for network optimization, customer intelligence, and predictive maintenance at a Tier-1 telecommunications provider

Executive Summary
Designed and implemented a unified analytics platform processing 50+ billion daily events from 75,000+ cell towers and 45 million subscribers for a Tier-1 telecommunications provider. The platform enables real-time network health monitoring, predictive maintenance, customer churn prediction, and revenue assurance, delivering $47M in annual value (11-month payback) through reduced churn, optimized capex, and improved network reliability, while establishing a foundation for 5G analytics capabilities and AI-driven network automation.
The Challenge
The client operated legacy analytics systems built on traditional data warehouses that couldn't keep pace with exploding data volumes from 4G/5G network expansion. Network operations relied on reactive monitoring, customer analytics lagged by 48+ hours, and data silos prevented cross-functional insights. The existing infrastructure processed only 5% of available network telemetry.
Industry Context
The telecommunications industry generates some of the largest data volumes of any sector. A modern telecom network produces:
- Network telemetry: Performance metrics from every cell tower, router, and switch (millisecond granularity)
- Call Detail Records (CDRs): Metadata for every call, text, and data session
- Customer interactions: App usage, support tickets, billing events, retail visits
- External signals: Weather, events, population movement, competitive intelligence
Our client—a Tier-1 provider with 45 million subscribers—was processing less than 5% of this available data, leaving massive value unrealized.
Specific Challenges
┌──────────────────────────────────────────────────────────────────────┐
│                       LEGACY STATE ASSESSMENT                        │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  DATA SILOS                          PROCESSING LIMITATIONS          │
│  ──────────                          ──────────────────────          │
│  • Network ops: Separate Oracle DB   • 48+ hour analytics latency    │
│  • Customer: Teradata warehouse      • 5% telemetry sampling only    │
│  • Billing: Legacy mainframe         • Batch-only processing         │
│  • Retail: Salesforce + Excel        • No cross-domain correlation   │
│                                                                      │
│  OPERATIONAL GAPS                    BUSINESS IMPACT                 │
│  ────────────────                    ───────────────                 │
│  • Reactive network monitoring       • $38M annual churn losses      │
│  • Manual anomaly detection          • 4.2h mean time to repair      │
│  • No predictive maintenance         • Suboptimal capex allocation   │
│  • Limited customer insights         • Revenue leakage undetected    │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
Requirements Analysis
| Requirement Category | Specification | Business Driver |
|---|---|---|
| Data Volume | 50B+ events/day | Full telemetry capture |
| Latency - Critical | <1 second | Network fault detection |
| Latency - Analytics | <5 minutes | Customer experience scoring |
| Retention | 7 years raw, indefinite aggregated | Regulatory compliance |
| Availability | 99.99% | Network operations dependency |
| Scalability | 10x headroom | 5G expansion trajectory |
| Security | SOC 2 Type II, GDPR, CCPA | Customer data protection |
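As a sanity check on the volume requirement, the sustained ingest rate implied by 50B+ events/day can be computed directly (the peak-to-average factor below is an illustrative assumption, not a figure from the engagement):

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

daily_events = 50e9  # 50B+ events/day from the requirements table
sustained_rate = daily_events / SECONDS_PER_DAY  # average events/sec

# Assume a ~1.5x peak-to-average ratio (hypothetical) for capacity planning
peak_rate = sustained_rate * 1.5

print(f"sustained: {sustained_rate:,.0f} events/sec")  # ≈ 578,704 events/sec
print(f"peak:      {peak_rate:,.0f} events/sec")
```

This is why the ingestion layer's Kafka topics are provisioned for roughly 850K messages/sec in aggregate: comfortably above the sustained average, with headroom for peaks.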
Solution Architecture
High-Level Design
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ TELECOM ANALYTICS PLATFORM │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ DATA SOURCES │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Network │ │ CDR/XDR │ │ Customer │ │ External │ │ │
│ │ │ Telemetry │ │ Records │ │ Systems │ │ Feeds │ │ │
│ │ │ (SNMP,gRPC) │ │ (SS7/IP) │ │ (CRM,Billing)│ │ (Weather,Geo)│ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ └─────────┼────────────────┼────────────────┼────────────────┼────────────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ INGESTION LAYER (Kafka) │ │
│ │ ┌────────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ network.telemetry │ cdr.voice │ cdr.data │ customer.events │ │ │
│ │ │ 500K msg/sec │ 100K/sec │ 200K/sec │ 50K/sec │ │ │
│ │ └────────────────────────────────────────────────────────────────────────┘ │ │
│ │ 7-day retention, 3x replication │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────┼────────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────┐ ┌─────────────────┐ │
│ │ REAL-TIME PATH │ │ NEAR REAL-TIME │ │ BATCH PATH │ │
│ │ Spark Streaming │ │ Spark Streaming │ │ Spark Batch │ │
│ │ (<1 sec) │ │ (1-5 minutes) │ │ (hourly/daily) │ │
│ │ │ │ │ │ │ │
│ │ • Fault detect │ │ • KPI aggregation │ │ • Historical │ │
│ │ • Threshold │ │ • Experience score │ │ analysis │ │
│ │ alerts │ │ • Churn signals │ │ • Model training│ │
│ │ • Anomaly flag │ │ • Usage patterns │ │ • Reporting │ │
│ └────────┬────────┘ └─────────┬───────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ DELTA LAKE (Unified Storage) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Bronze │ │ Silver │ │ Gold │ │ Platinum │ │ │
│ │ │ (Raw events)│ │ (Cleaned) │ │(Aggregated) │ │ (Features) │ │ │
│ │ │ 45 PB │ │ 12 PB │ │ 2 PB │ │ 500 TB │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────┼────────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────┐ ┌─────────────────┐ │
│ │ ML SERVING │ │ ANALYTICS LAYER │ │ OPERATIONAL │ │
│ │ (MLflow) │ │ (Databricks SQL) │ │ (Grafana) │ │
│ │ │ │ │ │ │ │
│ │ • Churn models │ │ • Business intel │ │ • NOC dashboards│ │
│ │ • Maintenance │ │ • Ad-hoc analysis │ │ • Alerting │ │
│ │ • Experience │ │ • Semantic layer │ │ • SLA monitoring│ │
│ └─────────────────┘ └─────────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
Data Ingestion Layer
The ingestion layer handles diverse telecom data sources with varying protocols and volumes.
Network Telemetry Collector
# network_telemetry_collector.py
"""
High-throughput network telemetry collector for cell tower metrics.
Handles SNMP, gRPC, and streaming telemetry protocols.
"""
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
from confluent_kafka import Producer
import grpc
from prometheus_client import Counter, Histogram
import logging

logger = logging.getLogger(__name__)
# Metrics
EVENTS_INGESTED = Counter(
'telemetry_events_total',
'Total telemetry events ingested',
['source_type', 'cell_region']
)
INGESTION_LATENCY = Histogram(
'telemetry_ingestion_latency_seconds',
'Telemetry ingestion latency',
buckets=[.001, .005, .01, .025, .05, .1, .25, .5, 1.0]
)
@dataclass
class CellTowerMetrics:
"""Normalized cell tower telemetry model."""
cell_id: str
timestamp: datetime
latitude: float
longitude: float
sector: int # 0-2 for 3-sector sites
technology: str # '4G_LTE', '5G_NR', 'LEGACY'
# Radio metrics
active_users: int
prb_utilization: float # Physical Resource Block utilization (0-1)
average_cqi: float # Channel Quality Indicator (1-15)
average_rsrp: float # Reference Signal Received Power (dBm)
average_rsrq: float # Reference Signal Received Quality (dB)
throughput_dl_mbps: float
throughput_ul_mbps: float
# Capacity metrics
max_capacity_mbps: float
available_capacity_pct: float
handover_success_rate: float
call_drop_rate: float
# Infrastructure metrics
cpu_utilization: float
memory_utilization: float
temperature_celsius: float
power_consumption_watts: float
backhaul_utilization: float
# Derived fields
health_score: float = field(init=False)
congestion_risk: str = field(init=False)
def __post_init__(self):
"""Calculate derived metrics."""
self.health_score = self._calculate_health_score()
self.congestion_risk = self._assess_congestion_risk()
def _calculate_health_score(self) -> float:
"""
Composite health score (0-100) based on weighted KPIs.
Weights derived from impact analysis on customer experience.
"""
weights = {
'prb': 0.25, # Capacity utilization
'cqi': 0.20, # Signal quality
'drop': 0.25, # Call drops (inverted)
'handover': 0.15, # Mobility success
'infra': 0.15 # Infrastructure health
}
scores = {
'prb': max(0, 100 - (self.prb_utilization * 100)),
'cqi': (self.average_cqi / 15) * 100,
'drop': max(0, 100 - (self.call_drop_rate * 1000)), # 1% drop = -10 points
'handover': self.handover_success_rate * 100,
'infra': 100 - max(self.cpu_utilization, self.memory_utilization) * 100
}
return sum(weights[k] * scores[k] for k in weights)
def _assess_congestion_risk(self) -> str:
"""Classify congestion risk level."""
if self.prb_utilization > 0.85 or self.available_capacity_pct < 10:
return 'CRITICAL'
elif self.prb_utilization > 0.70 or self.available_capacity_pct < 25:
return 'HIGH'
elif self.prb_utilization > 0.50:
return 'MEDIUM'
return 'LOW'
class TelemetryIngestionPipeline:
"""
Unified ingestion pipeline for multi-protocol telemetry.
Normalizes diverse vendor formats into canonical schema.
"""
def __init__(
self,
kafka_config: Dict,
cell_registry_url: str,
batch_size: int = 1000,
flush_interval_ms: int = 100
):
        self.producer = Producer({
            **kafka_config,
            'batch.num.messages': batch_size,  # cap batches by message count
            'linger.ms': flush_interval_ms,
            'compression.type': 'lz4',
            'acks': 'all',
            'enable.idempotence': True
        })
self.cell_registry = CellRegistryClient(cell_registry_url)
self.vendor_parsers = self._init_vendor_parsers()
def _init_vendor_parsers(self) -> Dict:
"""Initialize vendor-specific telemetry parsers."""
return {
'ericsson': EricssonTelemetryParser(),
'nokia': NokiaTelemetryParser(),
'samsung': SamsungTelemetryParser(),
'huawei': HuaweiTelemetryParser()
}
async def process_telemetry_batch(
self,
raw_events: List[bytes],
vendor: str,
region: str
) -> int:
"""
Process batch of raw telemetry events.
Returns count of successfully processed events.
"""
parser = self.vendor_parsers.get(vendor)
if not parser:
raise ValueError(f"Unknown vendor: {vendor}")
processed = 0
for raw_event in raw_events:
try:
with INGESTION_LATENCY.time():
# Parse vendor-specific format
normalized = parser.parse(raw_event)
# Enrich with cell registry metadata
cell_meta = await self.cell_registry.get_cell(
normalized.cell_id
)
normalized.latitude = cell_meta.latitude
normalized.longitude = cell_meta.longitude
normalized.sector = cell_meta.sector
# Calculate derived metrics
metrics = CellTowerMetrics(**normalized.__dict__)
# Route to appropriate Kafka topic based on priority
topic = self._route_event(metrics)
self.producer.produce(
topic=topic,
key=metrics.cell_id.encode(),
value=metrics.to_avro(),
callback=self._delivery_callback
)
processed += 1
EVENTS_INGESTED.labels(
source_type='telemetry',
cell_region=region
).inc()
except Exception as e:
# Log but don't fail batch for individual errors
logger.warning(f"Failed to process event: {e}")
self.producer.flush()
return processed
def _route_event(self, metrics: CellTowerMetrics) -> str:
"""Route events to topics based on criticality."""
if metrics.congestion_risk == 'CRITICAL' or metrics.health_score < 50:
return 'network.telemetry.critical'
elif metrics.health_score < 70:
return 'network.telemetry.warning'
return 'network.telemetry.normal'
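To make the scoring concrete, here is a standalone sketch of the weighted health-score formula from `_calculate_health_score`, applied to illustrative (hypothetical) cell readings:

```python
def health_score(prb_utilization, average_cqi, call_drop_rate,
                 handover_success_rate, cpu_utilization, memory_utilization):
    """Composite 0-100 score mirroring CellTowerMetrics._calculate_health_score."""
    weights = {'prb': 0.25, 'cqi': 0.20, 'drop': 0.25, 'handover': 0.15, 'infra': 0.15}
    scores = {
        'prb': max(0, 100 - prb_utilization * 100),
        'cqi': (average_cqi / 15) * 100,
        'drop': max(0, 100 - call_drop_rate * 1000),  # 1% drops -> -10 points
        'handover': handover_success_rate * 100,
        'infra': 100 - max(cpu_utilization, memory_utilization) * 100,
    }
    return sum(weights[k] * scores[k] for k in weights)

# A moderately loaded cell with decent radio KPIs (values are hypothetical)
score = health_score(prb_utilization=0.60, average_cqi=10.5, call_drop_rate=0.005,
                     handover_success_rate=0.98, cpu_utilization=0.50,
                     memory_utilization=0.40)
print(round(score, 2))  # 69.95 -> _route_event would pick network.telemetry.warning
```

Because the score lands just under 70, this cell would be routed to the warning topic rather than the normal one, even though no single KPI is critical on its own.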
Real-Time Stream Processing
The platform implements a multi-tier streaming architecture for different latency requirements.
Sub-Second Anomaly Detection
# realtime_anomaly_detection.py
"""
Spark Structured Streaming job for real-time network anomaly detection.
Implements statistical and ML-based detection with sub-second latency.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, window, avg, stddev, count, when, lit,
from_json, to_json, struct, expr, udf
)
from pyspark.sql.types import (
StructType, StructField, StringType, DoubleType,
TimestampType, IntegerType, BooleanType
)
from pyspark.ml.feature import VectorAssembler
import mlflow.pyfunc
# Initialize Spark with optimizations for streaming
spark = SparkSession.builder \
.appName("NetworkAnomalyDetection") \
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
.config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.streaming.kafka.maxRatePerPartition", "50000") \
.getOrCreate()
# Schema for incoming telemetry
telemetry_schema = StructType([
StructField("cell_id", StringType(), False),
StructField("timestamp", TimestampType(), False),
StructField("prb_utilization", DoubleType(), False),
StructField("average_cqi", DoubleType(), False),
StructField("call_drop_rate", DoubleType(), False),
StructField("handover_success_rate", DoubleType(), False),
StructField("throughput_dl_mbps", DoubleType(), False),
StructField("active_users", IntegerType(), False),
StructField("health_score", DoubleType(), False),
StructField("congestion_risk", StringType(), False),
StructField("latitude", DoubleType(), False),
StructField("longitude", DoubleType(), False)
])
# Load ML model for advanced anomaly detection
anomaly_model = mlflow.pyfunc.load_model("models:/network_anomaly_detector/Production")
@udf(returnType=DoubleType())
def predict_anomaly_score(features):
"""UDF wrapper for ML model inference."""
return float(anomaly_model.predict([features])[0])
class NetworkAnomalyDetector:
"""
Multi-method anomaly detection for network telemetry.
Combines statistical baselines with ML predictions.
"""
def __init__(self, kafka_bootstrap_servers: str, checkpoint_location: str):
self.kafka_servers = kafka_bootstrap_servers
self.checkpoint_location = checkpoint_location
def build_streaming_pipeline(self):
"""Construct the anomaly detection streaming pipeline."""
# Read from Kafka with all priority topics
raw_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", self.kafka_servers) \
.option("subscribePattern", "network.telemetry.*") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 500000) \
.option("kafka.group.id", "anomaly-detection-v1") \
.load()
# Parse and validate events
parsed_stream = raw_stream \
.select(
from_json(col("value").cast("string"), telemetry_schema).alias("data"),
col("timestamp").alias("kafka_timestamp")
) \
.select("data.*", "kafka_timestamp") \
.withWatermark("timestamp", "30 seconds")
# Method 1: Statistical Anomaly Detection
# Calculate rolling statistics per cell over 5-minute windows
cell_baselines = parsed_stream \
.groupBy(
col("cell_id"),
window(col("timestamp"), "5 minutes", "1 minute")
) \
.agg(
avg("prb_utilization").alias("avg_prb"),
stddev("prb_utilization").alias("std_prb"),
avg("call_drop_rate").alias("avg_drop"),
stddev("call_drop_rate").alias("std_drop"),
avg("health_score").alias("avg_health"),
count("*").alias("sample_count")
)
# Method 2: Topology-Aware Anomaly Detection
# Compare cells to their geographic neighbors
neighbor_comparison = self._add_neighbor_context(parsed_stream)
# Method 3: ML-Based Detection
# Assemble features for ML model
feature_cols = [
"prb_utilization", "average_cqi", "call_drop_rate",
"handover_success_rate", "throughput_dl_mbps", "active_users"
]
        # VectorAssembler is a transformer: apply it to the stream first,
        # then score the assembled vector with the model UDF
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="ml_features")
        with_features = assembler.transform(parsed_stream) \
            .withColumn("anomaly_score", predict_anomaly_score(col("ml_features")))
# Combine detection methods and classify
final_stream = with_features \
.join(cell_baselines, ["cell_id"], "left") \
.withColumn(
"statistical_anomaly",
# Z-score > 3 for key metrics
when(
(col("prb_utilization") > col("avg_prb") + 3 * col("std_prb")) |
(col("call_drop_rate") > col("avg_drop") + 3 * col("std_drop")),
True
).otherwise(False)
) \
.withColumn(
"ml_anomaly",
col("anomaly_score") > 0.85
) \
.withColumn(
"anomaly_type",
when(
col("statistical_anomaly") & col("ml_anomaly"),
lit("CONFIRMED") # Both methods agree
).when(
col("ml_anomaly"),
lit("ML_DETECTED")
).when(
col("statistical_anomaly"),
lit("STATISTICAL")
).otherwise(lit("NORMAL"))
) \
.withColumn(
"severity",
when(col("health_score") < 30, lit("CRITICAL"))
.when(col("health_score") < 50, lit("MAJOR"))
.when(col("health_score") < 70, lit("MINOR"))
.otherwise(lit("WARNING"))
)
return final_stream
def _add_neighbor_context(self, stream):
"""
Enrich stream with neighbor cell context for topology-aware detection.
Anomalies affecting multiple adjacent cells indicate infrastructure issues.
"""
# Load cell adjacency graph (precomputed from handover patterns)
adjacency_df = spark.read.table("gold.cell_adjacency_graph")
# 1-minute aggregates per cell
cell_state = stream \
.groupBy(
col("cell_id"),
window(col("timestamp"), "1 minute")
) \
.agg(
avg("health_score").alias("cell_health"),
avg("call_drop_rate").alias("cell_drop_rate")
)
# Join with neighbors and compute regional health
with_neighbors = cell_state \
.join(adjacency_df, cell_state.cell_id == adjacency_df.source_cell) \
.join(
cell_state.alias("neighbor"),
adjacency_df.target_cell == col("neighbor.cell_id")
) \
.groupBy("cell_id") \
.agg(
avg("neighbor.cell_health").alias("neighbor_avg_health"),
count(when(col("neighbor.cell_health") < 50, True)).alias("degraded_neighbors")
)
return stream.join(with_neighbors, "cell_id", "left")
def start_detection(self):
"""Start the streaming detection pipeline."""
detection_stream = self.build_streaming_pipeline()
# Write anomalies to multiple sinks
# Sink 1: Delta Lake for analytics
        delta_query = detection_stream \
            .filter(col("anomaly_type") != "NORMAL") \
            .writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", f"{self.checkpoint_location}/delta") \
            .option("mergeSchema", "true") \
            .toTable("silver.network_anomalies")
# Sink 2: Kafka for real-time alerting (critical only)
alert_query = detection_stream \
.filter(
(col("anomaly_type") == "CONFIRMED") &
(col("severity").isin("CRITICAL", "MAJOR"))
) \
.select(
col("cell_id").alias("key"),
to_json(struct("*")).alias("value")
) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", self.kafka_servers) \
.option("topic", "network.alerts.realtime") \
.option("checkpointLocation", f"{self.checkpoint_location}/kafka") \
.start()
return delta_query, alert_query
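Downstream, NOC tooling subscribes to `network.alerts.realtime`. A minimal sketch of an alert handler follows; the field names match the detection stream's columns, but the paging-versus-ticketing policy and the sample cell ID are hypothetical:

```python
import json

def handle_alert(message_value: bytes) -> str:
    """Route a confirmed anomaly alert: page the on-call NOC engineer for
    CRITICAL cells, open a ticket for everything else. Returns the action."""
    alert = json.loads(message_value)
    if alert["severity"] == "CRITICAL":
        return f"PAGE noc-oncall: cell {alert['cell_id']} health={alert['health_score']}"
    return f"TICKET network-ops: cell {alert['cell_id']} ({alert['anomaly_type']})"

# Example payload as it would arrive from the Kafka sink (hypothetical values)
sample = json.dumps({
    "cell_id": "CT-004211", "severity": "CRITICAL",
    "anomaly_type": "CONFIRMED", "health_score": 28.4,
}).encode()
print(handle_alert(sample))  # PAGE noc-oncall: cell CT-004211 health=28.4
```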
Customer 360° and Churn Prediction
The platform builds a unified customer view combining network experience with behavioral signals.
Customer Feature Engineering
# customer_feature_engineering.py
"""
Feature engineering for customer analytics and churn prediction.
Builds 360° customer view from network, billing, and behavioral data.
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, sum, avg, count, datediff, current_date,
    percentile_approx, collect_list, size, when, lit, struct,
    expr, months_between, max as spark_max, min as spark_min
)
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from typing import List, Dict
spark = SparkSession.builder.getOrCreate()
class CustomerFeatureStore:
"""
Feature store for customer analytics.
Computes and serves features for churn prediction, experience scoring, and segmentation.
"""
# Feature groups with computation logic
FEATURE_DEFINITIONS = {
'network_experience': {
'avg_signal_quality_30d': 'Average RSRP over last 30 days',
'dropped_call_rate_30d': 'Call drop rate over last 30 days',
'data_throughput_p50_30d': 'Median data throughput',
'coverage_score': 'Weighted coverage quality at home/work locations',
'network_complaint_count_90d': 'Network-related complaints in 90 days'
},
'usage_patterns': {
'voice_minutes_trend': 'Voice usage month-over-month change',
'data_usage_gb_30d': 'Data consumption last 30 days',
'roaming_days_90d': 'Days with roaming usage',
'peak_hour_usage_pct': 'Percentage of usage during peak hours',
'app_diversity_score': 'Diversity of apps used (entropy)'
},
'engagement': {
'app_sessions_30d': 'Mobile app sessions in 30 days',
'self_service_transactions': 'Self-service vs agent transactions ratio',
'nps_score_latest': 'Most recent NPS score',
'support_tickets_90d': 'Support tickets opened in 90 days',
'payment_method': 'Auto-pay vs manual payment'
},
'financial': {
'arpu_3m_avg': 'Average revenue per user (3 months)',
'arpu_trend': 'ARPU month-over-month trend',
'late_payments_12m': 'Late payment count in 12 months',
'plan_tenure_days': 'Days on current plan',
'total_tenure_days': 'Total customer tenure'
},
'churn_signals': {
'competitor_search_count': 'Searches for competitor brands',
'contract_end_days': 'Days until contract end',
'recent_plan_downgrade': 'Downgraded plan in last 90 days',
'support_escalation_flag': 'Had escalated support in 90 days',
'data_usage_decline_pct': 'Decline in data usage vs prior period'
}
}
def __init__(self, feature_table: str = "platinum.customer_features"):
self.feature_table = feature_table
self.computation_date = datetime.now().date()
def compute_network_experience_features(self) -> DataFrame:
"""
Compute network experience features from CDR and telemetry data.
Links customer sessions to cell tower quality metrics.
"""
# Customer session to cell tower mapping
session_quality = spark.sql("""
WITH customer_sessions AS (
SELECT
customer_id,
cell_id,
session_start,
session_end,
data_volume_mb,
voice_duration_sec
FROM silver.customer_sessions
WHERE session_date >= current_date() - INTERVAL 30 DAYS
),
cell_quality AS (
SELECT
cell_id,
timestamp,
average_rsrp,
average_cqi,
call_drop_rate,
throughput_dl_mbps
FROM silver.cell_tower_metrics
WHERE metric_date >= current_date() - INTERVAL 30 DAYS
)
SELECT
cs.customer_id,
AVG(cq.average_rsrp) as avg_signal_quality_30d,
AVG(cq.call_drop_rate) as dropped_call_rate_30d,
PERCENTILE(cq.throughput_dl_mbps, 0.5) as data_throughput_p50_30d,
SUM(cs.data_volume_mb) as total_data_mb_30d,
COUNT(DISTINCT cs.cell_id) as unique_cells_used
FROM customer_sessions cs
JOIN cell_quality cq
ON cs.cell_id = cq.cell_id
AND cq.timestamp BETWEEN cs.session_start AND cs.session_end
GROUP BY cs.customer_id
""")
# Add home/work location coverage scores
location_quality = self._compute_location_coverage_scores()
return session_quality.join(location_quality, "customer_id", "left")
def _compute_location_coverage_scores(self) -> DataFrame:
"""
Compute coverage quality at customer's frequent locations.
Uses clustering on location data to identify home/work.
"""
return spark.sql("""
WITH location_clusters AS (
SELECT
customer_id,
location_cluster_id,
cluster_type, -- 'home', 'work', 'other'
avg_time_spent_hours,
primary_cell_id
FROM gold.customer_location_clusters
),
cell_reliability AS (
SELECT
cell_id,
AVG(health_score) as avg_health,
PERCENTILE(call_drop_rate, 0.95) as p95_drop_rate
FROM silver.cell_tower_metrics
WHERE metric_date >= current_date() - INTERVAL 90 DAYS
GROUP BY cell_id
)
SELECT
lc.customer_id,
-- Weighted coverage score (home 50%, work 35%, other 15%)
SUM(
CASE lc.cluster_type
WHEN 'home' THEN 0.50
WHEN 'work' THEN 0.35
ELSE 0.15
END * cr.avg_health
) as coverage_score,
MAX(CASE WHEN lc.cluster_type = 'home' THEN cr.avg_health END) as home_coverage,
MAX(CASE WHEN lc.cluster_type = 'work' THEN cr.avg_health END) as work_coverage
FROM location_clusters lc
JOIN cell_reliability cr ON lc.primary_cell_id = cr.cell_id
GROUP BY lc.customer_id
""")
def compute_churn_probability(self, features_df: DataFrame) -> DataFrame:
"""
Apply churn prediction model to customer features.
Uses gradient boosting model trained on historical churn.
"""
import mlflow.pyfunc
# Load production churn model
churn_model = mlflow.pyfunc.spark_udf(
spark,
model_uri="models:/customer_churn_predictor/Production",
result_type="double"
)
# Feature columns for model
feature_cols = [
"avg_signal_quality_30d", "dropped_call_rate_30d",
"data_throughput_p50_30d", "coverage_score",
"arpu_3m_avg", "arpu_trend", "total_tenure_days",
"support_tickets_90d", "nps_score_latest",
"contract_end_days", "data_usage_decline_pct"
]
return features_df \
.withColumn(
"churn_probability",
churn_model(struct([col(c) for c in feature_cols]))
) \
.withColumn(
"churn_risk_tier",
when(col("churn_probability") > 0.7, lit("HIGH"))
.when(col("churn_probability") > 0.4, lit("MEDIUM"))
.otherwise(lit("LOW"))
) \
.withColumn(
"retention_priority_score",
# Prioritize high-value customers at risk
col("churn_probability") * col("arpu_3m_avg") / 100
)
Churn Prediction Model
# churn_prediction_model.py
"""
XGBoost-based customer churn prediction model.
Trained on 24-month historical data with SHAP explanations.
"""
import xgboost as xgb
import shap
import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import (
roc_auc_score, precision_recall_curve, average_precision_score
)
import mlflow
import mlflow.xgboost
class TelecomChurnModel:
"""
Production churn prediction model for telecom subscribers.
Optimized for imbalanced classes (~2% monthly churn rate).
"""
def __init__(self, experiment_name: str = "customer_churn_prediction"):
mlflow.set_experiment(experiment_name)
self.model = None
self.explainer = None
self.feature_importance = None
# Model configuration optimized for churn prediction
self.model_params = {
'objective': 'binary:logistic',
'eval_metric': ['auc', 'aucpr'], # AUC-PR better for imbalanced
'max_depth': 6,
'learning_rate': 0.05,
'n_estimators': 500,
'min_child_weight': 10,
'subsample': 0.8,
'colsample_bytree': 0.8,
'scale_pos_weight': 45, # ~2.2% churn rate adjustment
'reg_alpha': 0.1,
'reg_lambda': 1.0,
'tree_method': 'hist',
'random_state': 42
}
def train(
self,
train_df: pd.DataFrame,
feature_cols: list,
target_col: str = 'churned_30d',
date_col: str = 'snapshot_date'
) -> dict:
"""
Train churn model with time-based cross-validation.
Uses walk-forward validation to prevent temporal leakage.
"""
with mlflow.start_run(run_name=f"churn_model_{pd.Timestamp.now().strftime('%Y%m%d')}"):
# Log parameters
mlflow.log_params(self.model_params)
mlflow.log_param("n_features", len(feature_cols))
mlflow.log_param("training_samples", len(train_df))
            # Time-based (walk-forward) cross-validation: sort chronologically so
            # every fold validates on strictly later data than it trained on
            train_df = train_df.sort_values(date_col)
            X = train_df[feature_cols]
            y = train_df[target_col]
            tscv = TimeSeriesSplit(n_splits=5, gap=30)  # 30-sample gap (~30 days at daily snapshots)
            cv_scores = []
            for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
model = xgb.XGBClassifier(**self.model_params)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=False
)
val_pred = model.predict_proba(X_val)[:, 1]
fold_auc = roc_auc_score(y_val, val_pred)
fold_aucpr = average_precision_score(y_val, val_pred)
cv_scores.append({
'fold': fold,
'auc': fold_auc,
'aucpr': fold_aucpr,
'val_size': len(val_idx),
'churn_rate': y_val.mean()
})
mlflow.log_metric(f"fold_{fold}_auc", fold_auc)
mlflow.log_metric(f"fold_{fold}_aucpr", fold_aucpr)
# Train final model on full data
self.model = xgb.XGBClassifier(**self.model_params)
self.model.fit(X, y, verbose=False)
# Compute SHAP values for explainability
self.explainer = shap.TreeExplainer(self.model)
sample_shap = self.explainer.shap_values(X.sample(min(1000, len(X))))
# Feature importance
self.feature_importance = pd.DataFrame({
'feature': feature_cols,
'importance': self.model.feature_importances_,
'shap_mean': np.abs(sample_shap).mean(axis=0)
}).sort_values('shap_mean', ascending=False)
# Log artifacts
mlflow.log_table(self.feature_importance, "feature_importance.json")
mlflow.xgboost.log_model(
self.model,
artifact_path="model",
registered_model_name="customer_churn_predictor"
)
# Summary metrics
avg_auc = np.mean([s['auc'] for s in cv_scores])
avg_aucpr = np.mean([s['aucpr'] for s in cv_scores])
mlflow.log_metric("cv_mean_auc", avg_auc)
mlflow.log_metric("cv_mean_aucpr", avg_aucpr)
return {
'cv_scores': cv_scores,
'feature_importance': self.feature_importance,
'avg_auc': avg_auc,
'avg_aucpr': avg_aucpr
}
def explain_prediction(
self,
customer_features: pd.DataFrame,
top_n: int = 5
) -> dict:
"""
Generate SHAP-based explanation for individual prediction.
Returns top contributing factors to churn risk.
"""
if self.explainer is None:
raise ValueError("Model not trained or explainer not initialized")
shap_values = self.explainer.shap_values(customer_features)
base_value = self.explainer.expected_value
# Identify top positive and negative contributors
feature_impacts = pd.DataFrame({
'feature': customer_features.columns,
'value': customer_features.values[0],
'shap_value': shap_values[0]
}).sort_values('shap_value', key=abs, ascending=False)
risk_factors = feature_impacts[feature_impacts['shap_value'] > 0].head(top_n)
protective_factors = feature_impacts[feature_impacts['shap_value'] < 0].head(top_n)
return {
'prediction': float(self.model.predict_proba(customer_features)[0, 1]),
'base_rate': float(1 / (1 + np.exp(-base_value))),
'risk_factors': risk_factors.to_dict('records'),
'protective_factors': protective_factors.to_dict('records'),
'total_shap_impact': float(shap_values[0].sum())
}
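The `base_rate` conversion in `explain_prediction` relies on SHAP values for `binary:logistic` models being additive in log-odds space. A minimal sketch of that arithmetic, with hypothetical contribution values:

```python
import numpy as np

def shap_to_probability(base_value: float, shap_values: np.ndarray):
    """For XGBoost binary:logistic, SHAP values add in log-odds:
    prediction = sigmoid(base_value + sum(shap_values)).
    Mirrors the base_rate conversion in explain_prediction."""
    sigmoid = lambda z: 1.0 / (1.0 + np.exp(-z))
    base_rate = sigmoid(base_value)                      # population churn rate
    predicted = sigmoid(base_value + shap_values.sum())  # this customer's risk
    return base_rate, predicted

# Hypothetical customer: base log-odds for the ~2.2% churn rate,
# plus three positive risk contributions (e.g. coverage, contract end, NPS)
base = np.log(0.022 / 0.978)
contribs = np.array([1.2, 0.8, 0.4])
base_rate, predicted = shap_to_probability(base, contribs)
print(f"base {base_rate:.3f} -> predicted {predicted:.3f}")
```

This is why a handful of moderate SHAP contributions can lift a customer from the ~2% base rate to a double-digit churn probability: the effects compound in log-odds before the sigmoid squashes them back to a probability.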
Predictive Maintenance System
The platform predicts equipment failures 72 hours in advance, enabling proactive maintenance.
Equipment Health Scoring
# predictive_maintenance.py
"""
Predictive maintenance system for network infrastructure.
Multi-horizon failure prediction for proactive maintenance scheduling.
"""
from typing import Dict, List, Tuple
import pandas as pd
import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
Input, LSTM, Dense, Dropout, Concatenate,
BatchNormalization, Attention
)
@dataclass
class MaintenancePrediction:
"""Structured maintenance prediction output."""
equipment_id: str
equipment_type: str
location: str
# Failure probabilities at different horizons
failure_prob_24h: float
failure_prob_48h: float
failure_prob_72h: float
# Predicted failure mode
likely_failure_mode: str
failure_mode_confidence: float
# Recommended actions
recommended_action: str
priority_score: float
estimated_downtime_hours: float
# Impact assessment
affected_subscribers: int
revenue_at_risk: float
class NetworkMaintenancePredictor:
    """
    Multi-horizon failure prediction for telecom network equipment.
    Combines LSTM for temporal patterns with attention for feature importance.
    """
    # Equipment types and their sensor configurations
    EQUIPMENT_CONFIGS = {
        'cell_tower': {
            'sensors': ['temperature', 'power_consumption', 'cpu_util',
                        'memory_util', 'signal_quality', 'error_rate'],
            'sequence_length': 168,  # 7 days of hourly data
            'critical_thresholds': {
                'temperature': 85,  # Celsius
                'error_rate': 0.05
            }
        },
        'router': {
            'sensors': ['cpu_util', 'memory_util', 'packet_loss',
                        'latency', 'throughput_utilization'],
            'sequence_length': 336,  # 14 days
            'critical_thresholds': {
                'packet_loss': 0.01,
                'latency': 100  # ms
            }
        },
        'power_system': {
            'sensors': ['voltage', 'current', 'power_factor',
                        'temperature', 'battery_health'],
            'sequence_length': 720,  # 30 days
            'critical_thresholds': {
                'battery_health': 0.7,
                'power_factor': 0.85
            }
        }
    }

    # Failure modes by equipment type
    FAILURE_MODES = {
        'cell_tower': [
            'power_amplifier_degradation',
            'antenna_misalignment',
            'cooling_system_failure',
            'software_fault',
            'backhaul_degradation'
        ],
        'router': [
            'memory_leak',
            'port_failure',
            'software_corruption',
            'overheating',
            'power_supply_degradation'
        ]
    }

    def __init__(self, model_path: str = None):
        self.models: Dict[str, Model] = {}
        if model_path:
            self._load_models(model_path)
    def build_prediction_model(
        self,
        equipment_type: str,
        n_sensors: int
    ) -> Model:
        """
        Build LSTM-Attention model for failure prediction.
        Architecture designed for multi-horizon prediction.
        """
        config = self.EQUIPMENT_CONFIGS[equipment_type]
        seq_length = config['sequence_length']

        # Input layers
        sensor_input = Input(shape=(seq_length, n_sensors), name='sensor_sequence')
        static_input = Input(shape=(10,), name='static_features')  # equipment metadata

        # LSTM layers with attention
        lstm_1 = LSTM(128, return_sequences=True)(sensor_input)
        lstm_1 = BatchNormalization()(lstm_1)
        lstm_1 = Dropout(0.3)(lstm_1)
        lstm_2 = LSTM(64, return_sequences=True)(lstm_1)
        lstm_2 = BatchNormalization()(lstm_2)

        # Self-attention for temporal importance
        attention = Attention()([lstm_2, lstm_2])
        lstm_final = LSTM(32)(attention)

        # Combine with static features
        combined = Concatenate()([lstm_final, static_input])
        dense_1 = Dense(64, activation='relu')(combined)
        dense_1 = Dropout(0.3)(dense_1)
        dense_2 = Dense(32, activation='relu')(dense_1)

        # Multi-horizon outputs
        output_24h = Dense(1, activation='sigmoid', name='failure_24h')(dense_2)
        output_48h = Dense(1, activation='sigmoid', name='failure_48h')(dense_2)
        output_72h = Dense(1, activation='sigmoid', name='failure_72h')(dense_2)

        # Failure mode classification
        n_failure_modes = len(self.FAILURE_MODES.get(equipment_type, ['unknown']))
        failure_mode = Dense(
            n_failure_modes,
            activation='softmax',
            name='failure_mode'
        )(dense_2)

        model = Model(
            inputs=[sensor_input, static_input],
            outputs=[output_24h, output_48h, output_72h, failure_mode]
        )
        model.compile(
            optimizer='adam',
            loss={
                'failure_24h': 'binary_crossentropy',
                'failure_48h': 'binary_crossentropy',
                'failure_72h': 'binary_crossentropy',
                'failure_mode': 'categorical_crossentropy'
            },
            loss_weights={
                'failure_24h': 2.0,  # Weight near-term more heavily
                'failure_48h': 1.5,
                'failure_72h': 1.0,
                'failure_mode': 0.5
            },
            metrics=['accuracy', 'AUC']
        )
        return model
    def predict_maintenance_needs(
        self,
        equipment_data: pd.DataFrame,
        equipment_type: str
    ) -> List[MaintenancePrediction]:
        """
        Generate maintenance predictions for equipment fleet.
        Returns prioritized maintenance recommendations.
        """
        model = self.models[equipment_type]
        config = self.EQUIPMENT_CONFIGS[equipment_type]
        predictions = []
        for equipment_id, data in equipment_data.groupby('equipment_id'):
            # Prepare sequences
            sensor_cols = config['sensors']
            sensor_sequence = data[sensor_cols].values[-config['sequence_length']:]
            # Pad if needed
            if len(sensor_sequence) < config['sequence_length']:
                padding = np.zeros((
                    config['sequence_length'] - len(sensor_sequence),
                    len(sensor_cols)
                ))
                sensor_sequence = np.vstack([padding, sensor_sequence])
            sensor_sequence = sensor_sequence.reshape(1, -1, len(sensor_cols))

            # Static features
            static_features = self._extract_static_features(data.iloc[-1])

            # Predict
            prob_24h, prob_48h, prob_72h, failure_mode = model.predict(
                [sensor_sequence, static_features]
            )

            # Decode failure mode
            failure_modes = self.FAILURE_MODES.get(equipment_type, ['unknown'])
            mode_idx = np.argmax(failure_mode[0])
            likely_mode = failure_modes[mode_idx]
            mode_confidence = float(failure_mode[0][mode_idx])

            # Calculate priority and recommendations
            max_prob = max(prob_24h[0][0], prob_48h[0][0], prob_72h[0][0])
            impact = self._calculate_impact(data.iloc[-1])
            priority = max_prob * impact['revenue_at_risk'] / 10000

            predictions.append(MaintenancePrediction(
                equipment_id=equipment_id,
                equipment_type=equipment_type,
                location=data.iloc[-1]['location'],
                failure_prob_24h=float(prob_24h[0][0]),
                failure_prob_48h=float(prob_48h[0][0]),
                failure_prob_72h=float(prob_72h[0][0]),
                likely_failure_mode=likely_mode,
                failure_mode_confidence=mode_confidence,
                recommended_action=self._recommend_action(
                    max_prob, likely_mode, mode_confidence
                ),
                priority_score=priority,
                estimated_downtime_hours=self._estimate_downtime(likely_mode),
                affected_subscribers=impact['affected_subscribers'],
                revenue_at_risk=impact['revenue_at_risk']
            ))

        # Sort by priority
        predictions.sort(key=lambda x: x.priority_score, reverse=True)
        return predictions
    def _recommend_action(
        self,
        failure_prob: float,
        failure_mode: str,
        confidence: float
    ) -> str:
        """Generate maintenance recommendation based on prediction."""
        if failure_prob > 0.8:
            return f"URGENT: Schedule immediate preventive maintenance for {failure_mode}"
        elif failure_prob > 0.5:
            return f"SCHEDULE: Plan maintenance within 48 hours for potential {failure_mode}"
        elif failure_prob > 0.3:
            return f"MONITOR: Increase monitoring frequency, prepare for {failure_mode}"
        return "ROUTINE: Continue standard monitoring schedule"

    def _estimate_downtime(self, failure_mode: str) -> float:
        """Estimate downtime hours based on failure mode."""
        downtime_estimates = {
            'power_amplifier_degradation': 4.0,
            'antenna_misalignment': 2.5,
            'cooling_system_failure': 3.0,
            'software_fault': 1.0,
            'backhaul_degradation': 2.0,
            'memory_leak': 0.5,
            'port_failure': 1.5,
            'software_corruption': 1.0,
            'overheating': 2.0,
            'power_supply_degradation': 3.5
        }
        return downtime_estimates.get(failure_mode, 2.0)

    def _calculate_impact(self, equipment_record: pd.Series) -> Dict:
        """Calculate business impact of potential failure."""
        # Subscribers affected based on equipment type and location
        subscribers = equipment_record.get('active_subscribers', 1000)
        arpu = 45  # Average revenue per user per month
        # Daily revenue at risk
        daily_revenue = subscribers * arpu / 30
        return {
            'affected_subscribers': subscribers,
            'revenue_at_risk': daily_revenue
        }
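The triage logic downstream of the neural network can be exercised on its own. The sketch below is a standalone reproduction of the thresholds in `_recommend_action` and the impact arithmetic in `_calculate_impact` (the $45 ARPU, the 0.3/0.5/0.8 cut-offs, and the `/10000` priority scaling all come from the class above); it requires no TensorFlow:

```python
# Standalone sketch of the triage logic above (no TensorFlow needed).

def recommend_action(failure_prob: float, failure_mode: str) -> str:
    # Same cut-offs as NetworkMaintenancePredictor._recommend_action
    if failure_prob > 0.8:
        return f"URGENT: Schedule immediate preventive maintenance for {failure_mode}"
    elif failure_prob > 0.5:
        return f"SCHEDULE: Plan maintenance within 48 hours for potential {failure_mode}"
    elif failure_prob > 0.3:
        return f"MONITOR: Increase monitoring frequency, prepare for {failure_mode}"
    return "ROUTINE: Continue standard monitoring schedule"

def priority_score(max_prob: float, active_subscribers: int, arpu: float = 45.0) -> float:
    # Daily revenue at risk as in _calculate_impact, scaled as in
    # predict_maintenance_needs
    revenue_at_risk = active_subscribers * arpu / 30
    return max_prob * revenue_at_risk / 10000

# A tower serving 12,000 subscribers with a 0.85 failure probability
print(recommend_action(0.85, 'cooling_system_failure'))  # URGENT tier
print(priority_score(0.85, 12_000))                      # 1.53
```

Because priority multiplies failure probability by revenue at risk, a moderately risky tower serving many subscribers can outrank a near-certain failure on an underused site, which is the intended dispatch behavior.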
Network Operations Dashboard
Real-time operational visibility for Network Operations Center (NOC).
Grafana Dashboard Configuration
# grafana/dashboards/network-operations.yaml
apiVersion: 1

providers:
  - name: 'Network Operations'
    orgId: 1
    folder: 'NOC'
    type: file
    disableDeletion: false
    editable: true
    options:
      path: /var/lib/grafana/dashboards/noc

# Dashboard JSON embedded
dashboards:
  - title: "Network Health Overview"
    uid: "network-health-001"
    tags: ["noc", "network", "real-time"]
    refresh: "5s"
    variables:
      - name: region
        type: query
        query: "SELECT DISTINCT region FROM network.cell_towers"
        multi: true
      - name: technology
        type: custom
        options: ["4G_LTE", "5G_NR", "ALL"]
    panels:
      - title: "Network Health Score"
        type: gauge
        gridPos: { x: 0, y: 0, w: 6, h: 8 }
        targets:
          - datasource: "TimescaleDB"
            rawSql: |
              SELECT
                AVG(health_score) as value
              FROM network.cell_tower_metrics
              WHERE time > NOW() - INTERVAL '5 minutes'
                AND region IN ($region)
        fieldConfig:
          defaults:
            thresholds:
              mode: absolute
              steps:
                - color: red
                  value: 0
                - color: orange
                  value: 60
                - color: yellow
                  value: 75
                - color: green
                  value: 90
      - title: "Active Anomalies by Severity"
        type: stat
        gridPos: { x: 6, y: 0, w: 6, h: 4 }
        targets:
          - datasource: "TimescaleDB"
            rawSql: |
              SELECT
                severity,
                COUNT(*) as count
              FROM network.active_anomalies
              WHERE status = 'OPEN'
              GROUP BY severity
              ORDER BY
                CASE severity
                  WHEN 'CRITICAL' THEN 1
                  WHEN 'MAJOR' THEN 2
                  ELSE 3
                END
        options:
          colorMode: background
          reduceOptions:
            calcs: ["last"]
      - title: "Real-time Traffic Volume"
        type: timeseries
        gridPos: { x: 0, y: 8, w: 12, h: 8 }
        targets:
          - datasource: "TimescaleDB"
            rawSql: |
              SELECT
                time_bucket('1 minute', time) AS time,
                SUM(throughput_dl_mbps) as "Downlink (Mbps)",
                SUM(throughput_ul_mbps) as "Uplink (Mbps)"
              FROM network.cell_tower_metrics
              WHERE time > NOW() - INTERVAL '1 hour'
                AND region IN ($region)
              GROUP BY time_bucket('1 minute', time)
              ORDER BY time
        fieldConfig:
          defaults:
            unit: "Mbps"
            custom:
              fillOpacity: 20
              lineWidth: 2
      - title: "Predictive Maintenance Alerts"
        type: table
        gridPos: { x: 12, y: 0, w: 12, h: 12 }
        targets:
          - datasource: "TimescaleDB"
            rawSql: |
              SELECT
                equipment_id,
                location,
                ROUND(failure_prob_24h * 100, 1) || '%' as "24h Risk",
                ROUND(failure_prob_72h * 100, 1) || '%' as "72h Risk",
                likely_failure_mode as "Predicted Issue",
                recommended_action as "Recommendation",
                affected_subscribers as "Subscribers",
                '$' || ROUND(revenue_at_risk::numeric, 0) as "Daily Revenue at Risk"
              FROM maintenance.predictions
              WHERE failure_prob_72h > 0.3
              ORDER BY priority_score DESC
              LIMIT 20
        transformations:
          - id: sortBy
            options:
              sort:
                - field: "24h Risk"
                  desc: true
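The gauge panel's threshold steps color-code the fleet-wide health score. Grafana applies the highest step whose value the score meets or exceeds; this small sketch replicates that mapping for the 0/60/75/90 boundaries defined above:

```python
# Map a health score to the gauge color band defined in the panel above.
# Grafana picks the last threshold step whose value <= the score.
STEPS = [(0, "red"), (60, "orange"), (75, "yellow"), (90, "green")]

def gauge_color(score: float) -> str:
    color = STEPS[0][1]
    for value, step_color in STEPS:
        if score >= value:
            color = step_color
    return color

print(gauge_color(82))  # yellow
print(gauge_color(95))  # green
```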
Results and Impact
Quantified Outcomes
| Metric | Before | After | Improvement |
|---|---|---|---|
| Churn Rate | 2.1% monthly | 1.62% monthly | 23% reduction |
| Network Downtime | 4.2h MTTR | 2.7h MTTR | 35% reduction |
| Data Processing | 2.5B events/day | 50B+ events/day | 20x increase |
| Analytics Latency | 48+ hours | <5 minutes | 576x faster |
| Predictive Accuracy | N/A | 94.2% | New capability |
| Platform Availability | 97.5% | 99.97% | Near-perfect |
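The improvement factors in the table follow directly from the before/after columns, as this quick arithmetic check shows (48 hours at 60 minutes each, against a 5-minute freshness target, gives the 576x figure):

```python
# Sanity-check the improvement factors quoted in the table.
latency_speedup = (48 * 60) / 5          # 48 hours -> 5 minutes
throughput_gain = 50e9 / 2.5e9           # 50B vs 2.5B events/day
churn_drop = (2.1 - 1.62) / 2.1          # relative churn reduction
mttr_drop = (4.2 - 2.7) / 4.2            # relative MTTR reduction

print(latency_speedup)        # 576.0
print(throughput_gain)        # 20.0
print(round(churn_drop, 2))   # 0.23
print(round(mttr_drop, 2))    # 0.36, quoted as ~35%
```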
Financial Impact Breakdown
┌────────────────────────────────────────────────────────────────────────────┐
│ ANNUAL VALUE: $47M │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ CHURN REDUCTION $18M │
│ ───────────────────────────────────────────────────────── │
│ • 0.48% reduction × 45M subscribers × $45 ARPU × 12 months │
│ • Proactive retention saved 216,000 subscribers │
│ │
│ CAPEX OPTIMIZATION $15M │
│ ───────────────────────────────────────────────────────── │
│ • Data-driven capacity planning vs over-provisioning │
│ • Targeted 5G deployment based on demand prediction │
│ • Reduced emergency equipment purchases by 60% │
│ │
│ OPERATIONAL EFFICIENCY $9M │
│ ───────────────────────────────────────────────────────── │
│ • Predictive maintenance reduced truck rolls by 40% │
│ • MTTR reduction: 4.2h → 2.7h (35% improvement) │
│ • Automated anomaly detection: 85% of issues auto-triaged │
│ │
│ REVENUE ASSURANCE $5M │
│ ───────────────────────────────────────────────────────── │
│ • CDR reconciliation identified billing discrepancies │
│ • Fraud detection prevented $3.2M in losses │
│ • Usage pattern analysis recovered unbilled services │
│ │
└────────────────────────────────────────────────────────────────────────────┘
Platform Performance
| Component | Specification | Achieved |
|---|---|---|
| Kafka Cluster | 24 brokers, 3 AZs | 850K msg/sec sustained |
| Spark Streaming | 500 executors | Sub-second processing |
| Delta Lake | 60 PB capacity | 45 PB utilized |
| ML Serving | <50ms p99 latency | 23ms p99 achieved |
| Query Performance | <10s for complex queries | 3.2s average |
Technical Innovations
1. Topology-Aware Anomaly Detection
Traditional threshold-based monitoring missed cascade failures and regional issues. Our topology-aware approach:
- Builds dynamic cell adjacency graphs from handover patterns
- Correlates anomalies across geographic clusters
- Identifies root cause vs. symptom with 89% accuracy
- Reduces false positives by 67%
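The core idea can be sketched in a few lines: build the adjacency graph from handover counts, then treat an anomalous cell as a root-cause candidate when most of its neighbors are anomalous too (a lone anomalous cell next to a troubled neighbor is more likely a symptom). The cell names, the minimum handover count, and the neighbor ratio below are all hypothetical, not the production thresholds:

```python
from collections import defaultdict

# Illustrative sketch: adjacency graph from handover counts, then
# root-cause vs. symptom classification. Thresholds are hypothetical.

def build_adjacency(handovers, min_count=100):
    """handovers: iterable of (src_cell, dst_cell, count)."""
    graph = defaultdict(set)
    for src, dst, count in handovers:
        if count >= min_count:           # ignore weak adjacencies
            graph[src].add(dst)
            graph[dst].add(src)
    return graph

def classify(cell, anomalous, graph, ratio=0.5):
    """Root-cause candidate if >= ratio of neighbors are also anomalous."""
    neighbors = graph.get(cell, set())
    if not neighbors:
        return "isolated"
    hit = sum(1 for n in neighbors if n in anomalous)
    return "root_cause_candidate" if hit / len(neighbors) >= ratio else "symptom"

handovers = [("A", "B", 500), ("A", "C", 320), ("A", "D", 150),
             ("B", "C", 90), ("D", "E", 200)]
graph = build_adjacency(handovers)
anomalous = {"A", "B", "C", "E"}
print(classify("A", anomalous, graph))  # neighbors B, C, D: 2/3 anomalous
print(classify("E", anomalous, graph))  # only neighbor D is healthy
```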
2. Multi-Horizon Forecasting Architecture
Different prediction windows require different model architectures:
| Horizon | Architecture | Primary Use Case |
|---|---|---|
| Nowcast (<5 min) | Streaming rules + simple ML | Real-time alerting |
| Short-term (1-72h) | LSTM with attention | Maintenance scheduling |
| Long-term (7-90 days) | Prophet + gradient boosting | Capacity planning |
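In practice this means a forecast request is routed by its horizon to the matching model family. A minimal dispatcher over the boundaries in the table might look like this (the family names are illustrative labels, not the platform's actual service names):

```python
from datetime import timedelta

# Hypothetical sketch: route a forecast request to the model family
# in the table above based on its horizon.
def select_model_family(horizon: timedelta) -> str:
    if horizon < timedelta(minutes=5):
        return "streaming_rules"   # nowcast: rules + simple ML
    if horizon <= timedelta(hours=72):
        return "lstm_attention"    # short-term: maintenance scheduling
    return "prophet_gbm"           # long-term: capacity planning

print(select_model_family(timedelta(minutes=1)))  # streaming_rules
print(select_model_family(timedelta(hours=48)))   # lstm_attention
print(select_model_family(timedelta(days=30)))    # prophet_gbm
```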
3. Feature Store for Telecom
Centralized feature computation reduced ML development time by 60%:
- 500+ pre-computed features
- Point-in-time correctness for training
- Sub-millisecond serving latency
- Automated feature monitoring and drift detection
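Point-in-time correctness means each training row may only see feature values computed at or before its label timestamp, never future ones. A minimal as-of join with pandas `merge_asof` illustrates the idea (the table and column names here are made up for the example):

```python
import pandas as pd

# Minimal point-in-time join: each label row picks up the latest feature
# value computed at or before its timestamp (never a future value).
features = pd.DataFrame({
    "ts": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 06:00",
                          "2024-01-01 12:00"]),
    "equipment_id": ["T1", "T1", "T1"],
    "avg_error_rate_7d": [0.01, 0.02, 0.04],
})
labels = pd.DataFrame({
    "ts": pd.to_datetime(["2024-01-01 08:00"]),
    "equipment_id": ["T1"],
    "failed_within_72h": [1],
})

training = pd.merge_asof(
    labels.sort_values("ts"),
    features.sort_values("ts"),
    on="ts",
    by="equipment_id",
    direction="backward",  # only features known at label time
)
print(training["avg_error_rate_7d"].iloc[0])  # 0.02: the 06:00 value, not 12:00
```

Skipping this step leaks future information into training and inflates offline accuracy, which is why the feature store enforces it centrally rather than leaving it to each model team.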
Key Learnings
- Unified streaming architecture with Kafka and Spark Structured Streaming enables processing 50B+ daily events while maintaining sub-second latency for critical alerting paths.
- Delta Lake's ACID transactions and time travel capabilities are essential for telecom's regulatory compliance requirements, enabling precise audit trails and data corrections.
- Network topology-aware analytics provides 3x better anomaly detection than point-based monitoring by understanding cell relationships and handover patterns.
- Feature stores dramatically accelerate ML development—our shared feature platform reduced model development cycles from 8 weeks to 3 weeks.
- Multi-horizon forecasting requires separate architectures optimized for each prediction window, from real-time rules to long-term Prophet models.
Conclusion
This platform transformed the client's ability to leverage data for network operations, customer experience, and strategic planning. The 11-month ROI payback demonstrates that modern data architecture investments in telecom deliver rapid, measurable value.
The foundation now supports advanced use cases including AI-driven network automation, 5G slice optimization, and real-time personalization—positioning the client for continued competitive advantage.
Measurable Impact
- $47M annual value: ROI achieved in 11 months
- 23% churn reduction: from 2.1% to 1.62% monthly
- 35% less network downtime: MTTR reduced from 4.2h to 2.7h
- 50B+ daily events processed: up from 2.5B
- 94.2% prediction accuracy: for equipment failures
- <5 min data freshness: down from 48+ hours of latency

Return on Investment
$47M annual value through $18M churn reduction, $15M capex optimization, $9M operational efficiency, and $5M revenue assurance. Platform investment recovered in 11 months.