Real-Time vs Batch Processing: A Decision Framework
A practical guide to choosing between real-time streaming and batch processing architectures—covering use cases, cost analysis, and hybrid approaches
The 'real-time vs batch' question isn't binary—it's about matching processing semantics to business requirements. This guide provides a structured framework for evaluating latency needs, cost implications, and complexity trade-offs, helping you choose the right approach for each use case.
Prerequisites:
- Basic understanding of data pipelines and ETL concepts
- Familiarity with distributed systems basics

Real-Time vs Batch Processing
The False Dichotomy
"We need everything in real-time" is a common request—and usually the wrong starting point. Real-time processing adds significant complexity and cost. The better question is: "What actually needs to be real-time, and what can wait?"
This guide provides a framework for making that decision systematically, considering business requirements, technical constraints, and economic factors.
Understanding the Spectrum
Data processing latency exists on a spectrum, not a binary:
| Tier | Typical latency | Examples |
|---|---|---|
| Real-Time | < 100 ms | Fraud detection, pricing, gaming |
| Near Real-Time | 1-30 s | Personalization, inventory updates |
| Micro-Batch | 1-15 min | Dashboard refresh, alerting |
| Mini-Batch | 1-4 hr | Reports, ML model training |
| Classic Batch | Daily+ | Data warehouse, historical analysis |
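One way to make the spectrum concrete is a small helper that maps a stated freshness requirement to a tier. The thresholds below mirror the spectrum above; the function name and the exact cutoffs are illustrative, not a standard:

```python
# Illustrative helper: map a freshness requirement (in seconds) to a
# processing tier, using the thresholds from the latency spectrum above.
def classify_latency(max_staleness_seconds: float) -> str:
    if max_staleness_seconds < 0.1:
        return "real_time"        # < 100 ms
    if max_staleness_seconds <= 30:
        return "near_real_time"   # 1-30 s
    if max_staleness_seconds <= 15 * 60:
        return "micro_batch"      # 1-15 min
    if max_staleness_seconds <= 4 * 3600:
        return "mini_batch"       # 1-4 hr
    return "batch"                # daily or less frequent

print(classify_latency(0.05))   # fraud scoring -> real_time
print(classify_latency(300))    # dashboard refresh -> micro_batch
print(classify_latency(86400))  # warehouse load -> batch
```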
Architectural Patterns
Pattern 1: Lambda Architecture
The Lambda architecture maintains parallel batch and speed layers, merging results at query time.
Source data is fed to two parallel layers:

- Batch layer (Spark, Hive): complete recomputation over an immutable dataset; high accuracy, high latency.
- Speed layer (Kafka, Flink): incremental, mutable updates; low latency, approximate results.
- Serving layer: merges batch and speed results at query time.

Pros: accurate batch results, low-latency speed layer, fault tolerance.
Cons: operational complexity (two systems), logic duplication, merge complexity.
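The serving layer's query-time merge can be sketched in a few lines: precomputed batch results cover everything up to the last batch run's cutoff, and the speed layer fills in events after that cutoff. Names and data here are invented for illustration:

```python
# Illustrative query-time merge for a Lambda serving layer: batch results
# cover data up to `batch_cutoff`; the speed layer covers events after it.
def merged_count(batch_counts: dict, speed_events: list,
                 batch_cutoff: float, key: str) -> int:
    batch_part = batch_counts.get(key, 0)  # complete and accurate, but stale
    speed_part = sum(
        1 for event in speed_events
        if event["key"] == key and event["ts"] > batch_cutoff
    )  # incremental and fresh, but approximate
    return batch_part + speed_part

batch_counts = {"page_view": 1000}  # computed by the nightly batch job
speed_events = [
    {"key": "page_view", "ts": 101.0},
    {"key": "page_view", "ts": 102.0},
    {"key": "click", "ts": 103.0},
]
print(merged_count(batch_counts, speed_events, batch_cutoff=100.0, key="page_view"))  # 1002
```

Note that the merge logic itself is a third place where bugs can hide, which is the "merge complexity" con listed above.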
Pattern 2: Kappa Architecture
The Kappa architecture uses a single streaming layer for both real-time and historical processing.
All data flows through a single path:

- Streaming log (Kafka): an immutable, replayable log with long retention.
- Stream processor (Flink/Spark): a single codebase handles both real-time updates and historical replay.
- Serving layer: serves the processor's output.

Pros: single codebase, simpler operations, consistent logic.
Cons: requires long log retention, reprocessing can be slow, higher Kafka costs.
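The key Kappa idea is that "reprocessing" is just replaying the retained log from the beginning through the same processing function used for live traffic. A minimal sketch, with an in-memory list standing in for the Kafka topic (this is not the real Kafka API):

```python
# Illustrative Kappa-style reprocessing: identical logic is applied to
# live events and to a full replay of the retained log. An in-memory
# list stands in for the Kafka topic here.
def process(event: dict, state: dict) -> None:
    # Single codebase: the same function handles live and historical data.
    state[event["user"]] = state.get(event["user"], 0) + event["amount"]

log = [  # immutable, replayable event log
    {"user": "a", "amount": 10},
    {"user": "b", "amount": 5},
    {"user": "a", "amount": 7},
]

# "Reprocessing" = replaying the log from offset 0 into fresh state.
state = {}
for event in log:
    process(event, state)
print(state)  # {'a': 17, 'b': 5}
```

In a real deployment the replay reads the same Kafka topic from its earliest offset into a new output table, then cuts consumers over once it catches up; the log retention window bounds how far back you can rebuild.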
Pattern 3: Hybrid / Medallion
The modern approach: stream into a lakehouse, materialize views at different latencies.
Sources flow through streaming ingestion (Kafka / Kinesis) into Delta Lake medallion tables:

- Bronze (raw): streaming writes of raw events.
- Silver (cleaned): streaming transforms.
- Gold (aggregated): batch or streaming aggregation.

Downstream consumers then read at whatever freshness they need: a real-time dashboard (seconds), hourly reports (minutes), or daily warehouse loads (hours). Each consumption layer chooses its own freshness/cost trade-off.
Decision Framework
Step 1: Identify Latency Requirements
Start by mapping use cases to actual latency needs:
# decision_framework.py
"""
Framework for evaluating real-time vs batch processing decisions.
"""
from dataclasses import dataclass
from enum import Enum


class LatencyTier(Enum):
    """Latency tiers with processing implications."""
    REAL_TIME = "real_time"            # < 100ms, requires streaming
    NEAR_REAL_TIME = "near_real_time"  # 1-30 seconds
    MICRO_BATCH = "micro_batch"        # 1-15 minutes
    MINI_BATCH = "mini_batch"          # 1-4 hours
    BATCH = "batch"                    # Daily or less frequent


# Explicit ordering from lowest to highest latency; the enum's string
# values do not sort meaningfully on their own.
TIER_ORDER = [
    LatencyTier.REAL_TIME,
    LatencyTier.NEAR_REAL_TIME,
    LatencyTier.MICRO_BATCH,
    LatencyTier.MINI_BATCH,
    LatencyTier.BATCH,
]


@dataclass
class UseCase:
    """Represents a data processing use case."""
    name: str
    description: str
    data_source: str

    # Latency requirements
    latency_requirement: LatencyTier
    latency_justification: str

    # Volume characteristics
    events_per_second: float
    peak_multiplier: float  # Peak vs average ratio

    # Processing characteristics
    requires_historical_context: bool  # Needs to join with historical data
    requires_ordering: bool            # Events must be processed in order
    requires_exactly_once: bool        # Duplicates are unacceptable

    # Business characteristics
    revenue_impact_per_minute_delay: float
    cost_of_error: str  # 'low', 'medium', 'high', 'critical'


class LatencyRequirementAnalyzer:
    """Analyzes whether stated latency requirements are justified."""

    # Questions to challenge real-time requirements
    CHALLENGE_QUESTIONS = [
        "What business decision changes based on this data?",
        "What happens if data is 5 minutes late? 1 hour late?",
        "Who consumes this data and how often do they check it?",
        "Is there a human in the loop, or is it fully automated?",
        "What's the cost of a wrong decision vs a delayed decision?",
    ]

    @staticmethod
    def analyze_requirement(use_case: UseCase) -> dict:
        """
        Analyze whether the stated latency requirement is justified.
        Returns recommendation and rationale.
        """
        requirement = use_case.latency_requirement
        recommendation = requirement  # Start with stated requirement

        # Check for over-specification
        if requirement in (LatencyTier.REAL_TIME, LatencyTier.NEAR_REAL_TIME):
            # If there's a human consumer, real-time is rarely needed
            if "dashboard" in use_case.description.lower():
                if use_case.revenue_impact_per_minute_delay < 100:
                    recommendation = LatencyTier.MICRO_BATCH
            # If it requires historical context, streaming is complex
            if use_case.requires_historical_context:
                # Note: still possible, but adds complexity
                pass

        # Check for under-specification
        if requirement in (LatencyTier.BATCH, LatencyTier.MINI_BATCH):
            # High revenue impact might justify faster processing
            if use_case.revenue_impact_per_minute_delay > 1000:
                recommendation = LatencyTier.MICRO_BATCH
            # Critical error costs might justify real-time
            if use_case.cost_of_error == 'critical':
                recommendation = LatencyTier.NEAR_REAL_TIME

        return {
            'stated_requirement': requirement,
            'recommendation': recommendation,
            'change_recommended': requirement != recommendation,
            'rationale': _build_rationale(use_case, requirement, recommendation),
        }


def _build_rationale(use_case: UseCase, stated: LatencyTier, recommended: LatencyTier) -> str:
    """Build explanation for the recommendation."""
    if stated == recommended:
        return f"The {stated.value} requirement is appropriate for this use case."
    if TIER_ORDER.index(recommended) > TIER_ORDER.index(stated):  # Moving toward batch
        return (
            f"Consider relaxing from {stated.value} to {recommended.value}. "
            f"Revenue impact of ${use_case.revenue_impact_per_minute_delay}/min delay "
            f"may not justify the complexity of {stated.value} processing."
        )
    else:  # Moving toward real-time
        return (
            f"Consider upgrading from {stated.value} to {recommended.value}. "
            f"The {use_case.cost_of_error} cost of errors and "
            f"${use_case.revenue_impact_per_minute_delay}/min delay impact "
            f"may justify the investment."
        )
Step 2: Cost Analysis
Real-time processing typically costs 3-10x more than batch. Analyze total cost of ownership:
# cost_analysis.py
"""
Cost analysis framework for processing architecture decisions.
"""
import math
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class CostModel:
    """Cost model for a processing approach."""
    # Infrastructure costs (monthly)
    compute_cost: float
    storage_cost: float
    network_cost: float

    # Operational costs (monthly)
    engineering_hours: float
    on_call_hours: float
    incident_response_hours: float

    # Development costs (one-time)
    initial_development_days: float
    testing_days: float

    # Derived costs
    hourly_rate: float = 150.0  # Fully-loaded engineer cost

    @property
    def monthly_infrastructure(self) -> float:
        return self.compute_cost + self.storage_cost + self.network_cost

    @property
    def monthly_operational(self) -> float:
        total_hours = self.engineering_hours + self.on_call_hours + self.incident_response_hours
        return total_hours * self.hourly_rate

    @property
    def total_monthly(self) -> float:
        return self.monthly_infrastructure + self.monthly_operational

    @property
    def initial_investment(self) -> float:
        return (self.initial_development_days + self.testing_days) * 8 * self.hourly_rate


class ProcessingCostEstimator:
    """Estimates costs for different processing approaches."""

    # Baseline multipliers (relative to simple batch)
    COMPLEXITY_MULTIPLIERS = {
        'batch_simple': {
            'compute': 1.0, 'storage': 1.0, 'engineering': 1.0,
            'on_call': 0.5, 'development': 1.0,
        },
        'batch_complex': {
            'compute': 1.5, 'storage': 1.2, 'engineering': 1.5,
            'on_call': 1.0, 'development': 1.5,
        },
        'micro_batch': {
            'compute': 2.0, 'storage': 1.5, 'engineering': 2.0,
            'on_call': 1.5, 'development': 2.0,
        },
        'near_real_time': {
            'compute': 4.0, 'storage': 2.0, 'engineering': 3.0,
            'on_call': 2.5, 'development': 3.0,
        },
        'real_time': {
            'compute': 8.0, 'storage': 3.0, 'engineering': 5.0,
            'on_call': 4.0, 'development': 5.0,
        },
    }

    def __init__(self, baseline_monthly_compute: float = 1000):
        self.baseline = baseline_monthly_compute

    def estimate_costs(self, approach: str, events_per_day: int) -> CostModel:
        """Estimate costs for a given approach and volume."""
        multipliers = self.COMPLEXITY_MULTIPLIERS.get(
            approach, self.COMPLEXITY_MULTIPLIERS['batch_simple']
        )
        # Volume scaling (log scale)
        volume_factor = max(1, math.log10(events_per_day / 1_000_000 + 1))
        return CostModel(
            compute_cost=self.baseline * multipliers['compute'] * volume_factor,
            storage_cost=self.baseline * 0.3 * multipliers['storage'] * volume_factor,
            network_cost=self.baseline * 0.1 * volume_factor,
            engineering_hours=20 * multipliers['engineering'],
            on_call_hours=10 * multipliers['on_call'],
            incident_response_hours=5 * multipliers['on_call'],
            initial_development_days=20 * multipliers['development'],
            testing_days=10 * multipliers['development'],
        )

    def compare_approaches(
        self,
        events_per_day: int,
        approaches: Optional[List[str]] = None,
    ) -> Dict[str, Dict]:
        """Compare costs across multiple approaches."""
        if approaches is None:
            approaches = ['batch_simple', 'micro_batch', 'near_real_time', 'real_time']
        results = {}
        for approach in approaches:
            cost_model = self.estimate_costs(approach, events_per_day)
            results[approach] = {
                'monthly_infrastructure': cost_model.monthly_infrastructure,
                'monthly_operational': cost_model.monthly_operational,
                'total_monthly': cost_model.total_monthly,
                'initial_investment': cost_model.initial_investment,
                'annual_tco': cost_model.total_monthly * 12 + cost_model.initial_investment,
            }
        return results


def calculate_break_even(
    batch_cost: CostModel,
    streaming_cost: CostModel,
    revenue_per_minute_latency: float,
) -> dict:
    """
    Calculate break-even point for streaming investment.
    Returns minutes of latency reduction needed to justify streaming cost.
    """
    monthly_cost_difference = streaming_cost.total_monthly - batch_cost.total_monthly
    initial_cost_difference = streaming_cost.initial_investment - batch_cost.initial_investment

    # Assume batch has 60-minute latency, streaming has 1-minute
    latency_reduction_minutes = 59

    # Monthly value of reduced latency
    # (Assuming continuous value, not just during business hours)
    monthly_value = latency_reduction_minutes * revenue_per_minute_latency * 30 * 24

    roi_monthly = monthly_value - monthly_cost_difference
    payback_months = initial_cost_difference / roi_monthly if roi_monthly > 0 else float('inf')

    return {
        'monthly_cost_increase': monthly_cost_difference,
        'monthly_latency_value': monthly_value,
        'monthly_net_benefit': roi_monthly,
        'initial_investment_increase': initial_cost_difference,
        'payback_months': payback_months,
        'recommended': payback_months < 12,  # ROI within 1 year
    }
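Plugging sample numbers into the break-even arithmetic shows the shape of the decision. The figures below are invented purely for illustration:

```python
# Standalone version of the break-even arithmetic with invented numbers.
streaming_monthly, batch_monthly = 12_000.0, 3_000.0    # total monthly cost
streaming_initial, batch_initial = 120_000.0, 36_000.0  # one-time build cost
revenue_per_minute_latency = 0.05  # $ of value per minute of freshness gained

monthly_cost_increase = streaming_monthly - batch_monthly  # 9,000
initial_increase = streaming_initial - batch_initial       # 84,000
latency_reduction_minutes = 59  # 60-min batch -> 1-min streaming
monthly_value = latency_reduction_minutes * revenue_per_minute_latency * 30 * 24
net_benefit = monthly_value - monthly_cost_increase

# monthly_value is 2,124; net_benefit is negative, so at this value of
# freshness the streaming investment never pays back.
print(round(monthly_value, 2), round(net_benefit, 2))
```

Raising the value of freshness to around $0.22/minute flips the sign, which is the point of running this calculation per use case rather than assuming streaming is worth it.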
Step 3: Decision Matrix
Use this matrix to guide initial decisions:
| Factor | Favors Real-Time | Favors Batch |
|---|---|---|
| Latency Requirement | < 5 minutes | > 1 hour acceptable |
| Data Volume | Low-medium (< 100K events/sec) | Very high volume |
| Processing Complexity | Simple transformations | Complex joins, ML |
| Historical Context | Minimal lookups | Heavy historical joins |
| Ordering Requirements | Strict ordering needed | Order not critical |
| Exactly-Once Semantics | Required | Best-effort acceptable |
| Error Recovery | Immediate retry | Rerun full batch |
| Team Experience | Streaming expertise | Batch expertise |
| Infrastructure | Kubernetes, managed Kafka | Existing Spark/warehouse |
| Budget | Flexible | Constrained |
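The matrix can be reduced to a rough scoring function: each factor votes for one side and the sign of the total suggests a starting point. The factor names and equal weights below are illustrative, not a calibrated model:

```python
# Rough scorer for the decision matrix: each True answer votes +1 for
# real-time, each False (or missing) answer votes -1 for batch.
# Factors and weights are illustrative.
def suggest_approach(answers: dict) -> str:
    votes = {
        "latency_under_5_min": 1,
        "volume_under_100k_eps": 1,
        "simple_transformations": 1,
        "strict_ordering": 1,
        "exactly_once_required": 1,
        "team_has_streaming_experience": 1,
        "flexible_budget": 1,
    }
    score = sum(weight if answers.get(factor) else -weight
                for factor, weight in votes.items())
    return "lean real-time" if score > 0 else "lean batch"

print(suggest_approach({
    "latency_under_5_min": True,
    "volume_under_100k_eps": True,
    "simple_transformations": True,
    "strict_ordering": False,
    "exactly_once_required": True,
    "team_has_streaming_experience": True,
    "flexible_budget": False,
}))  # lean real-time
```

Treat the output as a prompt for discussion, not a verdict; a single hard constraint (say, a regulatory latency requirement) can override every other factor.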
Implementation Patterns
Pattern: Streaming Ingestion, Batch Processing
The most common hybrid pattern: ingest via streaming for durability, process in batch for simplicity.
# hybrid_pattern.py
"""
Streaming ingestion with batch processing pattern.
Best of both worlds for many use cases.
"""
from datetime import datetime, timedelta

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType


class HybridPipeline:
    """
    Implements streaming ingestion with batch processing.

    Data flow:
    1. Kafka -> Bronze (streaming, append-only)
    2. Bronze -> Silver (batch, hourly)
    3. Silver -> Gold (batch, daily)
    """

    def __init__(self, spark: SparkSession, checkpoint_dir: str):
        self.spark = spark
        self.checkpoint_dir = checkpoint_dir

    def start_streaming_ingestion(self, kafka_servers: str, topic: str):
        """
        Streaming ingestion into Bronze layer.
        Runs continuously, appends raw events.
        """
        raw_stream = (
            self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_servers)
            .option("subscribe", topic)
            .option("startingOffsets", "latest")
            .load()
        )

        # Parse and add metadata
        parsed_stream = raw_stream.select(
            col("key").cast("string").alias("event_key"),
            col("value").cast("string").alias("event_payload"),
            col("topic"),
            col("partition"),
            col("offset"),
            col("timestamp").alias("kafka_timestamp"),
            current_timestamp().alias("ingested_at"),
        )

        # Write to Bronze layer (Delta Lake)
        query = (
            parsed_stream.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", f"{self.checkpoint_dir}/bronze")
            .option("mergeSchema", "true")
            .trigger(processingTime="10 seconds")  # Micro-batch every 10 seconds
            .start("s3://datalake/bronze/events")
        )
        return query

    def run_silver_batch(self, processing_hour: datetime):
        """
        Batch processing from Bronze to Silver.
        Runs hourly, processes previous hour's data.
        """
        hour_start = processing_hour.replace(minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)

        # Read new Bronze data
        bronze_df = (
            self.spark.read.format("delta")
            .load("s3://datalake/bronze/events")
            .filter(
                (col("ingested_at") >= hour_start) &
                (col("ingested_at") < hour_end)
            )
        )

        # Parse JSON payload
        event_schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("event_type", StringType()),
            StructField("amount", DoubleType()),
            StructField("timestamp", TimestampType()),
        ])

        silver_df = (
            bronze_df
            .withColumn("parsed", from_json(col("event_payload"), event_schema))
            .select(
                col("parsed.event_id").alias("event_id"),
                col("parsed.user_id").alias("user_id"),
                col("parsed.event_type").alias("event_type"),
                col("parsed.amount").alias("amount"),
                col("parsed.timestamp").alias("event_timestamp"),
                col("ingested_at"),
                date_format(col("parsed.timestamp"), "yyyy-MM-dd").alias("event_date"),
                hour(col("parsed.timestamp")).alias("event_hour"),
            )
            .filter(col("event_id").isNotNull())  # Data quality filter
            .dropDuplicates(["event_id"])         # Deduplication
        )

        # Merge into Silver layer (upsert for idempotency)
        if DeltaTable.isDeltaTable(self.spark, "s3://datalake/silver/events"):
            silver_table = DeltaTable.forPath(self.spark, "s3://datalake/silver/events")
            (
                silver_table.alias("target")
                .merge(
                    silver_df.alias("source"),
                    "target.event_id = source.event_id"
                )
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # First run - create table
            (
                silver_df.write
                .format("delta")
                .partitionBy("event_date")
                .mode("overwrite")
                .save("s3://datalake/silver/events")
            )
        return silver_df.count()

    def run_gold_aggregation(self, processing_date: datetime):
        """
        Daily aggregation from Silver to Gold.
        Creates business-level metrics.
        """
        target_date = processing_date.date()

        # Read Silver data for the day
        silver_df = (
            self.spark.read.format("delta")
            .load("s3://datalake/silver/events")
            .filter(col("event_date") == str(target_date))
        )

        # Aggregate to Gold metrics
        daily_metrics = (
            silver_df
            .groupBy("event_date", "event_type")
            .agg(
                countDistinct("user_id").alias("unique_users"),
                count("event_id").alias("event_count"),
                sum("amount").alias("total_amount"),
                avg("amount").alias("avg_amount"),
                min("event_timestamp").alias("first_event"),
                max("event_timestamp").alias("last_event"),
            )
        )

        # User-level daily summary
        user_daily = (
            silver_df
            .groupBy("event_date", "user_id")
            .agg(
                count("event_id").alias("events"),
                sum("amount").alias("daily_spend"),
                collect_set("event_type").alias("event_types"),
            )
        )

        # Write to Gold layer
        (
            daily_metrics.write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"event_date = '{target_date}'")
            .save("s3://datalake/gold/daily_metrics")
        )
        (
            user_daily.write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"event_date = '{target_date}'")
            .save("s3://datalake/gold/user_daily")
        )
        return daily_metrics.count()
Pattern: Streaming with Materialized Views
For near-real-time dashboards without full streaming complexity.
# materialized_views.py
"""
Materialized views pattern for near-real-time analytics.
Simpler than full streaming, faster than batch.
"""
import time
from datetime import datetime
from typing import Optional

import schedule
from pyspark.sql import SparkSession


class MaterializedViewManager:
    """
    Manages materialized views with configurable refresh intervals.
    Each view can have different freshness requirements.
    """

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.views = {}

    def register_view(
        self,
        name: str,
        source_table: str,
        aggregation_query: str,
        refresh_interval_minutes: int,
        partition_column: Optional[str] = None,
    ):
        """Register a materialized view with its refresh configuration."""
        self.views[name] = {
            'source_table': source_table,
            'query': aggregation_query,
            'refresh_interval': refresh_interval_minutes,
            'partition_column': partition_column,
            'last_refreshed': None,
        }

    def refresh_view(self, name: str):
        """Refresh a specific materialized view."""
        view_config = self.views[name]

        # Execute aggregation query
        result_df = self.spark.sql(view_config['query'])

        # Write to materialized view location
        target_path = f"s3://datalake/materialized_views/{name}"
        writer = result_df.write.format("delta").mode("overwrite")
        if view_config['partition_column']:
            writer = writer.partitionBy(view_config['partition_column'])
        writer.save(target_path)

        view_config['last_refreshed'] = datetime.now()
        return result_df.count()

    def start_refresh_scheduler(self):
        """Start background scheduler for view refreshes."""
        for name, config in self.views.items():
            interval = config['refresh_interval']
            # Schedule refresh
            schedule.every(interval).minutes.do(self.refresh_view, name=name)

        # Run scheduler loop
        while True:
            schedule.run_pending()
            time.sleep(10)


# Example usage
def setup_dashboard_views(manager: MaterializedViewManager):
    """Set up materialized views for a dashboard."""

    # Real-time KPIs - refresh every 1 minute
    manager.register_view(
        name="realtime_kpis",
        source_table="silver.events",
        aggregation_query="""
            SELECT
                current_timestamp() as snapshot_time,
                count(*) as events_last_hour,
                count(distinct user_id) as active_users,
                sum(amount) as revenue
            FROM silver.events
            WHERE event_timestamp > current_timestamp() - INTERVAL 1 HOUR
        """,
        refresh_interval_minutes=1,
    )

    # Hourly trends - refresh every 5 minutes
    manager.register_view(
        name="hourly_trends",
        source_table="silver.events",
        aggregation_query="""
            SELECT
                date_trunc('hour', event_timestamp) as hour,
                event_type,
                count(*) as events,
                sum(amount) as revenue,
                count(distinct user_id) as users
            FROM silver.events
            WHERE event_timestamp > current_timestamp() - INTERVAL 24 HOURS
            GROUP BY 1, 2
        """,
        refresh_interval_minutes=5,
        partition_column="hour",
    )

    # Daily summary - refresh every 15 minutes
    manager.register_view(
        name="daily_summary",
        source_table="silver.events",
        aggregation_query="""
            SELECT
                date(event_timestamp) as date,
                count(*) as total_events,
                sum(amount) as total_revenue,
                count(distinct user_id) as total_users,
                sum(amount) / count(distinct user_id) as revenue_per_user
            FROM silver.events
            WHERE event_timestamp > current_timestamp() - INTERVAL 30 DAYS
            GROUP BY 1
        """,
        refresh_interval_minutes=15,
        partition_column="date",
    )
Migration Strategies
From Batch to Streaming
Phase 1: Parallel run (shadow mode)
- The existing batch pipeline continues to write the target table.
- The new streaming pipeline runs alongside it; its results are compared against the batch output but not used.

Phase 2: Canary
- Route 10% of traffic to streaming results.
- Monitor for discrepancies.
- Gradually increase the percentage.

Phase 3: Cutover
- Streaming becomes primary.
- Batch runs as backup/validation.
- Eventually decommission batch.
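The parallel-run phase needs a concrete discrepancy check. A minimal sketch, assuming both pipelines emit per-key aggregates that can be compared within a relative tolerance (function name and data are illustrative):

```python
# Minimal shadow-mode comparison: batch output is treated as the source
# of truth and streaming output is checked against it per key.
def compare_outputs(batch: dict, streaming: dict, tolerance: float = 0.01) -> dict:
    mismatches = []
    for key, expected in batch.items():
        actual = streaming.get(key)
        if actual is None:
            mismatches.append((key, "missing in streaming"))
        elif expected and abs(actual - expected) / abs(expected) > tolerance:
            mismatches.append((key, f"batch={expected}, streaming={actual}"))
    match_rate = 1 - len(mismatches) / len(batch) if batch else 1.0
    return {"match_rate": match_rate, "mismatches": mismatches}

batch_out = {"2024-01-01": 1000.0, "2024-01-02": 2000.0}
stream_out = {"2024-01-01": 1001.0, "2024-01-02": 2500.0}
result = compare_outputs(batch_out, stream_out)
print(result["match_rate"])  # 0.5: one day within 1%, one day off by 25%
```

In practice some disagreement is expected and legitimate (late-arriving data, different window boundaries), so the canary threshold should be set from observed shadow-mode baselines rather than assumed to be zero.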
Key Takeaways
- Real-time isn't always better: it's a trade-off. Higher complexity and cost must be justified by business value.
- Start with the business question: what decision changes based on fresher data? What's the cost of delay?
- Hybrid approaches win: stream into a lakehouse, materialize views at different latencies based on consumer needs.
- Streaming ingestion, batch processing: often the best of both worlds. Durable streaming capture, simple batch logic.
- Cost the full picture: include engineering time, on-call burden, and operational complexity, not just compute.
- Plan for evolution: start with batch, add streaming for specific use cases. Don't over-engineer from day one.
References
- Marz, N., & Warren, J. (2015). Big Data: Principles and Best Practices of Scalable Real-time Data Systems. Manning.
- Kreps, J. (2014). "Questioning the Lambda Architecture." O'Reilly Radar.
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
- Narkhede, N., Shapira, G., & Palino, T. (2017). Kafka: The Definitive Guide. O'Reilly Media.
- Databricks Documentation: Delta Lake Streaming
- Apache Flink Documentation: DataStream API



