16 min read · Beginner

Real-Time vs Batch Processing: A Decision Framework

A practical guide to choosing between real-time streaming and batch processing architectures—covering use cases, cost analysis, and hybrid approaches

Stream Processing · Batch Processing · Architecture · Kafka · Spark · Decision Framework
TL;DR

The 'real-time vs batch' question isn't binary—it's about matching processing semantics to business requirements. This guide provides a structured framework for evaluating latency needs, cost implications, and complexity trade-offs, helping you choose the right approach for each use case.

Prerequisites
  • Basic understanding of data pipelines and ETL concepts
  • Familiarity with distributed systems basics

The False Dichotomy

"We need everything in real-time" is a common request—and usually the wrong starting point. Real-time processing adds significant complexity and cost. The better question is: "What actually needs to be real-time, and what can wait?"

This guide provides a framework for making that decision systematically, considering business requirements, technical constraints, and economic factors.

Understanding the Spectrum

Data processing latency exists on a spectrum, not as a binary choice:

┌──────────────────────────────────────────────────────────────────────────────┐
│                        LATENCY SPECTRUM                                       │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ◄────────────────────────────────────────────────────────────────────────►  │
│  Milliseconds    Seconds    Minutes    Hours    Days                         │
│                                                                              │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐            │
│  │ Real-   │  │ Near    │  │ Micro-  │  │ Mini-   │  │ Classic │            │
│  │ Time    │  │ Real-   │  │ Batch   │  │ Batch   │  │ Batch   │            │
│  │         │  │ Time    │  │         │  │         │  │         │            │
│  │ <100ms  │  │ 1-30s   │  │ 1-15min │  │ 1-4hr   │  │ Daily+  │            │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘            │
│                                                                              │
│  Examples:                                                                   │
│  • Fraud      • Personali- • Dashboard  • Reports    • Data             │
│    detection    zation       refresh                   warehouse        │
│  • Pricing    • Inventory  • Alerting   • ML model   • Historical       │
│  • Gaming       updates                   training     analysis         │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
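The tier boundaries above can be captured in a small helper. The thresholds here are the illustrative ones from the diagram, not hard rules:

```python
def latency_tier(budget_seconds: float) -> str:
    """Map a latency budget to a tier from the spectrum above."""
    if budget_seconds < 0.1:        # < 100ms
        return "real_time"
    if budget_seconds <= 30:        # 1-30s
        return "near_real_time"
    if budget_seconds <= 15 * 60:   # 1-15min
        return "micro_batch"
    if budget_seconds <= 4 * 3600:  # 1-4hr
        return "mini_batch"
    return "batch"                  # daily or less frequent

print(latency_tier(0.05))  # real_time, e.g. fraud detection
print(latency_tier(300))   # micro_batch, e.g. dashboard refresh
```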

Architectural Patterns

Pattern 1: Lambda Architecture

The Lambda architecture maintains parallel batch and speed layers, merging results at query time.

┌──────────────────────────────────────────────────────────────────────────────┐
│                         LAMBDA ARCHITECTURE                                   │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                          ┌─────────────┐                                     │
│                          │   Source    │                                     │
│                          │    Data     │                                     │
│                          └──────┬──────┘                                     │
│                                 │                                            │
│                    ┌────────────┴────────────┐                               │
│                    ▼                         ▼                               │
│           ┌────────────────┐        ┌────────────────┐                       │
│           │  BATCH LAYER   │        │  SPEED LAYER   │                       │
│           │                │        │                │                       │
│           │  • Complete    │        │  • Incremental │                       │
│           │    recompute   │        │    updates     │                       │
│           │  • High        │        │  • Low latency │                       │
│           │    accuracy    │        │  • Approximate │                       │
│           │  • Immutable   │        │  • Mutable     │                       │
│           │                │        │                │                       │
│           │  Spark, Hive   │        │  Kafka, Flink  │                       │
│           └───────┬────────┘        └───────┬────────┘                       │
│                   │                         │                                │
│                   │    ┌─────────────┐      │                                │
│                   └───►│  SERVING    │◄─────┘                                │
│                        │   LAYER     │                                       │
│                        │             │                                       │
│                        │  Merges     │                                       │
│                        │  results    │                                       │
│                        │  at query   │                                       │
│                        └─────────────┘                                       │
│                                                                              │
│  Pros:                           Cons:                                       │
│  • Accurate batch results        • Operational complexity (2 systems)        │
│  • Low-latency speed layer       • Logic duplication                         │
│  • Fault tolerance               • Merge complexity                          │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
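A minimal sketch of the serving layer's query-time merge, assuming the batch view holds accurate counts up to the last batch run and the speed layer holds per-key increments since that cutoff (the names and data shapes are illustrative):

```python
def merge_at_query(key: str, batch_view: dict, speed_view: dict) -> int:
    """Combine the accurate batch result with recent speed-layer increments."""
    return batch_view.get(key, 0) + speed_view.get(key, 0)

# Batch layer: complete recompute up to the last batch cutoff
batch_view = {"user_42": 100}
# Speed layer: approximate increments since that cutoff
speed_view = {"user_42": 3, "user_7": 1}

print(merge_at_query("user_42", batch_view, speed_view))  # 103
print(merge_at_query("user_7", batch_view, speed_view))   # 1
```

The merge is where Lambda's complexity bites: the serving layer must know exactly which time range each layer covers so nothing is double-counted.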

Pattern 2: Kappa Architecture

The Kappa architecture uses a single streaming layer for both real-time and historical processing.

┌──────────────────────────────────────────────────────────────────────────────┐
│                          KAPPA ARCHITECTURE                                   │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                          ┌─────────────┐                                     │
│                          │   Source    │                                     │
│                          │    Data     │                                     │
│                          └──────┬──────┘                                     │
│                                 │                                            │
│                                 ▼                                            │
│                    ┌────────────────────────┐                                │
│                    │     STREAMING LOG      │                                │
│                    │       (Kafka)          │                                │
│                    │                        │                                │
│                    │  • Immutable log       │                                │
│                    │  • Long retention      │                                │
│                    │  • Replayable          │                                │
│                    └───────────┬────────────┘                                │
│                                │                                             │
│                                ▼                                             │
│                    ┌────────────────────────┐                                │
│                    │   STREAM PROCESSOR     │                                │
│                    │    (Flink/Spark)       │                                │
│                    │                        │                                │
│                    │  Single codebase       │                                │
│                    │  handles both:         │                                │
│                    │  • Real-time updates   │                                │
│                    │  • Historical replay   │                                │
│                    └───────────┬────────────┘                                │
│                                │                                             │
│                                ▼                                             │
│                    ┌────────────────────────┐                                │
│                    │    SERVING LAYER       │                                │
│                    └────────────────────────┘                                │
│                                                                              │
│  Pros:                           Cons:                                       │
│  • Single codebase               • Requires long log retention               │
│  • Simpler operations            • Reprocessing can be slow                  │
│  • Consistent logic              • Higher Kafka costs                        │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
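The Kappa property — one codebase for both live updates and historical rebuilds — can be shown with a toy in-memory log; in a real deployment the replay would be a consumer reset to the earliest retained Kafka offset:

```python
def apply_event(state: dict, event: dict) -> dict:
    """Single processing function, used for both live and replayed events."""
    user = event["user_id"]
    state[user] = state.get(user, 0) + event["amount"]
    return state

# Immutable, replayable log (stands in for a long-retention Kafka topic)
log = [
    {"user_id": "a", "amount": 10},
    {"user_id": "b", "amount": 5},
    {"user_id": "a", "amount": 2},
]

live_state = {}
for event in log:            # consumed incrementally as events arrived
    apply_event(live_state, event)

replayed_state = {}
for event in log:            # full historical replay, same code path
    apply_event(replayed_state, event)

assert live_state == replayed_state == {"a": 12, "b": 5}
```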

Pattern 3: Hybrid / Medallion Architecture

The modern approach: stream into a lakehouse, materialize views at different latencies.

┌──────────────────────────────────────────────────────────────────────────────┐
│                      HYBRID LAKEHOUSE PATTERN                                 │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                          ┌─────────────┐                                     │
│                          │   Sources   │                                     │
│                          └──────┬──────┘                                     │
│                                 │                                            │
│                                 ▼                                            │
│                    ┌────────────────────────┐                                │
│                    │   STREAMING INGESTION  │                                │
│                    │   Kafka / Kinesis      │                                │
│                    └───────────┬────────────┘                                │
│                                │                                             │
│                                ▼                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                        DELTA LAKE                                    │    │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │    │
│  │  │   Bronze     │─►│   Silver     │─►│    Gold      │               │    │
│  │  │  (raw)       │  │ (cleaned)    │  │ (aggregated) │               │    │
│  │  │              │  │              │  │              │               │    │
│  │  │ Streaming    │  │ Streaming    │  │ Batch or     │               │    │
│  │  │ writes       │  │ transforms   │  │ Streaming    │               │    │
│  │  └──────────────┘  └──────────────┘  └──────────────┘               │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                │                                             │
│              ┌─────────────────┼─────────────────┐                          │
│              ▼                 ▼                 ▼                          │
│     ┌─────────────┐   ┌─────────────┐   ┌─────────────┐                     │
│     │  Real-Time  │   │   Hourly    │   │   Daily     │                     │
│     │  Dashboard  │   │   Reports   │   │   Warehouse │                     │
│     │  (seconds)  │   │  (minutes)  │   │   (hours)   │                     │
│     └─────────────┘   └─────────────┘   └─────────────┘                     │
│                                                                              │
│  Each consumption layer chooses appropriate freshness/cost tradeoff          │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
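One way to make the per-consumer freshness/cost trade-off explicit is a staleness budget for each consumption layer. The budgets below are illustrative, not prescriptive:

```python
# Max acceptable data age per consumer, in seconds (illustrative budgets)
STALENESS_BUDGETS = {
    "realtime_dashboard": 30,     # seconds-fresh, reads Silver
    "hourly_reports": 3600,       # minutes-fresh, reads Gold
    "daily_warehouse": 86400,     # hours-fresh, reads Gold
}

def needs_refresh(consumer: str, data_age_seconds: float) -> bool:
    """True if the consumer's view is staler than its budget allows."""
    return data_age_seconds > STALENESS_BUDGETS[consumer]

print(needs_refresh("realtime_dashboard", 45))  # True
print(needs_refresh("daily_warehouse", 45))     # False
```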

Decision Framework

Step 1: Identify Latency Requirements

Start by mapping use cases to actual latency needs:

# decision_framework.py
"""
Framework for evaluating real-time vs batch processing decisions.
"""

from enum import Enum
from dataclasses import dataclass

class LatencyTier(Enum):
    """Latency tiers with processing implications."""
    REAL_TIME = "real_time"          # < 100ms, requires streaming
    NEAR_REAL_TIME = "near_real_time" # 1-30 seconds
    MICRO_BATCH = "micro_batch"       # 1-15 minutes
    MINI_BATCH = "mini_batch"         # 1-4 hours
    BATCH = "batch"                   # Daily or less frequent


@dataclass
class UseCase:
    """Represents a data processing use case."""
    name: str
    description: str
    data_source: str

    # Latency requirements
    latency_requirement: LatencyTier
    latency_justification: str

    # Volume characteristics
    events_per_second: float
    peak_multiplier: float  # Peak vs average ratio

    # Processing characteristics
    requires_historical_context: bool  # Needs to join with historical data
    requires_ordering: bool            # Events must be processed in order
    requires_exactly_once: bool        # Duplicates are unacceptable

    # Business characteristics
    revenue_impact_per_minute_delay: float
    cost_of_error: str  # 'low', 'medium', 'high', 'critical'


class LatencyRequirementAnalyzer:
    """Analyzes whether stated latency requirements are justified."""

    # Questions to challenge real-time requirements
    CHALLENGE_QUESTIONS = [
        "What business decision changes based on this data?",
        "What happens if data is 5 minutes late? 1 hour late?",
        "Who consumes this data and how often do they check it?",
        "Is there a human in the loop, or is it fully automated?",
        "What's the cost of a wrong decision vs a delayed decision?",
    ]

    @staticmethod
    def analyze_requirement(use_case: UseCase) -> dict:
        """
        Analyze whether the stated latency requirement is justified.
        Returns recommendation and rationale.
        """
        requirement = use_case.latency_requirement
        recommendation = requirement  # Start with stated requirement

        # Check for over-specification
        if requirement in [LatencyTier.REAL_TIME, LatencyTier.NEAR_REAL_TIME]:

            # If there's a human consumer, real-time is rarely needed
            if "dashboard" in use_case.description.lower():
                if use_case.revenue_impact_per_minute_delay < 100:
                    recommendation = LatencyTier.MICRO_BATCH

            # If it requires historical context, streaming is complex
            if use_case.requires_historical_context:
                # Note: still possible, but adds complexity
                pass

        # Check for under-specification
        if requirement in [LatencyTier.BATCH, LatencyTier.MINI_BATCH]:

            # High revenue impact might justify faster processing
            if use_case.revenue_impact_per_minute_delay > 1000:
                recommendation = LatencyTier.MICRO_BATCH

            # Critical error costs might justify real-time
            if use_case.cost_of_error == 'critical':
                recommendation = LatencyTier.NEAR_REAL_TIME

        return {
            'stated_requirement': requirement,
            'recommendation': recommendation,
            'change_recommended': requirement != recommendation,
            'rationale': _build_rationale(use_case, requirement, recommendation)
        }


def _build_rationale(use_case: UseCase, stated: LatencyTier, recommended: LatencyTier) -> str:
    """Build explanation for the recommendation."""
    if stated == recommended:
        return f"The {stated.value} requirement is appropriate for this use case."

    # LatencyTier members are declared fastest -> slowest, so a higher position
    # in declaration order means a slower (more batch-like) tier. Comparing the
    # raw .value strings would sort lexicographically and give wrong answers.
    if list(LatencyTier).index(recommended) > list(LatencyTier).index(stated):  # Moving toward batch
        return (
            f"Consider relaxing from {stated.value} to {recommended.value}. "
            f"Revenue impact of ${use_case.revenue_impact_per_minute_delay}/min delay "
            f"may not justify the complexity of {stated.value} processing."
        )
    else:  # Moving toward real-time
        return (
            f"Consider upgrading from {stated.value} to {recommended.value}. "
            f"The {use_case.cost_of_error} cost of errors and "
            f"${use_case.revenue_impact_per_minute_delay}/min delay impact "
            f"may justify the investment."
        )

Step 2: Cost Analysis

Real-time processing typically costs 3-10x more than batch. Analyze total cost of ownership:

# cost_analysis.py
"""
Cost analysis framework for processing architecture decisions.
"""

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class CostModel:
    """Cost model for a processing approach."""

    # Infrastructure costs (monthly)
    compute_cost: float
    storage_cost: float
    network_cost: float

    # Operational costs (monthly)
    engineering_hours: float
    on_call_hours: float
    incident_response_hours: float

    # Development costs (one-time)
    initial_development_days: float
    testing_days: float

    # Derived costs
    hourly_rate: float = 150.0  # Fully-loaded engineer cost

    @property
    def monthly_infrastructure(self) -> float:
        return self.compute_cost + self.storage_cost + self.network_cost

    @property
    def monthly_operational(self) -> float:
        total_hours = self.engineering_hours + self.on_call_hours + self.incident_response_hours
        return total_hours * self.hourly_rate

    @property
    def total_monthly(self) -> float:
        return self.monthly_infrastructure + self.monthly_operational

    @property
    def initial_investment(self) -> float:
        return (self.initial_development_days + self.testing_days) * 8 * self.hourly_rate


class ProcessingCostEstimator:
    """Estimates costs for different processing approaches."""

    # Baseline multipliers (relative to simple batch)
    COMPLEXITY_MULTIPLIERS = {
        'batch_simple': {
            'compute': 1.0,
            'storage': 1.0,
            'engineering': 1.0,
            'on_call': 0.5,
            'development': 1.0,
        },
        'batch_complex': {
            'compute': 1.5,
            'storage': 1.2,
            'engineering': 1.5,
            'on_call': 1.0,
            'development': 1.5,
        },
        'micro_batch': {
            'compute': 2.0,
            'storage': 1.5,
            'engineering': 2.0,
            'on_call': 1.5,
            'development': 2.0,
        },
        'near_real_time': {
            'compute': 4.0,
            'storage': 2.0,
            'engineering': 3.0,
            'on_call': 2.5,
            'development': 3.0,
        },
        'real_time': {
            'compute': 8.0,
            'storage': 3.0,
            'engineering': 5.0,
            'on_call': 4.0,
            'development': 5.0,
        },
    }

    def __init__(self, baseline_monthly_compute: float = 1000):
        self.baseline = baseline_monthly_compute

    def estimate_costs(self, approach: str, events_per_day: int) -> CostModel:
        """
        Estimate costs for a given approach and volume.
        """
        multipliers = self.COMPLEXITY_MULTIPLIERS.get(approach, self.COMPLEXITY_MULTIPLIERS['batch_simple'])

        # Volume scaling (log scale)
        import math
        volume_factor = max(1, math.log10(events_per_day / 1_000_000 + 1))

        return CostModel(
            compute_cost=self.baseline * multipliers['compute'] * volume_factor,
            storage_cost=self.baseline * 0.3 * multipliers['storage'] * volume_factor,
            network_cost=self.baseline * 0.1 * volume_factor,
            engineering_hours=20 * multipliers['engineering'],
            on_call_hours=10 * multipliers['on_call'],
            incident_response_hours=5 * multipliers['on_call'],
            initial_development_days=20 * multipliers['development'],
            testing_days=10 * multipliers['development'],
        )

    def compare_approaches(
        self,
        events_per_day: int,
        approaches: List[str] = None
    ) -> Dict[str, Dict]:
        """
        Compare costs across multiple approaches.
        """
        if approaches is None:
            approaches = ['batch_simple', 'micro_batch', 'near_real_time', 'real_time']

        results = {}
        for approach in approaches:
            cost_model = self.estimate_costs(approach, events_per_day)
            results[approach] = {
                'monthly_infrastructure': cost_model.monthly_infrastructure,
                'monthly_operational': cost_model.monthly_operational,
                'total_monthly': cost_model.total_monthly,
                'initial_investment': cost_model.initial_investment,
                'annual_tco': cost_model.total_monthly * 12 + cost_model.initial_investment,
            }

        return results


def calculate_break_even(
    batch_cost: CostModel,
    streaming_cost: CostModel,
    revenue_per_minute_latency: float
) -> dict:
    """
    Calculate break-even point for streaming investment.

    Returns minutes of latency reduction needed to justify streaming cost.
    """
    monthly_cost_difference = streaming_cost.total_monthly - batch_cost.total_monthly
    initial_cost_difference = streaming_cost.initial_investment - batch_cost.initial_investment

    # Assume batch has 60-minute latency, streaming has 1-minute
    latency_reduction_minutes = 59

    # Monthly value of reduced latency. The model treats revenue_per_minute_latency
    # as the value of one minute of freshness per hour of operation, accrued
    # around the clock (30 days x 24 hours), not just during business hours.
    monthly_value = latency_reduction_minutes * revenue_per_minute_latency * 30 * 24

    roi_monthly = monthly_value - monthly_cost_difference
    payback_months = initial_cost_difference / roi_monthly if roi_monthly > 0 else float('inf')

    return {
        'monthly_cost_increase': monthly_cost_difference,
        'monthly_latency_value': monthly_value,
        'monthly_net_benefit': roi_monthly,
        'initial_investment_increase': initial_cost_difference,
        'payback_months': payback_months,
        'recommended': payback_months < 12  # ROI within 1 year
    }
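To make the break-even arithmetic concrete, here is a worked example with made-up numbers: a $12,000/month cost premium for streaming against a latency value of $0.05 per minute of freshness.

```python
# Hypothetical monthly totals from the cost models above
streaming_monthly = 18_000.0
batch_monthly = 6_000.0
extra_monthly = streaming_monthly - batch_monthly  # 12000.0

# Same model as calculate_break_even: 59 minutes of latency reduction,
# valued at $0.05/minute, accrued every hour of a 30-day month
monthly_value = 59 * 0.05 * 30 * 24                # 2124.0

net_benefit = monthly_value - extra_monthly        # -9876.0
print(net_benefit < 0)  # True: at this latency value, streaming doesn't pay off
```

At these numbers the streaming premium is never recovered; the latency value would need to rise roughly six-fold before the investment breaks even.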

Step 3: Decision Matrix

Use this matrix to guide initial decisions:

Factor                   Favors Real-Time                  Favors Batch
──────────────────────   ───────────────────────────────   ─────────────────────────
Latency Requirement      < 5 minutes                       > 1 hour acceptable
Data Volume              Low-medium (< 100K events/sec)    Very high volume
Processing Complexity    Simple transformations            Complex joins, ML
Historical Context       Minimal lookups                   Heavy historical joins
Ordering Requirements    Strict ordering needed            Order not critical
Exactly-Once Semantics   Required                          Best-effort acceptable
Error Recovery           Immediate retry                   Rerun full batch
Team Experience          Streaming expertise               Batch expertise
Infrastructure           Kubernetes, managed Kafka         Existing Spark/warehouse
Budget                   Flexible                          Constrained
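The matrix can be turned into a rough tally — one vote per factor toward each side. The answers below are purely hypothetical, standing in for one team's assessment:

```python
# Each factor votes "stream" or "batch" per the matrix above (example answers)
answers = {
    "latency_requirement": "stream",   # needed in < 5 minutes
    "data_volume": "batch",            # very high volume
    "processing_complexity": "batch",  # complex joins
    "historical_context": "batch",     # heavy historical joins
    "exactly_once": "stream",          # duplicates unacceptable
    "team_experience": "batch",        # batch expertise in-house
}

votes = list(answers.values())
stream_score, batch_score = votes.count("stream"), votes.count("batch")
print(stream_score, batch_score)  # 2 4
```

A 2-to-4 split like this suggests defaulting to batch and isolating the genuinely latency-critical piece into its own small streaming path, rather than streaming everything.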

Implementation Patterns

Pattern: Streaming Ingestion, Batch Processing

The most common hybrid pattern: ingest via streaming for durability, process in batch for simplicity.

# hybrid_pattern.py
"""
Streaming ingestion with batch processing pattern.
Best of both worlds for many use cases.
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from datetime import datetime, timedelta

class HybridPipeline:
    """
    Implements streaming ingestion with batch processing.

    Data flow:
    1. Kafka -> Bronze (streaming, append-only)
    2. Bronze -> Silver (batch, hourly)
    3. Silver -> Gold (batch, daily)
    """

    def __init__(self, spark: SparkSession, checkpoint_dir: str):
        self.spark = spark
        self.checkpoint_dir = checkpoint_dir

    def start_streaming_ingestion(self, kafka_servers: str, topic: str):
        """
        Streaming ingestion into Bronze layer.
        Runs continuously, appends raw events.
        """
        raw_stream = (
            self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_servers)
            .option("subscribe", topic)
            .option("startingOffsets", "latest")
            .load()
        )

        # Parse and add metadata
        parsed_stream = (
            raw_stream
            .select(
                col("key").cast("string").alias("event_key"),
                col("value").cast("string").alias("event_payload"),
                col("topic"),
                col("partition"),
                col("offset"),
                col("timestamp").alias("kafka_timestamp"),
                current_timestamp().alias("ingested_at")
            )
        )

        # Write to Bronze layer (Delta Lake)
        query = (
            parsed_stream.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", f"{self.checkpoint_dir}/bronze")
            .option("mergeSchema", "true")
            .trigger(processingTime="10 seconds")  # Micro-batch every 10 seconds
            .start("s3://datalake/bronze/events")
        )

        return query

    def run_silver_batch(self, processing_hour: datetime):
        """
        Batch processing from Bronze to Silver.
        Runs hourly, processes previous hour's data.
        """
        hour_start = processing_hour.replace(minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)

        # Read new Bronze data
        bronze_df = (
            self.spark.read.format("delta")
            .load("s3://datalake/bronze/events")
            .filter(
                (col("ingested_at") >= hour_start) &
                (col("ingested_at") < hour_end)
            )
        )

        # Parse JSON payload
        from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

        event_schema = StructType([
            StructField("event_id", StringType()),
            StructField("user_id", StringType()),
            StructField("event_type", StringType()),
            StructField("amount", DoubleType()),
            StructField("timestamp", TimestampType()),
        ])

        silver_df = (
            bronze_df
            .withColumn("parsed", from_json(col("event_payload"), event_schema))
            .select(
                col("parsed.event_id").alias("event_id"),
                col("parsed.user_id").alias("user_id"),
                col("parsed.event_type").alias("event_type"),
                col("parsed.amount").alias("amount"),
                col("parsed.timestamp").alias("event_timestamp"),
                col("ingested_at"),
                date_format(col("parsed.timestamp"), "yyyy-MM-dd").alias("event_date"),
                hour(col("parsed.timestamp")).alias("event_hour"),
            )
            .filter(col("event_id").isNotNull())  # Data quality filter
            .dropDuplicates(["event_id"])  # Deduplication
        )

        # Merge into Silver layer (upsert for idempotency)
        if DeltaTable.isDeltaTable(self.spark, "s3://datalake/silver/events"):
            silver_table = DeltaTable.forPath(self.spark, "s3://datalake/silver/events")

            (
                silver_table.alias("target")
                .merge(
                    silver_df.alias("source"),
                    "target.event_id = source.event_id"
                )
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # First run - create table
            (
                silver_df.write
                .format("delta")
                .partitionBy("event_date")
                .mode("overwrite")
                .save("s3://datalake/silver/events")
            )

        return silver_df.count()

    def run_gold_aggregation(self, processing_date: datetime):
        """
        Daily aggregation from Silver to Gold.
        Creates business-level metrics.
        """
        target_date = processing_date.date()

        # Read Silver data for the day
        silver_df = (
            self.spark.read.format("delta")
            .load("s3://datalake/silver/events")
            .filter(col("event_date") == str(target_date))
        )

        # Aggregate to Gold metrics
        daily_metrics = (
            silver_df
            .groupBy("event_date", "event_type")
            .agg(
                countDistinct("user_id").alias("unique_users"),
                count("event_id").alias("event_count"),
                sum("amount").alias("total_amount"),
                avg("amount").alias("avg_amount"),
                min("event_timestamp").alias("first_event"),
                max("event_timestamp").alias("last_event"),
            )
        )

        # User-level daily summary
        user_daily = (
            silver_df
            .groupBy("event_date", "user_id")
            .agg(
                count("event_id").alias("events"),
                sum("amount").alias("daily_spend"),
                collect_set("event_type").alias("event_types"),
            )
        )

        # Write to Gold layer
        (
            daily_metrics.write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"event_date = '{target_date}'")
            .save("s3://datalake/gold/daily_metrics")
        )

        (
            user_daily.write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"event_date = '{target_date}'")
            .save("s3://datalake/gold/user_daily")
        )

        return daily_metrics.count()

Pattern: Streaming with Materialized Views

For near-real-time dashboards without full streaming complexity.

# materialized_views.py
"""
Materialized views pattern for near-real-time analytics.
Simpler than full streaming, faster than batch.
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from datetime import datetime
import schedule
import time

class MaterializedViewManager:
    """
    Manages materialized views with configurable refresh intervals.
    Each view can have different freshness requirements.
    """

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.views = {}

    def register_view(
        self,
        name: str,
        source_table: str,
        aggregation_query: str,
        refresh_interval_minutes: int,
        partition_column: str = None
    ):
        """Register a materialized view with its refresh configuration."""
        self.views[name] = {
            'source_table': source_table,
            'query': aggregation_query,
            'refresh_interval': refresh_interval_minutes,
            'partition_column': partition_column,
            'last_refreshed': None,
        }

    def refresh_view(self, name: str):
        """Refresh a specific materialized view."""
        view_config = self.views[name]

        # Execute aggregation query
        result_df = self.spark.sql(view_config['query'])

        # Write to materialized view location
        target_path = f"s3://datalake/materialized_views/{name}"

        if view_config['partition_column']:
            (
                result_df.write
                .format("delta")
                .mode("overwrite")
                .partitionBy(view_config['partition_column'])
                .save(target_path)
            )
        else:
            (
                result_df.write
                .format("delta")
                .mode("overwrite")
                .save(target_path)
            )

        view_config['last_refreshed'] = datetime.now()

        return result_df.count()

    def start_refresh_scheduler(self):
        """Start background scheduler for view refreshes."""
        for name, config in self.views.items():
            interval = config['refresh_interval']

            # Schedule refresh
            schedule.every(interval).minutes.do(
                self.refresh_view, name=name
            )

        # Blocking loop: run this in a dedicated process or job
        while True:
            schedule.run_pending()
            time.sleep(10)


# Example usage
def setup_dashboard_views(manager: MaterializedViewManager):
    """Set up materialized views for a dashboard."""

    # Real-time KPIs - refresh every 1 minute
    manager.register_view(
        name="realtime_kpis",
        source_table="silver.events",
        aggregation_query="""
            SELECT
                current_timestamp() as snapshot_time,
                count(*) as events_last_hour,
                count(distinct user_id) as active_users,
                sum(amount) as revenue
            FROM silver.events
            WHERE event_timestamp > current_timestamp() - INTERVAL 1 HOUR
        """,
        refresh_interval_minutes=1,
    )

    # Hourly trends - refresh every 5 minutes
    manager.register_view(
        name="hourly_trends",
        source_table="silver.events",
        aggregation_query="""
            SELECT
                date_trunc('hour', event_timestamp) as hour,
                event_type,
                count(*) as events,
                sum(amount) as revenue,
                count(distinct user_id) as users
            FROM silver.events
            WHERE event_timestamp > current_timestamp() - INTERVAL 24 HOURS
            GROUP BY 1, 2
        """,
        refresh_interval_minutes=5,
        partition_column="hour",
    )

    # Daily summary - refresh every 15 minutes
    manager.register_view(
        name="daily_summary",
        source_table="silver.events",
        aggregation_query="""
            SELECT
                date(event_timestamp) as date,
                count(*) as total_events,
                sum(amount) as total_revenue,
                count(distinct user_id) as total_users,
                sum(amount) / count(distinct user_id) as revenue_per_user
            FROM silver.events
            WHERE event_timestamp > current_timestamp() - INTERVAL 30 DAYS
            GROUP BY 1
        """,
        refresh_interval_minutes=15,
        partition_column="date",
    )

Migration Strategies

From Batch to Streaming

┌──────────────────────────────────────────────────────────────────────────────┐
│                    BATCH TO STREAMING MIGRATION                              │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Phase 1: Parallel Run                                                       │
│  ─────────────────────                                                       │
│  ┌──────────────┐                      ┌──────────────┐                     │
│  │   Existing   │──────────────────────│   Target     │                     │
│  │   Batch      │                      │   Table      │                     │
│  │   Pipeline   │                      │              │                     │
│  └──────────────┘                      └──────────────┘                     │
│         │                                     ▲                              │
│         │              ┌──────────────┐       │                              │
│         └─────────────►│   Compare    │───────┘                              │
│                        │   Results    │                                      │
│  ┌──────────────┐      └──────────────┘                                      │
│  │    New       │──────────┘                                                 │
│  │  Streaming   │                                                            │
│  │  Pipeline    │   (Shadow mode - results compared but not used)            │
│  └──────────────┘                                                            │
│                                                                              │
│  Phase 2: Canary                                                             │
│  ───────────────                                                             │
│  • Route 10% of traffic to streaming results                                 │
│  • Monitor for discrepancies                                                 │
│  • Gradually increase percentage                                             │
│                                                                              │
│  Phase 3: Cutover                                                            │
│  ────────────────                                                            │
│  • Streaming becomes primary                                                 │
│  • Batch runs as backup/validation                                           │
│  • Eventually decommission batch                                             │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
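The Phase 1 compare step can be sketched in plain Python. The metric names and the 1% tolerance below are illustrative assumptions, not part of any standard; in practice you would feed this from both pipelines' output tables.

```python
def compare_results(batch_metrics: dict, streaming_metrics: dict,
                    tolerance: float = 0.01) -> dict:
    """Compare per-metric values from the batch and streaming pipelines.

    Returns a report mapping each metric to its relative difference and
    whether it falls within the accepted tolerance.
    """
    report = {}
    for metric, batch_value in batch_metrics.items():
        stream_value = streaming_metrics.get(metric)
        if stream_value is None:
            report[metric] = {"status": "missing_in_streaming"}
            continue
        denom = max(abs(batch_value), 1e-9)  # guard against division by zero
        rel_diff = abs(batch_value - stream_value) / denom
        report[metric] = {
            "batch": batch_value,
            "streaming": stream_value,
            "rel_diff": rel_diff,
            "status": "ok" if rel_diff <= tolerance else "mismatch",
        }
    return report


# Shadow-mode check: small drift is expected, large drift blocks Phase 2.
report = compare_results(
    {"events": 10_000, "revenue": 52_310.0},
    {"events": 10_004, "revenue": 52_309.5},
)
```

Running this comparison on every batch window during the parallel run gives you the discrepancy data needed to decide when the canary phase is safe to start.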

Key Takeaways

  1. Real-time isn't always better: It's a trade-off. Higher complexity and cost must be justified by business value.

  2. Start with the business question: What decision changes based on fresher data? What's the cost of delay?

  3. Hybrid approaches win: Stream into a lakehouse, materialize views at different latencies based on consumer needs.

  4. Streaming ingestion, batch processing: Often the best of both worlds. Durable streaming capture, simple batch logic.

  5. Cost the full picture: Include engineering time, on-call burden, and operational complexity—not just compute.

  6. Plan for evolution: Start with batch, add streaming for specific use cases. Don't over-engineer from day one.
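Takeaway 5 can be made concrete with a back-of-the-envelope total-cost sketch. Every dollar figure and hour estimate below is a placeholder assumption you would replace with your own numbers:

```python
def monthly_tco(compute: float, eng_hours: float, oncall_hours: float,
                hourly_rate: float = 120.0) -> float:
    """Total monthly cost: compute spend plus loaded people cost."""
    return compute + (eng_hours + oncall_hours) * hourly_rate


# Illustrative only: streaming typically costs more in both compute
# and the engineering/on-call time needed to keep it healthy.
batch_cost = monthly_tco(compute=2_000, eng_hours=10, oncall_hours=2)
streaming_cost = monthly_tco(compute=6_000, eng_hours=40, oncall_hours=20)

# Streaming only pays off if fresher data is worth this premium.
premium = streaming_cost - batch_cost
```

The point is not the specific numbers but the shape of the comparison: once people cost is included, the streaming premium is often several times the raw compute difference.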

References

  1. Marz, N., & Warren, J. (2015). Big Data: Principles and Best Practices of Scalable Real-time Data Systems. Manning.
  2. Kreps, J. (2014). "Questioning the Lambda Architecture." O'Reilly Radar.
  3. Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
  4. Narkhede, N., Shapira, G., & Palino, T. (2017). Kafka: The Definitive Guide. O'Reilly Media.
  5. Databricks Documentation: Delta Lake Streaming
  6. Apache Flink Documentation: DataStream API

Gemut Analytics Team
Data Engineering Experts