Real-time Financial Analytics & Fraud Detection Platform
Designed and implemented a mission-critical streaming analytics platform processing 2M+ transactions per second with sub-50ms fraud detection latency for a global financial institution.

Executive Summary
Built a real-time streaming analytics platform capable of processing over 2 million financial transactions per second while maintaining sub-50ms latency for fraud detection decisions. The system achieved 99.999% availability (five nines), reduced fraud losses by 40%, decreased false positive rates by 75%, and ensured compliance with GDPR, PSD2, and MiFID II regulations across 27 EU countries.
The Challenge
Business Context
The client, a top-10 global financial institution with operations in 47 countries and over $4.2 trillion in annual transaction volume, faced a critical challenge: their legacy fraud detection system operated on a 4-hour batch cycle, creating a window of vulnerability that sophisticated fraudsters exploited.
The cost of latency:
| Metric | Legacy System | Business Impact |
|---|---|---|
| Detection Latency | 4+ hours | $180M annual fraud losses |
| False Positive Rate | 8.3% | $45M lost revenue from blocked legitimate transactions |
| System Availability | 99.9% | 8.7 hours downtime/year during trading hours |
| Peak Throughput | 200K TPS | Unable to handle market volatility events |
The 2023 market volatility event was the catalyst: during a 3-hour period of extreme trading activity, the legacy system fell behind by 6+ hours, and fraud losses for that single day exceeded $12M.
Technical Requirements
The new platform needed to meet stringent requirements across performance, reliability, and compliance:
Performance Requirements:
├─ Throughput: 2M+ TPS sustained, 10M+ TPS burst
├─ Latency: < 50ms p99 for fraud decisions
├─ Processing: Exactly-once semantics
└─ Backpressure: Graceful degradation under load
Reliability Requirements:
├─ Availability: 99.999% (< 5.26 min/year downtime)
├─ RPO: 0 (zero data loss)
├─ RTO: < 30 seconds
└─ Multi-region: Active-active across 3 regions
Compliance Requirements:
├─ GDPR: Data subject rights, consent management
├─ PSD2: Strong customer authentication, open banking APIs
├─ MiFID II: Transaction reporting, best execution
├─ SOX: Audit trails, data integrity
└─ PCI-DSS: Cardholder data protection
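The reliability targets above translate directly into error budgets. A quick calculation (illustrative, not from the original spec) shows what each nine costs:

```python
def downtime_budget_minutes(availability: float, period_minutes: float = 365 * 24 * 60) -> float:
    """Maximum allowed downtime per period for a given availability target."""
    return (1.0 - availability) * period_minutes

legacy = downtime_budget_minutes(0.999)      # three nines (the legacy system)
target = downtime_budget_minutes(0.99999)    # five nines (the new requirement)

print(f"99.9%   -> {legacy / 60:.1f} hours/year")   # ~8.8 hours/year
print(f"99.999% -> {target:.2f} minutes/year")      # ~5.26 minutes/year
```

Five nines leaves barely five minutes of annual downtime, which is why the RTO target of 30 seconds and active-active multi-region design follow from the availability number rather than being independent choices.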
Solution Architecture
High-Level Architecture
We designed a multi-layer streaming architecture optimized for both throughput and latency:
┌─────────────────────────────────────────────────────────────────────────────┐
│ REAL-TIME FINANCIAL ANALYTICS PLATFORM │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TRANSACTION SOURCES │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Payment │ │ Card │ │ Trading │ │ Mobile │ │ Wire │ │
│ │ Gateway │ │ Network │ │ Systems │ │ Apps │ │Transfers│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │ │
│ └───────────┴───────────┴───────────┴───────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ INGESTION LAYER (Kafka) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │transactions │ │ enrichment │ │ alerts │ 3 Clusters │ │
│ │ │(500 parts) │ │ (100 pts) │ │ (50 pts) │ 15 brokers each │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ 2M msg/s each │ │
│ │ │ │
│ │ Replication: 3 | acks=all | min.isr=2 | compression=lz4 │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ STREAM PROCESSING LAYER (Flink) │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ Transaction Processor │ │ │
│ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │
│ │ │ │Enrichment │─▶│ Feature │─▶│ ML Fraud │─▶│ Decision │ │ │ │
│ │ │ │ (5ms) │ │Extraction │ │ Detection │ │ Engine │ │ │ │
│ │ │ └───────────┘ │ (8ms) │ │ (25ms) │ │ (10ms) │ │ │ │
│ │ │ └───────────┘ └───────────┘ └───────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Parallelism: 500 | Checkpoints: 30s | State Backend: RocksDB │ │
│ │ Memory: 4GB/slot | TaskManagers: 125 | Total slots: 500 │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ FEATURE STORE │ │ ML SERVING │ │ TIME-SERIES │ │
│ │ (Redis) │ │ (TensorFlow) │ │ (Cassandra) │ │
│ │ │ │ │ │ │ │
│ │ Hot features │ │ Fraud models │ │ Transaction │ │
│ │ < 1ms lookup │ │ < 15ms p99 │ │ history │ │
│ │ 100M keys │ │ GPU inference │ │ 5 years data │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ SERVING LAYER │ │
│ │ │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │Real-time │ │ Alerting │ │Regulatory │ │ Analytics │ │ │
│ │ │Dashboard │ │ System │ │ Reporting │ │ APIs │ │ │
│ │ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
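The per-stage timings annotated in the processing layer above sum to a latency budget that has to fit under the 50 ms p99 SLO. A quick sanity check:

```python
# Per-stage latency budgets from the Transaction Processor diagram (ms).
STAGE_BUDGET_MS = {
    "enrichment": 5,
    "feature_extraction": 8,
    "ml_fraud_detection": 25,
    "decision_engine": 10,
}

total = sum(STAGE_BUDGET_MS.values())
assert total < 50, "stage budgets must leave headroom under the 50 ms SLO"
print(f"end-to-end budget: {total} ms (headroom: {50 - total} ms)")  # 48 ms, 2 ms headroom
```

Note that ML inference dominates the budget, which is why micro-batching and GPU serving (discussed below) were the main levers for latency tuning.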
Kafka Configuration
We deployed a 3-cluster Kafka setup across 3 AWS regions for high availability:
// Producer configuration optimized for low latency with high reliability
public class TransactionProducerConfig {

    public Properties getProducerProperties() {
        Properties props = new Properties();

        // Bootstrap servers (multi-region)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka-us-east-1:9092,kafka-eu-west-1:9092,kafka-ap-southeast-1:9092");

        // Reliability: exactly-once semantics.
        // Note: a transactional ID should be stable across restarts of the same
        // producer instance for zombie fencing to work; a fresh UUID is only
        // acceptable for producers that never need fencing against a predecessor.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-producer-" + UUID.randomUUID());

        // Latency optimization
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);              // max 5ms batching delay
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);         // 64KB batches
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728);  // 128MB buffer
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Compression for throughput
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // Retry configuration
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        // Serialization
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            TransactionAvroSerializer.class.getName());

        return props;
    }
}
// Custom partitioner for account-based routing
public class AccountBasedPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        Transaction txn = (Transaction) value;
        String accountId = txn.getAccountId();
        // Consistent hashing ensures all transactions for an account
        // go to the same partition (enables stateful fraud detection).
        // Mask to a non-negative value instead of Math.abs(), which
        // returns a negative result for Integer.MIN_VALUE.
        int numPartitions = cluster.partitionCountForTopic(topic);
        return (MurmurHash3.hash32(accountId.getBytes()) & 0x7fffffff) % numPartitions;
    }
}
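The same routing idea can be sketched in Python. Here `zlib.crc32` stands in for MurmurHash3 (an illustrative substitution, not the production hash); the property that matters is that every transaction for an account maps to one stable partition:

```python
import zlib

def partition_for(account_id: str, num_partitions: int) -> int:
    """Stable account -> partition mapping (crc32 stands in for MurmurHash3)."""
    # Mask to a non-negative int before the modulo, avoiding the
    # Math.abs(Integer.MIN_VALUE) pitfall present in naive JVM versions.
    return (zlib.crc32(account_id.encode()) & 0x7FFFFFFF) % num_partitions

# All transactions for the same account land on the same partition ...
p1 = partition_for("acct-42", 500)
assert all(partition_for("acct-42", 500) == p1 for _ in range(1000))

# ... while different accounts spread across the partition space.
parts = {partition_for(f"acct-{i}", 500) for i in range(10_000)}
print(f"distinct partitions hit: {len(parts)} of 500")
```

Because the mapping is a pure function of the account ID, downstream Flink operators keyed by account see all of an account's events in order on one task slot, with no cross-partition state lookups.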
Flink Stream Processing
The core fraud detection pipeline was implemented in Apache Flink with exactly-once guarantees:
public class FraudDetectionPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration for exactly-once
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // State backend for large state (incremental checkpoints enabled)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/fraud-detection");

        // Parallelism
        env.setParallelism(500);

        // Kafka source with exactly-once
        KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("transactions")
            .setGroupId("fraud-detection")
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            .setValueOnlyDeserializer(new TransactionAvroDeserializer())
            .build();

        DataStream<Transaction> transactions = env.fromSource(
            source,
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
            "Kafka Source"
        );

        // Enrichment with account data
        DataStream<EnrichedTransaction> enriched = transactions
            .keyBy(Transaction::getAccountId)
            .process(new AccountEnrichmentFunction())
            .name("Account Enrichment");

        // Feature extraction
        DataStream<TransactionFeatures> features = enriched
            .keyBy(EnrichedTransaction::getAccountId)
            .process(new FeatureExtractionFunction())
            .name("Feature Extraction");

        // ML fraud scoring
        DataStream<ScoredTransaction> scored = features
            .process(new MLScoringFunction())
            .name("ML Fraud Scoring");

        // Decision engine with rules + ML
        DataStream<FraudDecision> decisions = scored
            .keyBy(ScoredTransaction::getAccountId)
            .process(new DecisionEngineFunction())
            .name("Decision Engine");

        // Output: approved transactions. EXACTLY_ONCE Kafka sinks need a
        // transactional ID prefix so Flink can fence transactions on restart.
        decisions
            .filter(d -> d.getDecision() == Decision.APPROVED)
            .sinkTo(KafkaSink.<FraudDecision>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(new FraudDecisionSerializer("approved-transactions"))
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("fraud-approved")
                .build());

        // Output: blocked/flagged transactions
        decisions
            .filter(d -> d.getDecision() != Decision.APPROVED)
            .sinkTo(KafkaSink.<FraudDecision>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(new FraudDecisionSerializer("flagged-transactions"))
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("fraud-flagged")
                .build());

        // Output: real-time metrics to Cassandra
        // (the Cassandra connector uses its own builder, not DataStream.addSink)
        CassandraSink.addSink(decisions)
            .setHost("cassandra")
            .build();

        env.execute("Fraud Detection Pipeline");
    }
}
// Feature extraction with sliding windows
public class FeatureExtractionFunction
        extends KeyedProcessFunction<String, EnrichedTransaction, TransactionFeatures> {

    // State for rolling window calculations
    private MapState<Long, Double> hourlyAmounts;
    private MapState<Long, Integer> hourlyCounts;
    private ValueState<AccountProfile> profileState;

    @Override
    public void open(Configuration parameters) {
        hourlyAmounts = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("hourlyAmounts", Long.class, Double.class));
        hourlyCounts = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("hourlyCounts", Long.class, Integer.class));
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("profile", AccountProfile.class));
    }

    @Override
    public void processElement(EnrichedTransaction txn, Context ctx,
                               Collector<TransactionFeatures> out) throws Exception {
        long currentHour = txn.getTimestamp() / 3600000;

        // Update rolling statistics
        Double hourAmount = hourlyAmounts.get(currentHour);
        hourlyAmounts.put(currentHour, (hourAmount == null ? 0 : hourAmount) + txn.getAmount());
        Integer hourCount = hourlyCounts.get(currentHour);
        hourlyCounts.put(currentHour, (hourCount == null ? 0 : hourCount) + 1);

        // Clean old state (keep 168 hours = 7 days)
        cleanOldState(currentHour - 168);

        // Extract features
        TransactionFeatures features = TransactionFeatures.builder()
            .transactionId(txn.getTransactionId())
            .accountId(txn.getAccountId())
            .amount(txn.getAmount())
            .timestamp(txn.getTimestamp())
            // Transaction-level features
            .isInternational(txn.isInternational())
            .merchantCategory(txn.getMerchantCategory())
            .channelType(txn.getChannelType())
            .deviceFingerprint(txn.getDeviceFingerprint())
            // Account-level features (from state)
            .accountAge(calculateAccountAge(profileState.value()))
            .avgTransactionAmount(calculateAvgAmount())
            .transactionVelocity(calculateVelocity(currentHour))
            .hourlyAmountDeviation(calculateAmountDeviation(txn.getAmount()))
            // Behavioral features
            .isNewMerchant(isNewMerchant(txn, profileState.value()))
            .isNewCountry(isNewCountry(txn, profileState.value()))
            .timeSinceLastTransaction(calculateTimeSinceLast(profileState.value()))
            .build();

        // Update profile
        updateProfile(txn);
        out.collect(features);
    }
}
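The rolling-window bookkeeping above can be sketched in a few lines of Python (plain dicts standing in for Flink's MapState; the class and field names are illustrative):

```python
from collections import defaultdict

RETENTION_HOURS = 168  # 7 days, as in the Flink state cleanup

class RollingFeatures:
    """Per-account hourly amount/count windows with 7-day retention."""

    def __init__(self):
        self.hourly_amounts = defaultdict(float)
        self.hourly_counts = defaultdict(int)

    def observe(self, timestamp_ms: int, amount: float) -> dict:
        hour = timestamp_ms // 3_600_000
        self.hourly_amounts[hour] += amount
        self.hourly_counts[hour] += 1
        # Drop state older than the retention horizon.
        cutoff = hour - RETENTION_HOURS
        for h in [h for h in self.hourly_amounts if h < cutoff]:
            del self.hourly_amounts[h]
            del self.hourly_counts[h]
        total_count = sum(self.hourly_counts.values())
        total_amount = sum(self.hourly_amounts.values())
        return {
            "velocity_per_hour": total_count / max(len(self.hourly_counts), 1),
            "avg_amount": total_amount / total_count,
        }

acct = RollingFeatures()
feats = acct.observe(timestamp_ms=1_700_000_000_000, amount=125.0)
print(feats)  # {'velocity_per_hour': 1.0, 'avg_amount': 125.0}
```

Because the state is keyed by account (via the Kafka partitioner), each operator instance only holds windows for its own accounts, which is what keeps the per-key cleanup cheap.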
ML Model Serving
We deployed TensorFlow Serving with GPU acceleration for fraud scoring:
# ml/fraud_model.py
import tensorflow as tf
import numpy as np
from typing import Dict, List


class FraudDetectionModel:
    """
    Deep learning model for real-time fraud detection.

    Architecture: Transformer encoder + feedforward classifier
    """

    def __init__(self, model_path: str):
        self.model = tf.saved_model.load(model_path)
        config = self._load_feature_config()
        self.feature_columns = config["columns"]
        # Normalization statistics computed at training time
        self.feature_means = config["means"]
        self.feature_stds = config["stds"]

    def score_batch(self, transactions: List[Dict]) -> List[float]:
        """
        Score a batch of transactions.

        Optimized for throughput: batch sizes of 32-64 transactions.

        Args:
            transactions: List of transaction feature dictionaries
        Returns:
            List of fraud probabilities (0.0 - 1.0)
        """
        # Prepare feature tensors
        features = self._preprocess_features(transactions)
        # Run inference (signatures take named inputs, hence the ** expansion)
        with tf.device('/GPU:0'):
            predictions = self.model.signatures['serving_default'](**features)
        return predictions['fraud_probability'].numpy().tolist()

    def _preprocess_features(self, transactions: List[Dict]) -> Dict[str, tf.Tensor]:
        """Convert transactions to model input tensors."""
        # Numerical features
        numerical = np.array([
            [
                txn['amount'],
                txn['account_age'],
                txn['avg_transaction_amount'],
                txn['transaction_velocity'],
                txn['hourly_amount_deviation'],
                txn['time_since_last_transaction'],
            ]
            for txn in transactions
        ], dtype=np.float32)
        # Normalize with training-time statistics
        numerical = (numerical - self.feature_means) / self.feature_stds
        # Categorical features (embeddings handled by the model)
        categorical = {
            'merchant_category': [txn['merchant_category'] for txn in transactions],
            'channel_type': [txn['channel_type'] for txn in transactions],
            'country': [txn['country'] for txn in transactions],
        }
        return {
            'numerical_features': tf.constant(numerical),
            **{k: tf.constant(v) for k, v in categorical.items()},
        }
# Serving configuration
"""
# TensorFlow Serving model config
model_config_list {
  config {
    name: "fraud_detection"
    base_path: "s3://ml-models/fraud-detection"
    model_platform: "tensorflow"
    model_version_policy {
      specific {
        versions: 12
        versions: 13  # Canary deployment
      }
    }
    version_labels {
      key: "stable"
      value: 12
    }
    version_labels {
      key: "canary"
      value: 13
    }
  }
}
"""
Decision Engine
public class DecisionEngineFunction
        extends KeyedProcessFunction<String, ScoredTransaction, FraudDecision> {

    private static final double HIGH_RISK_THRESHOLD = 0.85;
    private static final double MEDIUM_RISK_THRESHOLD = 0.65;
    private static final double LOW_RISK_THRESHOLD = 0.30;

    // State for velocity limits
    private MapState<String, Integer> velocityCounters;
    private ValueState<Double> dailySpendState;

    @Override
    public void open(Configuration parameters) {
        velocityCounters = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("velocityCounters", String.class, Integer.class));
        dailySpendState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("dailySpend", Double.class));
    }

    @Override
    public void processElement(ScoredTransaction txn, Context ctx,
                               Collector<FraudDecision> out) throws Exception {
        FraudDecision.Builder decision = FraudDecision.builder()
            .transactionId(txn.getTransactionId())
            .accountId(txn.getAccountId())
            .mlScore(txn.getFraudScore())
            .timestamp(System.currentTimeMillis());

        // Rule-based checks (hard blocks)
        List<String> ruleViolations = checkRules(txn);
        if (!ruleViolations.isEmpty()) {
            decision
                .decision(Decision.BLOCKED)
                .reason("RULE_VIOLATION")
                .ruleViolations(ruleViolations);
            out.collect(decision.build());
            return;
        }

        // ML-based scoring
        if (txn.getFraudScore() >= HIGH_RISK_THRESHOLD) {
            decision
                .decision(Decision.BLOCKED)
                .reason("HIGH_FRAUD_SCORE");
        } else if (txn.getFraudScore() >= MEDIUM_RISK_THRESHOLD) {
            decision
                .decision(Decision.REVIEW)
                .reason("MEDIUM_FRAUD_SCORE");
        } else if (txn.getFraudScore() >= LOW_RISK_THRESHOLD) {
            // Additional verification for borderline cases
            if (requiresAdditionalVerification(txn)) {
                decision
                    .decision(Decision.STEP_UP_AUTH)
                    .reason("ADDITIONAL_VERIFICATION_REQUIRED");
            } else {
                decision
                    .decision(Decision.APPROVED)
                    .reason("LOW_RISK");
            }
        } else {
            decision
                .decision(Decision.APPROVED)
                .reason("VERY_LOW_RISK");
        }
        out.collect(decision.build());

        // Update velocity counters
        updateVelocityCounters(txn);
    }

    private List<String> checkRules(ScoredTransaction txn) throws Exception {
        List<String> violations = new ArrayList<>();

        // Rule 1: Velocity check (> 10 transactions in 5 minutes)
        Integer recentCount = velocityCounters.get("5min_" + txn.getAccountId());
        if (recentCount != null && recentCount > 10) {
            violations.add("VELOCITY_EXCEEDED");
        }

        // Rule 2: Daily limit check
        Double dailySpend = dailySpendState.value();
        if (dailySpend != null && dailySpend + txn.getAmount() > txn.getDailyLimit()) {
            violations.add("DAILY_LIMIT_EXCEEDED");
        }

        // Rule 3: Blacklisted merchant
        if (isBlacklistedMerchant(txn.getMerchantId())) {
            violations.add("BLACKLISTED_MERCHANT");
        }

        // Rule 4: Sanctions check
        if (isSanctionedCountry(txn.getCountry())) {
            violations.add("SANCTIONED_COUNTRY");
        }

        return violations;
    }
}
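The decision ladder above reduces to a pure function of the rule results and the ML score, which is also how the logic can be unit-tested in isolation. A Python sketch (not the production Java):

```python
HIGH_RISK = 0.85
MEDIUM_RISK = 0.65
LOW_RISK = 0.30

def decide(ml_score: float, rule_violations: list,
           needs_step_up: bool = False) -> tuple:
    """Map rule results and an ML fraud score to a (decision, reason) pair."""
    if rule_violations:                       # hard blocks always win
        return "BLOCKED", "RULE_VIOLATION"
    if ml_score >= HIGH_RISK:
        return "BLOCKED", "HIGH_FRAUD_SCORE"
    if ml_score >= MEDIUM_RISK:
        return "REVIEW", "MEDIUM_FRAUD_SCORE"
    if ml_score >= LOW_RISK:
        if needs_step_up:
            return "STEP_UP_AUTH", "ADDITIONAL_VERIFICATION_REQUIRED"
        return "APPROVED", "LOW_RISK"
    return "APPROVED", "VERY_LOW_RISK"

assert decide(0.10, []) == ("APPROVED", "VERY_LOW_RISK")
assert decide(0.90, []) == ("BLOCKED", "HIGH_FRAUD_SCORE")
assert decide(0.10, ["SANCTIONED_COUNTRY"]) == ("BLOCKED", "RULE_VIOLATION")
```

Keeping the thresholds as named constants mirrors the Java engine and makes threshold retuning (a regular occurrence as the ML model is retrained) a one-line change.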
Results & Impact
Performance Metrics
| Metric | Target | Achieved | Notes |
|---|---|---|---|
| Throughput | 2M TPS | 2.4M TPS | 20% headroom for growth |
| p50 Latency | < 30ms | 18ms | End-to-end decision time |
| p99 Latency | < 50ms | 47ms | Even during peak load |
| Availability | 99.999% | 99.9993% | ~3.7 minutes downtime in year 1 |
| Fraud Detection Rate | > 95% | 97.2% | Known fraud patterns |
| False Positive Rate | < 3% | 2.1% | 75% reduction from legacy |
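Two of the table's figures can be cross-checked with quick arithmetic:

```python
# False positive rate: 8.3% -> 2.1% is a ~75% relative reduction.
fpr_reduction = (8.3 - 2.1) / 8.3
print(f"{fpr_reduction:.1%}")  # 74.7%

# Availability: 99.9993% over a year leaves ~3.7 minutes of downtime.
downtime_min = (1 - 0.999993) * 365 * 24 * 60
print(f"{downtime_min:.1f} minutes/year")  # 3.7
```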
Business Impact
FINANCIAL IMPACT
┌───────────────────────────────────────────────────────────────────┐
│ │
│ FRAUD REDUCTION REVENUE RECOVERY │
│ ───────────────── ──────────────── │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Before: $180M/year │ │ Before: $45M lost │ │
│ │ After: $108M/year │ │ After: < $5M lost │ │
│ │ │ │ │ │
│ │ Savings: $72M/year │ │ Recovery: $40M/year │ │
│ │ (40% reduction) │ │ (89% improvement) │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ TOTAL ANNUAL BENEFIT: $112M │
│ IMPLEMENTATION COST: $18M │
│ ROI: 522% | PAYBACK: 2 MONTHS │
│ │
└───────────────────────────────────────────────────────────────────┘
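The figures in the box above are internally consistent, as a quick check confirms:

```python
fraud_savings = 72        # $M/year: 40% of the $180M legacy fraud losses
revenue_recovery = 40     # $M/year: $45M lost before, < $5M after
implementation_cost = 18  # $M

total_benefit = fraud_savings + revenue_recovery
roi_pct = (total_benefit - implementation_cost) / implementation_cost * 100
payback_months = implementation_cost / total_benefit * 12

print(total_benefit)             # 112
print(round(roi_pct))            # 522  (net ROI)
print(round(payback_months, 1))  # 1.9  (i.e., ~2 months)
```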
Compliance Achievements
- GDPR: Full compliance with data subject rights, consent management, and data portability
- PSD2: Strong customer authentication (SCA) integrated with decision engine
- MiFID II: Real-time transaction reporting to regulators within 15 minutes
- SOX: Complete audit trail with tamper-evident logging
Key Learnings
What Worked Well
1. Account-Based Kafka Partitioning
   - Enabled stateful fraud detection without distributed state lookups
   - Maintained ordering guarantees per account
   - Simplified exactly-once processing
2. Hybrid ML + Rules Approach
   - Rules caught known fraud patterns with 100% accuracy
   - ML detected novel patterns that rules missed
   - Combined approach achieved the best precision-recall balance
3. Feature Store Architecture
   - Redis for hot features (< 1ms lookup)
   - Flink state for real-time features
   - Cassandra for historical features
   - Clear SLA per feature tier
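The tiered feature-store layout described above can be sketched as a fall-through chain (plain dicts standing in for Redis, Flink state, and Cassandra; keys and values are illustrative):

```python
# Stand-ins for the real tiers: Redis (hot), Flink state (real-time), Cassandra (historical).
hot_tier = {"acct-42:avg_amount": 118.5}
realtime_tier = {"acct-42:velocity_1h": 3}
historical_tier = {"acct-42:avg_amount_90d": 97.0}

TIERS = [hot_tier, realtime_tier, historical_tier]

def lookup(feature_key: str):
    """Return the first hit walking hot -> real-time -> historical tiers."""
    for tier in TIERS:
        if feature_key in tier:
            return tier[feature_key]
    return None

print(lookup("acct-42:avg_amount"))   # 118.5, served from the hot tier
print(lookup("acct-42:velocity_1h"))  # 3, falls through to the real-time tier
```

The ordering matters: the hot tier answers inside the latency budget, and each fall-through trades latency for coverage, which is why each tier carries its own SLA.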
Challenges Overcome
1. Checkpoint Latency Spikes
   - Initial checkpoints caused 200ms+ latency spikes
   - Solution: incremental checkpoints + async snapshots + a dedicated checkpoint network
2. ML Model Serving Latency
   - Individual inference was too slow (50ms per transaction)
   - Solution: dynamic micro-batching (8-64 transactions, sized by queue depth)
3. State Growth
   - Account state grew unbounded over time
   - Solution: TTL-based state cleanup + tiered storage (hot/warm/cold)
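The dynamic micro-batching fix described above can be sketched as: the inference batch size tracks the current backlog, clamped to the 8-64 range (hypothetical helper functions, not the production code):

```python
MIN_BATCH, MAX_BATCH = 8, 64

def next_batch_size(queue_depth: int) -> int:
    """Scale the inference batch size with backlog, clamped to [8, 64]."""
    return max(MIN_BATCH, min(MAX_BATCH, queue_depth))

def drain(queue: list) -> list:
    """Split a backlog into inference batches sized by the remaining depth."""
    batches = []
    while queue:
        n = next_batch_size(len(queue))
        batches.append(queue[:n])
        queue = queue[n:]
    return batches

sizes = [len(b) for b in drain(list(range(100)))]
print(sizes)  # [64, 36]
```

Under light load the floor of 8 keeps GPU utilization reasonable without adding queueing delay; under heavy load the cap of 64 bounds per-batch latency so a single large batch cannot blow the 50 ms budget.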
Technologies Used
| Category | Technology | Purpose |
|---|---|---|
| Streaming | Apache Kafka (MSK) | Event backbone, exactly-once delivery |
| Processing | Apache Flink | Stream processing, stateful computation |
| ML Serving | TensorFlow Serving | GPU-accelerated fraud scoring |
| Feature Store | Redis Cluster | Hot feature storage |
| Time-Series | Apache Cassandra | Transaction history, analytics |
| Orchestration | Kubernetes (EKS) | Container orchestration |
| Monitoring | Prometheus + Grafana | Metrics and alerting |
| Tracing | Jaeger | Distributed tracing |
| Infrastructure | Terraform + AWS | Infrastructure as code |
Measurable Impact
| Metric | Result | Detail |
|---|---|---|
| Transaction Throughput | 2.4M/sec | Peak sustained throughput |
| Detection Latency | < 50ms | p99 fraud decision time |
| System Availability | 99.999% | Five nines achieved |
| Fraud Reduction | 40% | $72M annual savings |
| False Positives | -75% | From 8.3% to 2.1% |
| Processing Latency | -60% | End-to-end improvement |

Return on Investment
$112M annual benefit ($72M fraud reduction + $40M recovered revenue from reduced false positives) against $18M implementation cost. Payback period: 2 months.