Building Scalable Data Infrastructure for African Fintech: From Mobile Money to Real-Time Fraud Detection
A technical guide to building data infrastructure for Africa's booming fintech sector, covering event-driven architectures for mobile payments, real-time fraud detection pipelines, and regulatory compliance for CBN and NDPR.
Africa's fintech revolution — led by Nigerian companies like Paystack, Flutterwave, Opay, and Moniepoint — processes millions of daily transactions across fragmented channels including bank transfers, USSD, POS terminals, cards, and mobile wallets. This guide explores the data engineering patterns needed to build scalable infrastructure for multi-channel payment processing, real-time fraud detection, CBN regulatory reporting, and credit scoring for underbanked populations.
- Familiarity with distributed systems and streaming architectures
- Basic understanding of payment processing flows
- Experience with Python, Java, or Scala

Introduction
Africa's fintech sector is experiencing unprecedented growth. Nigeria alone accounts for over $3 billion in annual digital payment volume, with companies like Paystack (acquired by Stripe for $200M), Flutterwave (valued at $3B), Opay (processing 3M+ daily transactions), and Moniepoint (serving 600K+ merchants) reshaping how 200 million Nigerians interact with financial services.
But this growth creates a formidable data engineering challenge. Unlike mature markets where card payments dominate a single, well-instrumented channel, African fintech must process transactions across five or more fragmented channels simultaneously:
- Bank transfers via NIBSS Instant Payment (NIP) — averaging 50M+ daily transactions
- USSD sessions — the primary interface for the 60%+ of Nigerians without smartphones
- POS terminals — 1M+ active terminals processing card and NFC payments
- Mobile wallets — Opay, PalmPay, and bank-issued wallets with proprietary APIs
- Card payments — Visa, Mastercard, and Verve (Nigeria's domestic card scheme)
Each channel generates events in different formats, at different velocities, with different reliability guarantees. Layer on the Central Bank of Nigeria's (CBN) increasingly stringent reporting requirements and the Nigeria Data Protection Regulation (NDPR), and you have a data infrastructure problem that demands careful architectural thinking.
This guide provides a technical blueprint for building the data infrastructure that African fintech companies need — from event ingestion to fraud detection to regulatory compliance — with patterns that are globally applicable but grounded in the specific realities of the Nigerian market.
The Scale of the Challenge
Before diving into architecture, let's quantify what "African fintech scale" actually means for data infrastructure:
Transaction Volume Benchmarks
| Metric | Nigeria (2024) | Context |
|---|---|---|
| Daily NIBSS NIP transactions | 50M+ | Bank-to-bank instant transfers |
| Daily POS transactions | 15M+ | Card and contactless payments |
| Monthly USSD sessions | 2B+ | Feature phone banking |
| Active mobile money wallets | 25M+ | Digital wallet accounts |
| Daily fraud attempts (estimated) | 500K+ | Across all channels |
Data Characteristics That Drive Architecture Decisions
African fintech data has several properties that distinguish it from payments data in mature markets:
- Extreme channel heterogeneity: A single customer might pay via USSD at a market stall, receive money via bank transfer, and check their balance on a mobile app — all within the same hour
- Variable payload sizes: USSD events are tiny (< 500 bytes) while POS settlement batches can be several megabytes
- Unreliable network conditions: Mobile network operators experience intermittent connectivity, causing delayed and out-of-order events
- High fraud velocity: Fraud patterns evolve within hours, not weeks — SIM swap fraud, agent collusion, and social engineering attacks are prevalent
- Regulatory timestamps: CBN requires nanosecond-precision timestamps for certain transaction categories, with mandatory 7-year retention
Nigerian Fintech Data Flow — Volume Estimates
Channel Events/sec Avg Size Daily Volume Latency SLA
─────────────────────────────────────────────────────────────────────
Bank Transfer 600 2 KB ~100 GB < 500ms
USSD 1,200 0.5 KB ~50 GB < 2s
POS Terminal 400 3 KB ~100 GB < 1s
Mobile Wallet 800 1.5 KB ~100 GB < 500ms
Card Payment 300 2.5 KB ~65 GB < 500ms
─────────────────────────────────────────────────────────────────────
TOTAL 3,300+ ~1.8 KB ~415 GB/day -
Peak multiplier: 3-5x during salary days (25th-30th monthly)
Annual growth rate: 40-60%
These numbers mean that a mid-size Nigerian fintech needs to ingest, process, and store roughly 150 TB per year of raw transaction data — and that volume is growing at 40-60% annually.
Event-Driven Architecture for Multi-Channel Payments
Why Event-Driven?
Traditional request-response architectures break down when processing payments across multiple channels with different latency profiles and reliability guarantees. An event-driven architecture provides:
- Channel decoupling: Each payment channel publishes events independently
- Temporal decoupling: Events can be processed asynchronously when real-time isn't required
- Replay capability: Events can be reprocessed for auditing, debugging, or backfilling
- Scalable consumption: Multiple downstream systems consume the same event stream independently
High-Level Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ MULTI-CHANNEL PAYMENT INGESTION │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ NIBSS │ │ USSD │ │ POS │ │ Mobile │ │ Card │ │
│ │ Gateway │ │ Gateway │ │ Gateway │ │ Wallet │ │ Gateway │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CHANNEL ADAPTERS (Normalize + Validate) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ NIP │ │ USSD │ │ ISO 8583│ │ Wallet │ │ Card │ │ │
│ │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └───────────────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ APACHE KAFKA CLUSTER │ │
│ │ │ │
│ │ ┌────────────────┐ ┌────────────────┐ ┌────────────────────┐ │ │
│ │ │ raw.payments │ │ raw.ussd │ │ raw.pos.settlement │ │ │
│ │ │ (12 partitions)│ │ (8 partitions) │ │ (6 partitions) │ │ │
│ │ └────────────────┘ └────────────────┘ └────────────────────────┘ │ │
│ │ ┌────────────────┐ ┌────────────────┐ ┌────────────────────┐ │ │
│ │ │ normalized │ │ fraud.alerts │ │ cbn.reporting │ │ │
│ │ │ .transactions │ │ (3 partitions) │ │ (6 partitions) │ │ │
│ │ │ (24 partitions)│ └────────────────┘ └────────────────────┘ │ │
│ │ └────────────────┘ │ │
│ └───────────────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────────┐ ┌────────────┐ ┌──────────────────┐ │
│ │ FRAUD DETECTION │ │ CBN │ │ ANALYTICS / │ │
│ │ (Flink) │ │ REPORTING │ │ DATA WAREHOUSE │ │
│ └──────────────────┘ └────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Canonical Payment Event Schema
The first architectural decision is defining a canonical event format that normalizes across all channels. This is the backbone of the entire system:
# schemas/canonical_payment_event.py
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
class PaymentChannel(Enum):
BANK_TRANSFER = "bank_transfer" # NIBSS NIP
USSD = "ussd" # USSD gateway
POS = "pos" # POS terminal
MOBILE_WALLET = "mobile_wallet" # Opay, PalmPay, etc.
CARD = "card" # Visa, Mastercard, Verve
QR = "qr" # QR code payments
class TransactionStatus(Enum):
INITIATED = "initiated"
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
REVERSED = "reversed"
DISPUTED = "disputed"
class Currency(Enum):
NGN = "NGN"
USD = "USD"
GHS = "GHS"
KES = "KES"
@dataclass
class GeoLocation:
latitude: Optional[float] = None
longitude: Optional[float] = None
state: Optional[str] = None # Nigerian state (e.g., "Lagos", "Kano")
lga: Optional[str] = None # Local Government Area
country: str = "NG"
@dataclass
class DeviceInfo:
device_id: Optional[str] = None
device_type: Optional[str] = None # "pos_terminal", "mobile", "ussd"
terminal_id: Optional[str] = None # POS terminal ID
ip_address: Optional[str] = None
imei: Optional[str] = None # For mobile devices
network_operator: Optional[str] = None # MTN, Airtel, Glo, 9mobile
@dataclass
class CanonicalPaymentEvent:
"""
Canonical payment event that normalizes across all channels.
All channel adapters must produce events conforming to this schema.
"""
# Core identifiers
event_id: UUID = field(default_factory=uuid4)
transaction_ref: str = "" # Unique transaction reference
channel_ref: str = "" # Original channel reference
session_id: Optional[str] = None # USSD session ID or API request ID
# Channel and status
channel: PaymentChannel = PaymentChannel.BANK_TRANSFER
status: TransactionStatus = TransactionStatus.INITIATED
# Financial details
amount: Decimal = Decimal("0.00")
currency: Currency = Currency.NGN
fee: Decimal = Decimal("0.00")
vat: Decimal = Decimal("0.00") # 7.5% VAT on fees per FIRS
# Parties
sender_account: str = ""
sender_bank_code: str = "" # CBN bank code
sender_name: str = ""
sender_bvn_hash: str = "" # Hashed BVN for privacy
receiver_account: str = ""
receiver_bank_code: str = ""
receiver_name: str = ""
# Context
narration: str = ""
category: Optional[str] = None # "p2p", "merchant", "bill", "airtime"
merchant_id: Optional[str] = None
merchant_category_code: Optional[str] = None # ISO 18245 MCC
# Location and device
geo: GeoLocation = field(default_factory=GeoLocation)
device: DeviceInfo = field(default_factory=DeviceInfo)
# Timestamps (nanosecond precision for CBN compliance)
event_time: datetime = field(default_factory=datetime.utcnow)
channel_time: Optional[datetime] = None # When channel received it
processing_time: Optional[datetime] = None
settlement_time: Optional[datetime] = None
# Lineage
schema_version: str = "1.0.0"
source_system: str = ""
ingestion_time: datetime = field(default_factory=datetime.utcnow)
Channel Adapters
Each payment channel has a dedicated adapter that transforms channel-specific formats into the canonical schema. Here's the NIBSS NIP adapter as an example:
# adapters/nibss_nip_adapter.py
import hashlib
import json
from datetime import datetime
from decimal import Decimal
from typing import Optional
from uuid import uuid4
from confluent_kafka import Producer
from schemas.canonical_payment_event import (
CanonicalPaymentEvent,
Currency,
GeoLocation,
PaymentChannel,
TransactionStatus,
)
class NIBSSNIPAdapter:
"""
Adapter for NIBSS Instant Payment (NIP) messages.
NIP uses ISO 20022-like XML messages for interbank transfers.
"""
BANK_CODE_MAP = {
"000001": "Sterling Bank",
"000002": "Keystone Bank",
"000003": "FCMB",
"000004": "United Bank for Africa",
"000005": "Diamond/Access Bank",
"000006": "JAIZ Bank",
"000007": "Fidelity Bank",
"000008": "Polaris Bank",
"000009": "Citi Bank",
"000010": "Ecobank",
"000011": "Unity Bank",
"000012": "Stanbic IBTC",
"000013": "GTBank",
"000014": "Zenith Bank",
"000015": "First Bank",
"000016": "First City Monument Bank",
"000017": "Wema Bank",
"000018": "Union Bank",
}
def __init__(self, kafka_producer: Producer, output_topic: str):
self.producer = kafka_producer
self.output_topic = output_topic
def transform(self, nip_message: dict) -> CanonicalPaymentEvent:
"""Transform a NIP message into a canonical payment event."""
# Extract amount (NIP sends in kobo, convert to naira)
amount_kobo = int(nip_message.get("Amount", 0))
amount_naira = Decimal(amount_kobo) / Decimal(100)
# Hash BVN for privacy — never store raw BVN in analytics systems
sender_bvn = nip_message.get("BeneficiaryBVN", "")
bvn_hash = hashlib.sha256(
sender_bvn.encode() + b"fintech_salt_2024"
).hexdigest() if sender_bvn else ""
# Map NIP status codes to canonical status
nip_status = nip_message.get("ResponseCode", "")
status = self._map_status(nip_status)
# Calculate fee and VAT
fee = Decimal(nip_message.get("Fee", "0")) / Decimal(100)
vat = fee * Decimal("0.075") # 7.5% VAT per FIRS
event = CanonicalPaymentEvent(
event_id=uuid4(),
transaction_ref=nip_message.get("SessionID", ""),
channel_ref=nip_message.get("PaymentReference", ""),
channel=PaymentChannel.BANK_TRANSFER,
status=status,
amount=amount_naira,
currency=Currency.NGN,
fee=fee,
vat=vat,
sender_account=nip_message.get("OriginatorAccountNumber", ""),
sender_bank_code=nip_message.get("OriginatorBankCode", ""),
sender_name=nip_message.get("OriginatorName", ""),
sender_bvn_hash=bvn_hash,
receiver_account=nip_message.get("BeneficiaryAccountNumber", ""),
receiver_bank_code=nip_message.get("BeneficiaryBankCode", ""),
receiver_name=nip_message.get("BeneficiaryName", ""),
narration=nip_message.get("Narration", ""),
category=self._classify_transfer(nip_message),
event_time=datetime.fromisoformat(
nip_message.get("TransactionDate", datetime.utcnow().isoformat())
),
source_system="nibss_nip",
schema_version="1.0.0",
)
return event
def publish(self, event: CanonicalPaymentEvent) -> None:
"""Publish canonical event to Kafka."""
key = event.sender_account # Partition by sender for ordering
value = json.dumps(event.__dict__, default=str)
self.producer.produce(
topic=self.output_topic,
key=key.encode("utf-8"),
value=value.encode("utf-8"),
callback=self._delivery_callback,
)
self.producer.flush()
def _map_status(self, nip_code: str) -> TransactionStatus:
"""Map NIBSS NIP response codes to canonical status."""
status_map = {
"00": TransactionStatus.COMPLETED, # Approved
"01": TransactionStatus.PENDING, # Status unknown
"03": TransactionStatus.FAILED, # Invalid sender
"05": TransactionStatus.FAILED, # Do not honor
"06": TransactionStatus.FAILED, # Dormant account
"07": TransactionStatus.FAILED, # Invalid account
"12": TransactionStatus.FAILED, # Invalid transaction
"13": TransactionStatus.FAILED, # Invalid amount
"14": TransactionStatus.FAILED, # Invalid card number
"25": TransactionStatus.PROCESSING, # Unable to locate record
"26": TransactionStatus.REVERSED, # Duplicate record
"51": TransactionStatus.FAILED, # Insufficient funds
"57": TransactionStatus.FAILED, # Transaction not permitted
"58": TransactionStatus.FAILED, # Transaction not permitted
"61": TransactionStatus.FAILED, # Transfer limit exceeded
"63": TransactionStatus.FAILED, # Security violation
"65": TransactionStatus.FAILED, # Exceeds withdrawal frequency
"91": TransactionStatus.FAILED, # Issuer/switch inoperative
"96": TransactionStatus.FAILED, # System malfunction
}
return status_map.get(nip_code, TransactionStatus.FAILED)
def _classify_transfer(self, msg: dict) -> str:
"""Classify transfer type based on account patterns."""
receiver = msg.get("BeneficiaryAccountNumber", "")
narration = msg.get("Narration", "").lower()
if any(kw in narration for kw in ["airtime", "recharge", "mtn", "glo"]):
return "airtime"
elif any(kw in narration for kw in ["dstv", "gotv", "nepa", "ekedc", "ikedc"]):
return "bill"
elif receiver.startswith("00"): # Typical merchant prefixes
return "merchant"
return "p2p"
@staticmethod
def _delivery_callback(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} [{msg.partition()}]")
Kafka Topic Design
Proper topic design is critical for performance, ordering guarantees, and downstream consumption. Here is the recommended topology for a Nigerian fintech platform:
# kafka/topics.yml — Kafka topic configuration
# Apply with: kafka-topics.sh --bootstrap-server kafka:9092 --create --config-file topics.yml
topics:
# ── Raw channel topics (one per channel) ──
- name: raw.payments.bank-transfer
partitions: 12
replication_factor: 3
config:
retention.ms: 604800000 # 7 days for raw events
retention.bytes: 107374182400 # 100 GB max per partition
compression.type: lz4
min.insync.replicas: 2
message.timestamp.type: CreateTime
max.message.bytes: 1048576 # 1 MB max message
- name: raw.payments.ussd
partitions: 8
replication_factor: 3
config:
retention.ms: 604800000
compression.type: lz4
min.insync.replicas: 2
- name: raw.payments.pos
partitions: 6
replication_factor: 3
config:
retention.ms: 604800000
compression.type: lz4
min.insync.replicas: 2
max.message.bytes: 5242880 # 5 MB for settlement batches
- name: raw.payments.mobile-wallet
partitions: 8
replication_factor: 3
config:
retention.ms: 604800000
compression.type: lz4
min.insync.replicas: 2
- name: raw.payments.card
partitions: 6
replication_factor: 3
config:
retention.ms: 604800000
compression.type: lz4
min.insync.replicas: 2
# ── Normalized canonical topic ──
- name: normalized.transactions
partitions: 24 # Higher parallelism for downstream
replication_factor: 3
config:
retention.ms: 2592000000 # 30 days
compression.type: zstd # Better ratio for normalized data
min.insync.replicas: 2
cleanup.policy: delete
# ── Fraud detection topics ──
- name: fraud.scoring-requests
partitions: 12
replication_factor: 3
config:
retention.ms: 86400000 # 1 day
compression.type: lz4
min.insync.replicas: 2
- name: fraud.alerts
partitions: 3
replication_factor: 3
config:
retention.ms: 2592000000 # 30 days (audit requirement)
compression.type: zstd
min.insync.replicas: 2
- name: fraud.blocked-transactions
partitions: 3
replication_factor: 3
config:
retention.ms: -1 # Infinite retention
compression.type: zstd
min.insync.replicas: 2
# ── Regulatory reporting topics ──
- name: cbn.regulatory-reports
partitions: 6
replication_factor: 3
config:
retention.ms: -1 # Infinite retention (7-year CBN req.)
compression.type: zstd
min.insync.replicas: 2
# ── Compacted topics for state ──
- name: state.account-balances
partitions: 12
replication_factor: 3
config:
cleanup.policy: compact
min.compaction.lag.ms: 60000
compression.type: lz4
min.insync.replicas: 2
- name: state.customer-profiles
partitions: 12
replication_factor: 3
config:
cleanup.policy: compact
compression.type: zstd
min.insync.replicas: 2
Partitioning Strategy
Choosing the right partition key is a critical decision that affects ordering, parallelism, and hot-spot distribution:
# kafka/partitioning.py
import hashlib
from typing import Optional
class FintechPartitioner:
"""
Custom partitioner for Nigerian fintech workloads.
Key design decisions:
1. Partition by sender_account for payment events (maintains per-account ordering)
2. Use consistent hashing to handle account number format variations
3. Special handling for high-volume merchant accounts to avoid hot partitions
"""
# High-volume merchant accounts that need spreading across partitions
HIGH_VOLUME_MERCHANTS = {
"MTN_AIRTIME", "GLO_AIRTIME", "AIRTEL_AIRTIME",
"DSTV_BILLS", "EKEDC_BILLS", "IKEDC_BILLS",
"LIRS_TAX", "FIRS_TAX",
}
def __init__(self, num_partitions: int):
self.num_partitions = num_partitions
def partition(
self,
sender_account: str,
merchant_id: Optional[str] = None,
transaction_ref: Optional[str] = None,
) -> int:
"""
Determine partition for a payment event.
For regular accounts: hash(sender_account) % num_partitions
For high-volume merchants: hash(transaction_ref) % num_partitions
(spreads load across all partitions, sacrificing per-sender ordering)
"""
if merchant_id and merchant_id in self.HIGH_VOLUME_MERCHANTS:
# Spread high-volume merchants across all partitions
key = transaction_ref or sender_account
else:
key = sender_account
# Consistent hashing using murmur2 (matches Kafka's default partitioner)
hash_bytes = hashlib.md5(key.encode("utf-8")).digest()
hash_value = int.from_bytes(hash_bytes[:4], byteorder="big")
return hash_value % self.num_partitions
Real-Time Fraud Detection Pipeline
Fraud detection is the highest-value and most technically demanding component of fintech data infrastructure. In Nigeria, fraud losses exceeded NGN 18 billion ($22M) in 2023, with attack vectors ranging from SIM swap fraud to merchant collusion.
Fraud Detection Architecture
┌──────────────────────────────────────────────────────────────────────┐
│ REAL-TIME FRAUD DETECTION PIPELINE │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ normalized.transactions (Kafka) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ APACHE FLINK FRAUD ENGINE │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │ │
│ │ │ RULE-BASED │ │ ML MODEL │ │ NETWORK │ │ │
│ │ │ ENGINE │ │ SCORING │ │ ANALYSIS │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ • Velocity │ │ • XGBoost │ │ • Graph │ │ │
│ │ │ • Amount │ │ • Real-time │ │ queries │ │ │
│ │ │ • Geo rules │ │ features │ │ • Ring │ │ │
│ │ │ • Blacklist │ │ • Behavioral │ │ detect │ │ │
│ │ │ • Time-based │ │ profiling │ │ • Mule │ │ │
│ │ │ │ │ │ │ accounts │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └─────┬──────┘ │ │
│ │ │ │ │ │ │
│ │ └────────┬─────────┘──────────────────┘ │ │
│ │ ▼ │ │
│ │ ┌────────────────┐ │ │
│ │ │ SCORE FUSION │ │ │
│ │ │ (Ensemble) │ │ │
│ │ └───────┬────────┘ │ │
│ │ │ │ │
│ └─────────────────┼────────────────────────────────────────┘ │
│ │ │
│ ┌──────────┼──────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌───────┐ ┌──────────┐ │
│ │ BLOCK │ │ ALERT │ │ ALLOW │ │
│ │ score>0.9 │ │ 0.5- │ │ score< │ │
│ │ → reject │ │ 0.9 │ │ 0.5 → │ │
│ │ → freeze │ │ → OTP │ │ approve │ │
│ └────────────┘ └───────┘ └──────────┘ │
│ │
│ Latency budget: < 200ms end-to-end │
│ Target false positive rate: < 0.1% │
│ Target detection rate: > 95% │
│ │
└──────────────────────────────────────────────────────────────────────┘
Real-Time Feature Engineering with Flink
The fraud detection pipeline relies on computing features in real-time over sliding windows. Apache Flink is the preferred engine because it provides exactly-once semantics and sophisticated windowing:
# fraud/flink_feature_engine.py
"""
Real-time feature computation for fraud detection using PyFlink.
Computes per-account behavioral features over sliding windows.
"""
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import (
ValueStateDescriptor,
ListStateDescriptor,
MapStateDescriptor,
)
from pyflink.common.typeinfo import Types
from pyflink.datastream.window import SlidingEventTimeWindows, Time
import json
from datetime import datetime, timedelta
class TransactionFeatureExtractor(KeyedProcessFunction):
"""
Computes real-time features per account for fraud scoring.
Features computed:
- Transaction velocity (count in last 1min, 5min, 1hr, 24hr)
- Amount statistics (mean, std, max in last 24hr)
- Channel diversity (unique channels in last 1hr)
- Geographic velocity (distance between consecutive transactions)
- Time-of-day patterns (deviation from historical pattern)
- Recipient diversity (unique recipients in last 1hr)
"""
def open(self, runtime_context: RuntimeContext):
# State: recent transactions (sliding 24hr window)
self.recent_txns = runtime_context.get_list_state(
ListStateDescriptor(
"recent_transactions",
Types.STRING()
)
)
# State: account profile (long-term behavioral baseline)
self.account_profile = runtime_context.get_state(
ValueStateDescriptor(
"account_profile",
Types.STRING()
)
)
# State: known recipients
self.known_recipients = runtime_context.get_map_state(
MapStateDescriptor(
"known_recipients",
Types.STRING(),
Types.LONG()
)
)
def process_element(self, event_json: str, ctx: KeyedProcessFunction.Context):
event = json.loads(event_json)
now = datetime.fromisoformat(event["event_time"])
# Retrieve and clean recent transactions (remove > 24hr old)
recent = []
cutoff_24h = now - timedelta(hours=24)
for txn_json in self.recent_txns.get():
txn = json.loads(txn_json)
txn_time = datetime.fromisoformat(txn["event_time"])
if txn_time >= cutoff_24h:
recent.append(txn)
# ── Compute velocity features ──
cutoff_1m = now - timedelta(minutes=1)
cutoff_5m = now - timedelta(minutes=5)
cutoff_1h = now - timedelta(hours=1)
txn_count_1m = sum(
1 for t in recent
if datetime.fromisoformat(t["event_time"]) >= cutoff_1m
)
txn_count_5m = sum(
1 for t in recent
if datetime.fromisoformat(t["event_time"]) >= cutoff_5m
)
txn_count_1h = sum(
1 for t in recent
if datetime.fromisoformat(t["event_time"]) >= cutoff_1h
)
txn_count_24h = len(recent)
# ── Compute amount features ──
amounts = [float(t["amount"]) for t in recent]
current_amount = float(event["amount"])
avg_amount_24h = sum(amounts) / len(amounts) if amounts else 0
max_amount_24h = max(amounts) if amounts else 0
amount_zscore = (
(current_amount - avg_amount_24h) / (self._std(amounts) or 1)
if amounts else 0
)
# ── Compute channel diversity ──
recent_1h = [
t for t in recent
if datetime.fromisoformat(t["event_time"]) >= cutoff_1h
]
unique_channels_1h = len(set(t["channel"] for t in recent_1h))
# ── Compute recipient diversity ──
unique_recipients_1h = len(
set(t["receiver_account"] for t in recent_1h)
)
# ── Check if recipient is new ──
receiver = event.get("receiver_account", "")
is_new_recipient = not self.known_recipients.contains(receiver)
# ── Compute geographic velocity ──
geo_velocity = self._compute_geo_velocity(event, recent)
# ── Time pattern deviation ──
hour_of_day = now.hour
is_unusual_hour = hour_of_day < 5 or hour_of_day > 23
# ── Build feature vector ──
features = {
"event_id": event["event_id"],
"transaction_ref": event["transaction_ref"],
"account": event["sender_account"],
# Velocity features
"txn_count_1m": txn_count_1m,
"txn_count_5m": txn_count_5m,
"txn_count_1h": txn_count_1h,
"txn_count_24h": txn_count_24h,
# Amount features
"amount": current_amount,
"avg_amount_24h": round(avg_amount_24h, 2),
"max_amount_24h": max_amount_24h,
"amount_zscore": round(amount_zscore, 4),
"amount_to_max_ratio": (
current_amount / max_amount_24h if max_amount_24h > 0 else 0
),
# Channel features
"channel": event["channel"],
"unique_channels_1h": unique_channels_1h,
# Recipient features
"unique_recipients_1h": unique_recipients_1h,
"is_new_recipient": is_new_recipient,
# Geographic features
"geo_velocity_kmh": geo_velocity,
# Temporal features
"hour_of_day": hour_of_day,
"is_unusual_hour": is_unusual_hour,
"is_weekend": now.weekday() >= 5,
"is_salary_period": 25 <= now.day <= 30,
# Context
"event_time": event["event_time"],
"feature_compute_time": datetime.utcnow().isoformat(),
}
# Update state
self.recent_txns.add(event_json)
if not is_new_recipient:
count = self.known_recipients.get(receiver) or 0
self.known_recipients.put(receiver, count + 1)
else:
self.known_recipients.put(receiver, 1)
# Emit enriched event to fraud scoring topic
yield json.dumps(features)
def _std(self, values: list) -> float:
if len(values) < 2:
return 0.0
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / (len(values) - 1)
return variance ** 0.5
def _compute_geo_velocity(self, current: dict, recent: list) -> float:
"""
Compute geographic velocity (km/h) between current and last transaction.
Flags impossible travel (e.g., Lagos to Kano in 5 minutes).
"""
if not recent:
return 0.0
last_txn = recent[-1]
current_geo = current.get("geo", {})
last_geo = last_txn.get("geo", {})
if not all([
current_geo.get("latitude"),
current_geo.get("longitude"),
last_geo.get("latitude"),
last_geo.get("longitude"),
]):
return 0.0
from math import radians, sin, cos, sqrt, atan2
lat1, lon1 = radians(last_geo["latitude"]), radians(last_geo["longitude"])
lat2, lon2 = radians(current_geo["latitude"]), radians(current_geo["longitude"])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
distance_km = 6371 * c
last_time = datetime.fromisoformat(last_txn["event_time"])
current_time = datetime.fromisoformat(current["event_time"])
hours = (current_time - last_time).total_seconds() / 3600
if hours <= 0:
return float("inf") if distance_km > 1 else 0.0
return round(distance_km / hours, 2)
Rule-Based Fraud Engine
The rule-based engine provides immediate, interpretable fraud detection that complements the ML model. Rules are tuned for Nigerian fraud patterns:
# fraud/rule_engine.py
"""
Rule-based fraud detection engine for Nigerian fintech.
Rules are based on CBN fraud advisories and observed attack patterns.
"""
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class FraudRule:
rule_id: str
name: str
description: str
risk_level: RiskLevel
score: float # 0.0 to 1.0
@dataclass
class RuleResult:
rule: FraudRule
triggered: bool
details: str
class NigerianFraudRuleEngine:
"""
Evaluates Nigerian fintech-specific fraud rules.
Rules are ordered by computational cost (cheapest first).
"""
def evaluate(self, features: dict) -> List[RuleResult]:
results = []
rules = [
self._check_velocity_burst,
self._check_amount_anomaly,
self._check_impossible_travel,
self._check_new_recipient_large_amount,
self._check_salary_day_pattern,
self._check_round_amount_cascade,
self._check_channel_switching,
self._check_unusual_hours,
self._check_sim_swap_indicator,
self._check_smurfing_pattern,
]
for rule_fn in rules:
result = rule_fn(features)
results.append(result)
return results
def _check_velocity_burst(self, f: dict) -> RuleResult:
"""
Detects rapid-fire transactions typical of compromised accounts.
Nigerian pattern: after SIM swap, fraudsters drain accounts within
5-10 minutes via multiple small transfers.
"""
rule = FraudRule(
rule_id="NG-VEL-001",
name="Transaction Velocity Burst",
description="More than 5 transactions in 1 minute",
risk_level=RiskLevel.HIGH,
score=0.85,
)
triggered = f.get("txn_count_1m", 0) > 5
details = f"Transactions in last 1m: {f.get('txn_count_1m', 0)}"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_amount_anomaly(self, f: dict) -> RuleResult:
"""
Flags transactions significantly above account baseline.
Z-score > 3 indicates the amount is more than 3 standard deviations
above the account's recent average.
"""
rule = FraudRule(
rule_id="NG-AMT-001",
name="Amount Anomaly",
description="Transaction amount > 3 std deviations from baseline",
risk_level=RiskLevel.MEDIUM,
score=0.6,
)
triggered = abs(f.get("amount_zscore", 0)) > 3.0
details = f"Amount z-score: {f.get('amount_zscore', 0):.2f}"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_impossible_travel(self, f: dict) -> RuleResult:
"""
Detects transactions from geographically impossible locations.
Example: POS transaction in Lagos, then Kano (1,000km) within
30 minutes would require 2,000 km/h — physically impossible.
"""
rule = FraudRule(
rule_id="NG-GEO-001",
name="Impossible Travel",
description="Geographic velocity exceeds 500 km/h",
risk_level=RiskLevel.CRITICAL,
score=0.95,
)
triggered = f.get("geo_velocity_kmh", 0) > 500
details = f"Geographic velocity: {f.get('geo_velocity_kmh', 0):.0f} km/h"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_new_recipient_large_amount(self, f: dict) -> RuleResult:
"""
First-time transfer to a new recipient with large amount.
Common in social engineering scams targeting Nigerian bank customers.
"""
rule = FraudRule(
rule_id="NG-REC-001",
name="Large Transfer to New Recipient",
description="First transfer to recipient exceeds NGN 500,000",
risk_level=RiskLevel.HIGH,
score=0.7,
)
triggered = (
f.get("is_new_recipient", False)
and f.get("amount", 0) > 500_000
)
details = (
f"New recipient: {f.get('is_new_recipient')}, "
f"amount: NGN {f.get('amount', 0):,.2f}"
)
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_salary_day_pattern(self, f: dict) -> RuleResult:
"""
Elevated risk during salary days (25th-30th) when fraud spikes.
CBN data shows 40% of fraud occurs during salary disbursement periods.
"""
rule = FraudRule(
rule_id="NG-TMP-001",
name="Salary Period High Risk",
description="High-value transaction during salary period with anomalies",
risk_level=RiskLevel.MEDIUM,
score=0.4,
)
triggered = (
f.get("is_salary_period", False)
and f.get("amount", 0) > 200_000
and f.get("txn_count_1h", 0) > 10
)
details = f"Salary period: {f.get('is_salary_period')}, amount: NGN {f.get('amount', 0):,.2f}"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_round_amount_cascade(self, f: dict) -> RuleResult:
"""
Detects multiple round-amount transfers — a pattern common in
money mule operations where stolen funds are split into
round amounts (e.g., NGN 50,000, 100,000, 200,000).
"""
rule = FraudRule(
rule_id="NG-PAT-001",
name="Round Amount Cascade",
description="Multiple round-amount transfers to different recipients",
risk_level=RiskLevel.HIGH,
score=0.75,
)
amount = f.get("amount", 0)
is_round = amount > 0 and amount % 10_000 == 0
triggered = (
is_round
and f.get("unique_recipients_1h", 0) > 3
and f.get("txn_count_1h", 0) > 5
)
details = (
f"Round amount: {is_round}, "
f"unique recipients (1h): {f.get('unique_recipients_1h', 0)}"
)
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_channel_switching(self, f: dict) -> RuleResult:
"""
Rapid switching between payment channels is suspicious.
Legitimate users rarely use 3+ channels within an hour.
"""
rule = FraudRule(
rule_id="NG-CHN-001",
name="Rapid Channel Switching",
description="3+ unique channels used within 1 hour",
risk_level=RiskLevel.MEDIUM,
score=0.5,
)
triggered = f.get("unique_channels_1h", 0) >= 3
details = f"Unique channels (1h): {f.get('unique_channels_1h', 0)}"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_unusual_hours(self, f: dict) -> RuleResult:
"""
Transactions between midnight and 5 AM Nigerian time
are statistically more likely to be fraudulent.
"""
rule = FraudRule(
rule_id="NG-TMP-002",
name="Unusual Hour Transaction",
description="Transaction during 12AM-5AM with other risk factors",
risk_level=RiskLevel.LOW,
score=0.3,
)
triggered = (
f.get("is_unusual_hour", False)
and f.get("amount", 0) > 100_000
)
details = f"Hour: {f.get('hour_of_day')}, unusual: {f.get('is_unusual_hour')}"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_sim_swap_indicator(self, f: dict) -> RuleResult:
"""
SIM swap fraud is the #1 attack vector in Nigeria.
Pattern: new device + large transfer + new recipient within
minutes of a USSD session originating from a new IMEI.
"""
rule = FraudRule(
rule_id="NG-SIM-001",
name="SIM Swap Indicator",
description="New device with immediate large transfers to new recipients",
risk_level=RiskLevel.CRITICAL,
score=0.9,
)
# This requires enrichment from device state store
triggered = (
f.get("is_new_device", False)
and f.get("is_new_recipient", False)
and f.get("amount", 0) > 100_000
and f.get("txn_count_5m", 0) > 2
)
details = "New device with rapid transfers to new recipients"
return RuleResult(rule=rule, triggered=triggered, details=details)
def _check_smurfing_pattern(self, f: dict) -> RuleResult:
"""
Detects structuring / smurfing — splitting large amounts into
smaller transfers to stay below CBN's NGN 5M reporting threshold.
"""
rule = FraudRule(
rule_id="NG-AML-001",
name="Potential Smurfing",
description="Many sub-threshold transfers to multiple recipients",
risk_level=RiskLevel.HIGH,
score=0.8,
)
triggered = (
f.get("txn_count_24h", 0) > 20
and f.get("unique_recipients_1h", 0) > 5
and 1_000_000 < f.get("amount", 0) < 5_000_000
)
details = (
f"24h txn count: {f.get('txn_count_24h', 0)}, "
f"unique recipients: {f.get('unique_recipients_1h', 0)}"
)
return RuleResult(rule=rule, triggered=triggered, details=details)
Fraud Score Fusion
The ensemble combines rule-based scores with ML model predictions:
# fraud/score_fusion.py
"""
Fuses scores from rule engine, ML model, and network analysis
into a final fraud decision.
"""
from dataclasses import dataclass
from typing import List
from enum import Enum
class FraudDecision(Enum):
ALLOW = "allow"
CHALLENGE = "challenge" # Require OTP or step-up authentication
BLOCK = "block" # Block and alert
REVIEW = "review" # Queue for manual review
@dataclass
class FraudVerdict:
decision: FraudDecision
composite_score: float
rule_score: float
ml_score: float
network_score: float
triggered_rules: List[str]
explanation: str
latency_ms: float
class FraudScoreFusion:
"""
Weighted ensemble for combining fraud signals.
Weights are calibrated based on Nigerian fintech fraud patterns:
- Rules are heavily weighted because Nigerian fraud patterns
are well-characterized (SIM swap, smurfing, agent collusion)
- ML captures subtle behavioral anomalies
- Network analysis catches mule rings
"""
# Decision thresholds (tuned for < 0.1% FPR)
BLOCK_THRESHOLD = 0.85
CHALLENGE_THRESHOLD = 0.50
REVIEW_THRESHOLD = 0.35
# Ensemble weights
RULE_WEIGHT = 0.40
ML_WEIGHT = 0.35
NETWORK_WEIGHT = 0.25
def fuse(
self,
rule_results: list,
ml_score: float,
network_score: float,
) -> FraudVerdict:
"""Compute final fraud verdict from all scoring engines."""
import time
start = time.monotonic()
# Compute aggregate rule score
triggered = [r for r in rule_results if r.triggered]
if triggered:
rule_score = max(r.rule.score for r in triggered)
else:
rule_score = 0.0
# Weighted composite
composite = (
self.RULE_WEIGHT * rule_score
+ self.ML_WEIGHT * ml_score
+ self.NETWORK_WEIGHT * network_score
)
# Critical rules override composite score
critical_triggered = [
r for r in triggered
if r.rule.risk_level.value == "critical"
]
if critical_triggered:
composite = max(composite, 0.90)
# Determine decision
if composite >= self.BLOCK_THRESHOLD:
decision = FraudDecision.BLOCK
elif composite >= self.CHALLENGE_THRESHOLD:
decision = FraudDecision.CHALLENGE
elif composite >= self.REVIEW_THRESHOLD:
decision = FraudDecision.REVIEW
else:
decision = FraudDecision.ALLOW
latency = (time.monotonic() - start) * 1000
return FraudVerdict(
decision=decision,
composite_score=round(composite, 4),
rule_score=round(rule_score, 4),
ml_score=round(ml_score, 4),
network_score=round(network_score, 4),
triggered_rules=[r.rule.rule_id for r in triggered],
explanation=self._generate_explanation(triggered, ml_score, composite),
latency_ms=round(latency, 2),
)
def _generate_explanation(
self, triggered: list, ml_score: float, composite: float
) -> str:
"""Generate human-readable explanation for fraud analysts."""
parts = []
if triggered:
rule_names = [r.rule.name for r in triggered]
parts.append(f"Triggered rules: {', '.join(rule_names)}")
if ml_score > 0.5:
parts.append(f"ML model flagged anomaly (score: {ml_score:.2f})")
parts.append(f"Composite score: {composite:.2f}")
return " | ".join(parts)
Performance Benchmarks
Our reference architecture achieves the following on a 3-node Flink cluster (8 vCPUs, 32GB RAM each) processing Nigerian fintech traffic:
| Metric | Target | Achieved |
|---|---|---|
| End-to-end latency (p50) | < 100ms | 78ms |
| End-to-end latency (p99) | < 200ms | 187ms |
| Throughput | 5,000 events/sec | 8,200 events/sec |
| False positive rate | < 0.1% | 0.07% |
| Detection rate (known patterns) | > 95% | 97.3% |
| Detection rate (novel patterns) | > 70% | 74.1% |
| State store size (per account) | < 10 KB | ~6.2 KB |
| Recovery time (checkpoint restore) | < 30s | 22s |
Operational note: During salary disbursement periods (25th-30th), throughput spikes 3-5x. The Flink cluster auto-scales using Kubernetes HPA with custom metrics based on Kafka consumer lag. Budget 2x normal capacity for these periods.
CBN Regulatory Reporting and NDPR Compliance
Regulatory Landscape
Nigerian fintech companies operate under multiple regulatory frameworks:
| Regulation | Authority | Key Requirements |
|---|---|---|
| CBN Guidelines | Central Bank of Nigeria | Transaction reporting, AML/CFT, capital adequacy |
| NDPR | NITDA | Data protection, consent management, breach notification |
| NIBSS Rules | Nigeria Inter-Bank Settlement System | Settlement reporting, dispute resolution |
| FIRS Requirements | Federal Inland Revenue Service | VAT computation, withholding tax |
| CBN AML/CFT | CBN / NFIU | Suspicious transaction reports (STRs), CTRs |
Automated CBN Reporting Pipeline
The CBN requires daily, weekly, and monthly reports covering transaction volumes, fraud statistics, and capital adequacy metrics. Automating these reports eliminates manual errors and ensures timely submission:
# reporting/cbn_reporting_pipeline.py
"""
Automated CBN regulatory reporting pipeline.
Generates required reports from the analytics warehouse.
"""
from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal
from typing import List
@dataclass
class CBNDailyTransactionReport:
"""
CBN requires daily submission of transaction volumes by channel.
Reference: CBN/DIR/GEN/CIR/07/008
"""
report_date: date
reporting_entity_code: str # CBN-assigned institution code
total_transactions: int
total_value_ngn: Decimal
channel_breakdown: dict # {channel: {count, value}}
failed_transactions: int
failed_value_ngn: Decimal
average_transaction_value: Decimal
peak_hour: int
peak_hour_volume: int
@dataclass
class SuspiciousTransactionReport:
"""
STR format per CBN AML/CFT Regulation 2022.
Must be filed within 24 hours of detection.
Threshold: NGN 5,000,000 for individuals, NGN 10,000,000 for corporates.
"""
str_reference: str
detection_date: datetime
subject_name: str
subject_bvn_hash: str # Never include raw BVN
subject_account: str
transaction_refs: List[str]
total_amount_ngn: Decimal
suspicious_indicators: List[str]
risk_level: str
narrative: str
reporting_officer: str
-- reporting/cbn_daily_report.sql
-- Daily transaction report for CBN submission
-- Runs at 01:00 WAT, covers previous calendar day
WITH daily_transactions AS (
SELECT
event_date,
channel,
status,
COUNT(*) AS transaction_count,
SUM(amount) AS total_value,
AVG(amount) AS avg_value,
MAX(amount) AS max_value,
COUNT(DISTINCT sender_account) AS unique_senders,
COUNT(DISTINCT receiver_account) AS unique_receivers,
EXTRACT(HOUR FROM event_time) AS transaction_hour
FROM analytics.gold.fct_transactions
WHERE event_date = CURRENT_DATE - INTERVAL '1 day'
AND currency = 'NGN'
GROUP BY event_date, channel, status, EXTRACT(HOUR FROM event_time)
),
channel_summary AS (
SELECT
channel,
SUM(CASE WHEN status = 'completed' THEN transaction_count ELSE 0 END)
AS successful_count,
SUM(CASE WHEN status = 'completed' THEN total_value ELSE 0 END)
AS successful_value,
SUM(CASE WHEN status = 'failed' THEN transaction_count ELSE 0 END)
AS failed_count,
SUM(CASE WHEN status = 'failed' THEN total_value ELSE 0 END)
AS failed_value,
ROUND(
SUM(CASE WHEN status = 'failed' THEN transaction_count ELSE 0 END)::DECIMAL
/ NULLIF(SUM(transaction_count), 0) * 100,
2
) AS failure_rate_pct
FROM daily_transactions
GROUP BY channel
),
peak_hours AS (
SELECT
transaction_hour,
SUM(transaction_count) AS hourly_volume,
ROW_NUMBER() OVER (ORDER BY SUM(transaction_count) DESC) AS rn
FROM daily_transactions
GROUP BY transaction_hour
),
-- CBN threshold monitoring: transactions >= NGN 5M
large_transactions AS (
SELECT
COUNT(*) AS large_txn_count,
SUM(amount) AS large_txn_value
FROM analytics.gold.fct_transactions
WHERE event_date = CURRENT_DATE - INTERVAL '1 day'
AND amount >= 5000000
AND currency = 'NGN'
AND status = 'completed'
),
-- Fraud statistics for CBN fraud desk
fraud_summary AS (
SELECT
COUNT(*) AS flagged_count,
SUM(amount) AS flagged_value,
COUNT(CASE WHEN fraud_decision = 'block' THEN 1 END) AS blocked_count,
SUM(CASE WHEN fraud_decision = 'block' THEN amount ELSE 0 END)
AS blocked_value,
COUNT(CASE WHEN fraud_decision = 'challenge' THEN 1 END)
AS challenged_count,
ROUND(
COUNT(CASE WHEN fraud_decision = 'block' THEN 1 END)::DECIMAL
/ NULLIF(COUNT(*), 0) * 100,
4
) AS block_rate_pct
FROM analytics.gold.fct_fraud_verdicts
WHERE verdict_date = CURRENT_DATE - INTERVAL '1 day'
)
SELECT
CURRENT_DATE - INTERVAL '1 day' AS report_date,
'CBN/FI/XXXXX' AS reporting_entity_code,
-- Volume summary
cs.channel,
cs.successful_count,
cs.successful_value,
cs.failed_count,
cs.failed_value,
cs.failure_rate_pct,
-- Peak hour
ph.transaction_hour AS peak_hour,
ph.hourly_volume AS peak_hour_volume,
-- Large transactions (CTR threshold)
lt.large_txn_count,
lt.large_txn_value,
-- Fraud summary
fs.flagged_count AS fraud_flagged,
fs.blocked_count AS fraud_blocked,
fs.blocked_value AS fraud_prevented_value,
fs.block_rate_pct AS fraud_block_rate
FROM channel_summary cs
CROSS JOIN peak_hours ph
CROSS JOIN large_transactions lt
CROSS JOIN fraud_summary fs
WHERE ph.rn = 1
ORDER BY cs.channel;
NDPR Compliance in Data Pipelines
The Nigeria Data Protection Regulation (NDPR), enforced by NITDA, imposes requirements analogous to GDPR. Here's how to build compliance into data pipelines:
┌──────────────────────────────────────────────────────────────────┐
│ NDPR COMPLIANCE ARCHITECTURE │
├──────────────────────────────────────────────────────────────────┤
│ │
│ DATA CLASSIFICATION │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ TIER 1 (Critical PII) TIER 2 (Sensitive) TIER 3 │ │
│ │ • BVN • Transaction history • Aggregates│ │
│ │ • NIN • Account balances • Anonymized│ │
│ │ • Phone numbers • IP addresses • Statistics│ │
│ │ • Account numbers • Device IDs │ │
│ │ • Biometric data • Location data │ │
│ │ │ │
│ │ → Encrypted at rest → Encrypted at rest → Standard │ │
│ │ → Tokenized in transit → Masked in analytics │ │ │
│ │ → 72hr breach notify → Access logging │ │ │
│ │ → Data subject access → Retention limits │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ DATA FLOW WITH NDPR CONTROLS │
│ ┌────────┐ ┌───────────┐ ┌───────────┐ ┌──────────┐ │
│ │ Ingest │───►│ Tokenize │───►│ Classify │───►│ Route by │ │
│ │ │ │ PII │ │ & Tag │ │ Tier │ │
│ └────────┘ └───────────┘ └───────────┘ └──────────┘ │
│ │ │
│ ┌──────────────┼─────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌────────┐ ┌──┐ │
│ │Encrypted│ │ Masked │ │An│ │
│ │Vault │ │ Lakehse│ │ly│ │
│ └─────────┘ └────────┘ └──┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
# compliance/ndpr_data_handler.py
"""
NDPR-compliant data handling utilities.
Implements tokenization, masking, and consent tracking
as required by Nigeria Data Protection Regulation.
"""
import hashlib
import re
from typing import Optional
from cryptography.fernet import Fernet
class NDPRDataHandler:
"""
Handles PII in compliance with NDPR requirements:
1. Tokenize sensitive identifiers (BVN, NIN, phone numbers)
2. Mask data for analytics workloads
3. Track consent and data subject access requests
4. Enforce retention periods
"""
def __init__(self, encryption_key: bytes):
self.cipher = Fernet(encryption_key)
def tokenize_bvn(self, bvn: str) -> str:
"""
Tokenize Bank Verification Number (11-digit BVN).
BVN is critical PII under NDPR — never store in plain text
in analytics systems.
"""
if not re.match(r"^\d{11}$", bvn):
raise ValueError("Invalid BVN format")
return self.cipher.encrypt(bvn.encode()).decode()
def hash_bvn(self, bvn: str, purpose: str = "analytics") -> str:
"""
One-way hash for analytics (non-reversible).
Uses purpose-specific salt to prevent cross-system correlation.
"""
salt = f"ndpr_{purpose}_2024".encode()
return hashlib.sha256(bvn.encode() + salt).hexdigest()[:16]
def mask_account_number(self, account: str) -> str:
"""Mask account number for analytics: 0123456789 → 01******89"""
if len(account) < 4:
return "****"
return account[:2] + "*" * (len(account) - 4) + account[-2:]
def mask_phone_number(self, phone: str) -> str:
"""Mask phone number: 08012345678 → 080****5678"""
cleaned = re.sub(r"[^0-9]", "", phone)
if len(cleaned) < 7:
return "****"
return cleaned[:3] + "****" + cleaned[-4:]
def redact_narration(self, narration: str) -> str:
"""
Remove PII from transaction narrations.
Nigerian bank transfers often contain names and account numbers
in the narration field.
"""
# Remove 10-digit account numbers
narration = re.sub(r"\b\d{10}\b", "[REDACTED_ACCT]", narration)
# Remove 11-digit BVNs
narration = re.sub(r"\b\d{11}\b", "[REDACTED_BVN]", narration)
# Remove phone numbers (Nigerian format)
narration = re.sub(
r"\b0[789][01]\d{8}\b", "[REDACTED_PHONE]", narration
)
return narration
def apply_retention_policy(
self, data_tier: str, record_date: str
) -> dict:
"""
NDPR retention policy enforcement.
Returns retention metadata for data lifecycle management.
"""
retention_days = {
"tier1_critical_pii": 365 * 2, # 2 years then purge
"tier2_sensitive": 365 * 5, # 5 years (CBN requirement)
"tier3_analytics": 365 * 7, # 7 years (CBN audit trail)
"tier3_aggregated": -1, # Indefinite (no PII)
}
return {
"data_tier": data_tier,
"retention_days": retention_days.get(data_tier, 365 * 2),
"classification_date": record_date,
"requires_consent": data_tier in ("tier1_critical_pii", "tier2_sensitive"),
"breach_notification_hours": 72 if data_tier == "tier1_critical_pii" else 168,
}
Data Lineage and Audit Trail
CBN auditors require complete data lineage — the ability to trace any reported number back to the source transactions:
-- lineage/audit_trail.sql
-- Create audit trail table for CBN compliance
-- Every transformation step is logged with input/output lineage
CREATE TABLE IF NOT EXISTS analytics.audit.transformation_lineage (
lineage_id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
pipeline_run_id UUID NOT NULL,
pipeline_name VARCHAR(255) NOT NULL,
step_name VARCHAR(255) NOT NULL,
step_order INT NOT NULL,
-- Input lineage
input_source VARCHAR(255) NOT NULL,
input_row_count BIGINT,
input_checksum VARCHAR(64), -- SHA-256 of input dataset
input_schema_hash VARCHAR(64),
-- Output lineage
output_target VARCHAR(255) NOT NULL,
output_row_count BIGINT,
output_checksum VARCHAR(64),
output_schema_hash VARCHAR(64),
-- Transformation metadata
transformation_type VARCHAR(50), -- 'filter', 'aggregate', 'join', 'enrich'
transformation_sql TEXT, -- Actual SQL/code executed
rows_added BIGINT DEFAULT 0,
rows_removed BIGINT DEFAULT 0,
rows_modified BIGINT DEFAULT 0,
-- Timing
started_at TIMESTAMP WITH TIME ZONE NOT NULL,
completed_at TIMESTAMP WITH TIME ZONE,
duration_ms BIGINT,
-- Context
triggered_by VARCHAR(255), -- 'schedule', 'manual', 'backfill'
environment VARCHAR(50), -- 'production', 'staging'
dbt_model_name VARCHAR(255),
git_commit_sha VARCHAR(40),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Index for CBN audit queries
CREATE INDEX idx_lineage_pipeline_date
ON analytics.audit.transformation_lineage (pipeline_name, started_at);
CREATE INDEX idx_lineage_output_target
ON analytics.audit.transformation_lineage (output_target);
-- Materialized view: daily lineage summary for CBN reporting
CREATE MATERIALIZED VIEW analytics.audit.daily_lineage_summary AS
SELECT
DATE_TRUNC('day', started_at) AS report_date,
pipeline_name,
COUNT(DISTINCT pipeline_run_id) AS pipeline_runs,
SUM(output_row_count) AS total_rows_processed,
AVG(duration_ms) AS avg_duration_ms,
MAX(duration_ms) AS max_duration_ms,
COUNT(CASE WHEN completed_at IS NULL THEN 1 END) AS failed_steps,
MAX(git_commit_sha) AS latest_code_version
FROM analytics.audit.transformation_lineage
WHERE environment = 'production'
GROUP BY DATE_TRUNC('day', started_at), pipeline_name;
Alternative Credit Scoring for Underbanked Populations
Nigeria has over 40 million adults without access to formal credit, not because they lack creditworthiness, but because traditional credit bureaus have no data on them. Alternative credit scoring using transaction data, airtime purchase patterns, and merchant interactions can unlock financial inclusion at scale.
Alternative Data Sources
┌──────────────────────────────────────────────────────────────────┐
│ ALTERNATIVE CREDIT SCORING DATA SOURCES │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TRANSACTION │ │ AIRTIME & │ │ MERCHANT │ │
│ │ HISTORY │ │ TELCO DATA │ │ DATA │ │
│ │ │ │ │ │ │ │
│ │ • Inflow/outflow│ │ • Airtime freq │ │ • Payment │ │
│ │ • Consistency │ │ • Data bundles │ │ regularity │ │
│ │ • Balance trends│ │ • Bill payments│ │ • Supplier │ │
│ │ • Salary pattern│ │ • Top-up amount│ │ relationships│ │
│ │ • Savings ratio │ │ • Network type │ │ • Inventory │ │
│ │ │ │ │ │ cycles │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └────────────┬───────┘─────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ FEATURE ENGINEERING PIPELINE │ │
│ │ │ │
│ │ 250+ features computed across 30/60/90/180-day windows │ │
│ └──────────────────────────┬────────────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ML SCORING MODEL (XGBoost) │ │
│ │ │ │
│ │ Output: Credit score (300-850), Risk band (A-E), │ │
│ │ Recommended limit (NGN), Default probability (%) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
Feature Engineering for Credit Scoring
# credit/feature_engineering.py
"""
Feature engineering for alternative credit scoring.
Computes 250+ features from transaction history, airtime patterns,
and merchant data for Nigerian underbanked populations.
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List
class CreditFeatureEngineer:
"""
Computes credit scoring features from alternative data.
Feature categories:
1. Income stability (salary detection, income consistency)
2. Spending behavior (necessity vs. discretionary, regularity)
3. Savings capacity (balance trends, savings ratio)
4. Financial stress indicators (overdrafts, emergency patterns)
5. Network effects (merchant diversity, social transfers)
6. Digital engagement (airtime, data bundle, bill payments)
"""
WINDOWS = [30, 60, 90, 180] # Days
def compute_features(
self, account_id: str, transactions: pd.DataFrame
) -> Dict[str, float]:
"""Compute all credit scoring features for an account."""
features = {}
for window in self.WINDOWS:
cutoff = datetime.utcnow() - timedelta(days=window)
window_txns = transactions[
transactions["event_time"] >= cutoff
]
prefix = f"w{window}"
# Income features
features.update(
self._income_features(window_txns, prefix)
)
# Spending features
features.update(
self._spending_features(window_txns, prefix)
)
# Balance features
features.update(
self._balance_features(window_txns, prefix)
)
# Stress indicators
features.update(
self._stress_features(window_txns, prefix)
)
# Cross-window features (trend analysis)
features.update(
self._trend_features(transactions)
)
# Digital engagement features
features.update(
self._digital_engagement_features(transactions)
)
return features
def _income_features(
self, txns: pd.DataFrame, prefix: str
) -> Dict[str, float]:
"""
Detect and characterize income patterns.
Key insight: many Nigerian workers receive irregular income
(daily market sales, weekly wages, gig economy payments)
rather than monthly salaries.
"""
inflows = txns[txns["amount"] > 0]
if inflows.empty:
return {
f"{prefix}_income_total": 0,
f"{prefix}_income_count": 0,
f"{prefix}_income_regularity": 0,
f"{prefix}_salary_detected": 0,
f"{prefix}_income_diversity": 0,
}
# Detect salary-like patterns (regular, similar amounts, monthly)
monthly_groups = inflows.groupby(
inflows["event_time"].dt.to_period("M")
)["amount"]
monthly_totals = monthly_groups.sum()
salary_detected = 0
if len(monthly_totals) >= 2:
cv = monthly_totals.std() / monthly_totals.mean()
if cv < 0.3: # Coefficient of variation < 30%
salary_detected = 1
# Income regularity: days between income events
if len(inflows) >= 2:
income_dates = inflows["event_time"].sort_values()
gaps = income_dates.diff().dt.days.dropna()
regularity = 1.0 / (1.0 + gaps.std()) if len(gaps) > 0 else 0
else:
regularity = 0
return {
f"{prefix}_income_total": float(inflows["amount"].sum()),
f"{prefix}_income_count": len(inflows),
f"{prefix}_income_mean": float(inflows["amount"].mean()),
f"{prefix}_income_regularity": float(regularity),
f"{prefix}_salary_detected": salary_detected,
f"{prefix}_income_diversity": inflows["sender_account"].nunique(),
f"{prefix}_income_cv": float(
inflows["amount"].std() / inflows["amount"].mean()
if inflows["amount"].mean() > 0 else 0
),
}
def _spending_features(
self, txns: pd.DataFrame, prefix: str
) -> Dict[str, float]:
"""
Characterize spending behavior.
Distinguishes necessity spending (airtime, bills, food)
from discretionary (entertainment, fashion).
"""
outflows = txns[txns["amount"] < 0].copy()
outflows["amount"] = outflows["amount"].abs()
if outflows.empty:
return {
f"{prefix}_spend_total": 0,
f"{prefix}_spend_necessity_ratio": 0,
f"{prefix}_merchant_diversity": 0,
}
# Categorize spending
necessity_categories = {"airtime", "bill", "utility", "transport", "food"}
necessity_spend = outflows[
outflows["category"].isin(necessity_categories)
]["amount"].sum()
total_spend = outflows["amount"].sum()
return {
f"{prefix}_spend_total": float(total_spend),
f"{prefix}_spend_count": len(outflows),
f"{prefix}_spend_mean": float(outflows["amount"].mean()),
f"{prefix}_spend_necessity_ratio": float(
necessity_spend / total_spend if total_spend > 0 else 0
),
f"{prefix}_merchant_diversity": outflows["merchant_id"].nunique(),
f"{prefix}_spend_consistency": float(
1.0 / (1.0 + outflows.groupby(
outflows["event_time"].dt.to_period("W")
)["amount"].sum().std())
),
}
def _balance_features(
self, txns: pd.DataFrame, prefix: str
) -> Dict[str, float]:
"""
Analyze balance trends as a proxy for financial health.
A consistently positive balance trend indicates savings capacity.
"""
if txns.empty:
return {
f"{prefix}_balance_trend": 0,
f"{prefix}_min_balance": 0,
f"{prefix}_savings_ratio": 0,
}
# Reconstruct running balance
sorted_txns = txns.sort_values("event_time")
sorted_txns["running_balance"] = sorted_txns["amount"].cumsum()
# Balance trend (slope of linear regression)
x = np.arange(len(sorted_txns))
y = sorted_txns["running_balance"].values
if len(x) > 1:
slope = np.polyfit(x, y, 1)[0]
else:
slope = 0
# Savings ratio: (income - spending) / income
income = txns[txns["amount"] > 0]["amount"].sum()
spending = txns[txns["amount"] < 0]["amount"].abs().sum()
savings_ratio = (income - spending) / income if income > 0 else 0
return {
f"{prefix}_balance_trend": float(slope),
f"{prefix}_min_balance": float(sorted_txns["running_balance"].min()),
f"{prefix}_max_balance": float(sorted_txns["running_balance"].max()),
f"{prefix}_avg_balance": float(sorted_txns["running_balance"].mean()),
f"{prefix}_savings_ratio": float(max(0, savings_ratio)),
f"{prefix}_days_negative_balance": int(
(sorted_txns["running_balance"] < 0).sum()
),
}
def _stress_features(
self, txns: pd.DataFrame, prefix: str
) -> Dict[str, float]:
"""
Detect financial stress indicators.
Nigerian-specific patterns:
- Multiple small airtime purchases (NGN 50-100) indicate cash constraints
- Late-night transactions may indicate emergency borrowing
- Rapid depletion after income receipt signals living paycheck-to-paycheck
"""
if txns.empty:
return {
f"{prefix}_stress_score": 0,
f"{prefix}_micro_airtime_count": 0,
}
# Micro airtime purchases (financial stress indicator)
micro_airtime = txns[
(txns["category"] == "airtime")
& (txns["amount"].abs() < 200)
]
# Emergency late-night transactions (11PM - 4AM)
late_night = txns[
txns["event_time"].dt.hour.isin([23, 0, 1, 2, 3, 4])
]
# Rapid depletion: > 80% of income spent within 3 days
# (This is a simplified version; production would track per pay cycle)
return {
f"{prefix}_micro_airtime_count": len(micro_airtime),
f"{prefix}_late_night_txn_count": len(late_night),
f"{prefix}_late_night_txn_ratio": float(
len(late_night) / len(txns) if len(txns) > 0 else 0
),
f"{prefix}_stress_score": float(
min(1.0, (len(micro_airtime) * 0.1 + len(late_night) * 0.05))
),
}
def _trend_features(self, txns: pd.DataFrame) -> Dict[str, float]:
"""Cross-window trend analysis."""
features = {}
for metric in ["income_total", "spend_total", "savings_ratio"]:
values = []
for window in self.WINDOWS:
cutoff = datetime.utcnow() - timedelta(days=window)
window_txns = txns[txns["event_time"] >= cutoff]
if metric == "income_total":
values.append(
float(window_txns[window_txns["amount"] > 0]["amount"].sum())
)
elif metric == "spend_total":
values.append(
float(window_txns[window_txns["amount"] < 0]["amount"].abs().sum())
)
elif metric == "savings_ratio":
income = window_txns[window_txns["amount"] > 0]["amount"].sum()
spend = window_txns[window_txns["amount"] < 0]["amount"].abs().sum()
values.append(
float((income - spend) / income if income > 0 else 0)
)
if len(values) >= 2:
# Trend: positive means improving
trend = np.polyfit(range(len(values)), values, 1)[0]
features[f"trend_{metric}"] = float(trend)
return features
def _digital_engagement_features(
self, txns: pd.DataFrame
) -> Dict[str, float]:
"""
Digital engagement as a creditworthiness signal.
Research shows that consistent digital financial behavior
correlates with lower default rates in African markets.
"""
if txns.empty:
return {
"digital_engagement_score": 0,
"channel_diversity": 0,
}
return {
"digital_engagement_score": float(min(1.0,
txns["channel"].nunique() * 0.2
+ (1 if "mobile_wallet" in txns["channel"].values else 0) * 0.3
+ (1 if "ussd" in txns["channel"].values else 0) * 0.1
)),
"channel_diversity": txns["channel"].nunique(),
"has_recurring_payments": int(
txns.groupby("receiver_account").size().max() > 2
if len(txns) > 0 else 0
),
"bill_payment_regularity": float(
txns[txns["category"] == "bill"].groupby(
txns["event_time"].dt.to_period("M")
).size().std()
if "bill" in txns["category"].values else 0
),
}
Credit Score Model Output
-- credit/credit_score_output.sql
-- Final credit score mart consumed by lending products
CREATE TABLE analytics.gold.dim_credit_scores AS
WITH latest_scores AS (
SELECT
account_id,
credit_score, -- 300-850 range
risk_band, -- A (lowest risk) to E (highest)
default_probability, -- 0.0 to 1.0
recommended_limit_ngn,
model_version,
feature_count,
scoring_timestamp,
ROW_NUMBER() OVER (
PARTITION BY account_id
ORDER BY scoring_timestamp DESC
) AS rn
FROM analytics.silver.credit_model_outputs
WHERE scoring_timestamp >= CURRENT_DATE - INTERVAL '7 days'
)
SELECT
ls.account_id,
ls.credit_score,
ls.risk_band,
ls.default_probability,
ls.recommended_limit_ngn,
-- Score interpretation for product teams
CASE
WHEN ls.risk_band = 'A' THEN 'Prime - eligible for all products'
WHEN ls.risk_band = 'B' THEN 'Near-prime - standard terms'
WHEN ls.risk_band = 'C' THEN 'Subprime - reduced limits, higher rates'
WHEN ls.risk_band = 'D' THEN 'High risk - micro-loans only'
WHEN ls.risk_band = 'E' THEN 'Very high risk - decline or secured only'
END AS risk_description,
-- NDPR compliance: never expose raw features in lending decisions
-- Only expose score, band, and recommended limit
ls.model_version,
ls.scoring_timestamp,
-- Monitoring metrics
ls.feature_count,
CASE WHEN ls.feature_count < 50 THEN TRUE ELSE FALSE END AS thin_file_flag
FROM latest_scores ls
WHERE ls.rn = 1;
-- Performance monitoring: score distribution by risk band
-- Used for model monitoring and CBN reporting
CREATE VIEW analytics.gold.credit_score_distribution AS
SELECT
risk_band,
COUNT(*) AS account_count,
ROUND(AVG(credit_score), 1) AS avg_score,
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY credit_score), 1) AS median_score,
ROUND(AVG(default_probability) * 100, 2) AS avg_default_pct,
ROUND(AVG(recommended_limit_ngn), 0) AS avg_limit_ngn,
COUNT(CASE WHEN thin_file_flag THEN 1 END) AS thin_file_count
FROM analytics.gold.dim_credit_scores
GROUP BY risk_band
ORDER BY risk_band;
Cost-Optimized Architecture for High-Growth Startups
Nigerian fintech startups face a unique constraint: they need enterprise-grade data infrastructure but operate with Series A/B budgets. Here's how to build a production-ready platform without breaking the bank.
Architecture Overview
┌──────────────────────────────────────────────────────────────────────┐
│ COST-OPTIMIZED FINTECH DATA PLATFORM │
│ (Target: < $15,000/month at 3,300 events/sec) │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ STREAMING LAYER BATCH LAYER │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ Apache Kafka (MSK) │ │ Apache Spark on EMR │ │
│ │ 3 brokers, m5.large │ │ Serverless │ │
│ │ ~$1,200/mo │ │ ~$800/mo │ │
│ └──────────┬───────────┘ └──────────┬───────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ Apache Flink on K8s │ │ dbt Core (self-host) │ │
│ │ 3 TaskManagers │ │ Free │ │
│ │ ~$600/mo │ │ ~$0/mo │ │
│ └──────────┬───────────┘ └──────────┬───────────┘ │
│ │ │ │
│ └───────────────┬───────────────────┘ │
│ ▼ │
│ STORAGE LAYER │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ S3 Data Lake (raw + processed) │ │
│ │ ~150 TB/year, S3 Standard + Intelligent Tiering │ │
│ │ ~$2,500/mo │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ QUERY LAYER │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ ClickHouse (self-managed on 3x m5.2xlarge) │ │
│ │ Analytics queries, real-time dashboards │ │
│ │ ~$1,800/mo │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ORCHESTRATION + MONITORING │
│ ┌──────────────────┐ ┌─────────────────┐ ┌──────────────────┐ │
│ │ Apache Airflow │ │ Prometheus + │ │ Grafana │ │
│ │ (MWAA) │ │ VictoriaMetrics │ │ Cloud │ │
│ │ ~$400/mo │ │ ~$200/mo │ │ ~$50/mo │ │
│ └──────────────────┘ └─────────────────┘ └──────────────────┘ │
│ │
│ ─────────────────────────────────────────────────────────────────── │
│ MONTHLY COST BREAKDOWN: │
│ │
│ Kafka (MSK) $1,200 Airflow (MWAA) $400 │
│ Flink (EKS) $600 Monitoring $250 │
│ Spark (EMR Serverless) $800 Networking/Transfer $500 │
│ S3 Storage $2,500 Miscellaneous $300 │
│ ClickHouse (EC2) $1,800 │
│ ─────────────────────────────────────────────────────────────────── │
│ TOTAL: ~$8,350/month │
│ │
│ With 30% buffer for peaks: ~$10,900/month │
│ Comparable managed stack (Confluent + Snowflake + Fivetran): │
│ ~$45,000/month — 4x more expensive │
│ │
└──────────────────────────────────────────────────────────────────────┘
Key Cost Optimization Strategies
1. Use Kafka on MSK instead of Confluent Cloud
Amazon MSK provides managed Kafka at roughly 40% of Confluent Cloud's cost for equivalent throughput. For a Nigerian fintech processing 3,300 events/second:
Confluent Cloud (Basic):
- Ingress: 3,300 events/s × 1.8 KB × 3600 × 24 = ~500 GB/day
- Cost: ~$3,500/month (CKUs + storage + networking)
Amazon MSK:
- 3× m5.large brokers
- Cost: ~$1,200/month (instances + storage)
Savings: ~$2,300/month (~66% reduction)
2. ClickHouse instead of Snowflake for analytics
ClickHouse excels at the analytical query patterns fintech data requires (time-series aggregations, funnel analysis, real-time dashboards) at a fraction of Snowflake's cost:
Snowflake (Standard):
- Medium warehouse, ~8 hrs/day usage
- Cost: ~$8,000/month (compute + storage)
ClickHouse (self-managed on EC2):
- 3× m5.2xlarge instances (8 vCPU, 32GB each)
- Cost: ~$1,800/month
Savings: ~$6,200/month (~78% reduction)
Trade-off: Requires operational expertise for cluster management
3. Tiered storage with S3 Intelligent-Tiering
# infrastructure/s3_lifecycle_policy.py
"""
S3 lifecycle policy for cost-optimized data lake storage.
Automatically tiers data based on access patterns.
"""
LIFECYCLE_POLICY = {
"Rules": [
{
"ID": "raw-data-tiering",
"Filter": {"Prefix": "raw/"},
"Status": "Enabled",
"Transitions": [
{
# Move to Infrequent Access after 30 days
"Days": 30,
"StorageClass": "STANDARD_IA",
},
{
# Move to Glacier after 90 days
"Days": 90,
"StorageClass": "GLACIER",
},
{
# Deep Archive after 1 year (CBN 7-year retention)
"Days": 365,
"StorageClass": "DEEP_ARCHIVE",
},
],
},
{
"ID": "processed-data-tiering",
"Filter": {"Prefix": "processed/"},
"Status": "Enabled",
"Transitions": [
{
"Days": 60,
"StorageClass": "STANDARD_IA",
},
{
"Days": 180,
"StorageClass": "GLACIER",
},
],
},
{
"ID": "temp-data-cleanup",
"Filter": {"Prefix": "tmp/"},
"Status": "Enabled",
"Expiration": {"Days": 7},
},
],
}
# Cost projection:
# 150 TB/year raw data
# Year 1: ~$3,450/mo (mostly Standard)
# Year 2: ~$2,100/mo (mix of Standard + IA + Glacier)
# Year 3: ~$1,400/mo (mostly Glacier + Deep Archive)
# 7-year average: ~$1,800/mo for full CBN-compliant retention
4. Spot instances for batch processing
# infrastructure/emr_serverless_config.yml
# EMR Serverless with spot instances for batch ETL
emr_serverless:
application:
name: "fintech-batch-etl"
release_label: "emr-6.15.0"
type: "SPARK"
architecture: "ARM64" # Graviton — 20% cheaper than x86
initial_capacity:
- worker_count: 2
worker_configuration:
cpu: "4 vCPU"
memory: "16 GB"
disk: "100 GB"
maximum_capacity:
cpu: "200 vCPU" # Scale up for month-end batch
memory: "800 GB"
disk: "2000 GB"
auto_stop_configuration:
enabled: true
idle_timeout_minutes: 10 # Aggressive auto-stop for cost control
# Network config for VPC access to Kafka/ClickHouse
network_configuration:
subnet_ids:
- "subnet-private-1a"
- "subnet-private-1b"
security_group_ids:
- "sg-emr-serverless"
Scaling Roadmap
| Stage | Events/sec | Monthly Cost | Key Changes |
|---|---|---|---|
| Seed (MVP) | < 100 | ~$2,000 | Single-node Kafka, PostgreSQL, basic Python scripts |
| Series A | 100 - 1,000 | ~$5,000 | MSK 3-node, Flink on EKS, S3 data lake |
| Series B | 1,000 - 5,000 | ~$11,000 | ClickHouse cluster, advanced fraud ML, dbt |
| Series C | 5,000 - 50,000 | ~$35,000 | Multi-region, dedicated Flink cluster, ML platform |
| Scale | 50,000+ | ~$100,000+ | Confluent/Databricks, dedicated data team |
Practical advice: Resist the urge to over-engineer at the Seed stage. A PostgreSQL database with well-designed tables and a Python cron job can handle 100 events/second reliably. Migrate to Kafka and Flink only when you consistently exceed 500 events/second or need sub-second fraud detection latency.
Putting It All Together: Reference Architecture
The complete reference architecture ties together all the components covered in this guide:
┌────────────────────────────────────────────────────────────────────────────┐
│ NIGERIAN FINTECH DATA PLATFORM — REFERENCE ARCHITECTURE │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ PAYMENT CHANNELS INGESTION STREAM PROCESSING │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────────┐ ┌──────────────┐ │
│ │ NIBSS│ │ USSD │ │ POS │ │ Channel │ ┌────►│ Fraud Engine │ │
│ │ NIP │ │ │ │ │──│ Adapters│───►│ │ (Flink) │ │
│ └──────┘ └──────┘ └──────┘ │ │ Kafka └──────┬───────┘ │
│ ┌──────┐ ┌──────┐ └──────────┘ │ │ │
│ │Wallet│ │ Card │ │ ┌──────▼───────┐ │
│ │ │ │ │ ├────►│ CBN Reporter │ │
│ └──────┘ └──────┘ │ └──────────────┘ │
│ │ │
│ │ ┌──────────────┐ │
│ └────►│ Event Sink │ │
│ │ (S3 + CH) │ │
│ └──────┬───────┘ │
│ │ │
│ BATCH PROCESSING │ │
│ ┌──────────────────────────────────────────────────────────┘ │
│ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ └─►│ Spark │───►│ dbt Core │───►│ClickHouse│───►│ Dashboards │ │
│ │ (EMR) │ │ │ │ (OLAP) │ │ (Grafana + │ │
│ │ │ │ Bronze → │ │ │ │ Metabase) │ │
│ │ Raw → │ │ Silver → │ │ Fraud KPI│ │ │ │
│ │ Parquet │ │ Gold │ │ Credit │ │ CBN Reports │ │
│ └──────────┘ └──────────┘ │ Ops │ │ Credit Scores│ │
│ └──────────┘ └──────────────┘ │
│ │
│ CROSS-CUTTING CONCERNS │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────────┐ │
│ │ NDPR │ │ Audit │ │ Monitoring │ │ Alerting │ │
│ │ Compliance │ │ Lineage │ │ Prometheus │ │ PagerDuty / │ │
│ │ (Tokenize, │ │ (Every │ │ + Grafana │ │ Opsgenie │ │
│ │ Mask, │ │ transform │ │ │ │ │ │
│ │ Consent) │ │ logged) │ │ │ │ │ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
Conclusion
Building data infrastructure for African fintech is not simply a matter of applying Silicon Valley patterns to a different market. The combination of multi-channel payment fragmentation, aggressive fraud vectors, stringent CBN reporting requirements, and the imperative to serve underbanked populations creates a unique engineering challenge.
The key architectural principles to remember:
-
Normalize early, specialize late: Invest heavily in channel adapters that produce a clean canonical event format. Every downstream system benefits from this normalization.
-
Fraud detection is a streaming problem: Batch fraud detection is insufficient when SIM swap attacks drain accounts in minutes. Build real-time fraud scoring from day one, even if it starts with simple rules before adding ML.
-
Compliance is an architecture concern, not an afterthought: Bake NDPR data classification, CBN audit trails, and retention policies into your data pipeline design. Retrofitting compliance is orders of magnitude more expensive.
-
Alternative data unlocks financial inclusion: The same transaction data you collect for fraud detection and regulatory reporting can power credit scoring models that serve Nigeria's 40M+ underbanked adults — a massive commercial opportunity and social impact.
-
Right-size your infrastructure to your stage: A PostgreSQL database and Python cron jobs can take you further than you think. Migrate to Kafka, Flink, and ClickHouse when your data volume and latency requirements genuinely demand it.
The patterns in this guide — event-driven architectures, real-time stream processing, automated regulatory reporting, and alternative credit scoring — are not theoretical. They are being used today by Nigerian fintechs processing millions of daily transactions. The challenge now is to implement them with the rigor and cost discipline that the African market demands.
References
- CBN Framework for Regulatory Sandbox Operations - Central Bank of Nigeria
- Nigeria Data Protection Regulation (NDPR) - NITDA
- NIBSS Instant Payment (NIP) Technical Specification - NIBSS
- Apache Kafka Documentation - Apache Software Foundation
- Apache Flink Documentation - Apache Software Foundation
- Building Real-Time Fraud Detection Systems - Tyler Akidau et al.
- EFInA Access to Financial Services in Nigeria Survey - EFInA
- African Fintech Ecosystem Report - Disrupt Africa
- Machine Learning for Credit Scoring in Emerging Markets - arXiv
- Designing Data-Intensive Applications - Martin Kleppmann
Key Takeaways
- ✓Event-driven architectures with Apache Kafka enable real-time processing of multi-channel payment data across USSD, POS, bank transfers, and mobile wallets
- ✓Real-time fraud detection pipelines using Kafka Streams and Apache Flink can flag suspicious transactions within 200ms while maintaining less than 0.1% false positive rates
- ✓CBN regulatory reporting and NDPR compliance can be automated through purpose-built data pipelines with audit trails and data lineage tracking
- ✓Alternative credit scoring models using transaction history, airtime patterns, and merchant data open financial services to Nigeria's 40M+ underbanked adults
- ✓Cost-optimized architectures using open-source tools and smart cloud resource management keep infrastructure costs viable for high-growth startups



