Building Automated AML Platforms for Nigerian Banks: A Data Engineering Blueprint for CBN's New Baseline Standards
A comprehensive technical guide to building automated Anti-Money Laundering platforms that meet CBN Circular BSD/DIR/PUB/LAB/019/002 baseline standards, covering real-time transaction monitoring, sanctions screening, AI/ML model governance, regulatory reporting pipelines, and implementation roadmaps for Nigerian financial institutions.
On March 10, 2026, the Central Bank of Nigeria issued Circular BSD/DIR/PUB/LAB/019/002 — the Baseline Standards for Automated Anti-Money Laundering Solutions — mandating every bank, fintech, mobile money operator, and payment service provider in Nigeria to deploy automated AML/CFT/CPF systems. Deposit money banks have 18 months; other institutions have 24 months; and every institution must submit an implementation roadmap within 90 days. Coming after ₦15 billion in AML fines levied on 29 banks in 2024 and Nigeria's hard-won exit from the FATF grey list in October 2025, this circular is not a suggestion — it is a survival requirement. This guide provides a complete data engineering blueprint for building these platforms: from real-time transaction monitoring and sanctions screening to AI/ML model governance, automated STR/CTR reporting, and the phased implementation roadmap that CBN expects on your desk by June 2026.
- Understanding of data streaming and event-driven architecture
- Familiarity with financial transaction systems and core banking platforms
- Basic knowledge of AML/CFT concepts and regulatory reporting
- Experience with Python, SQL, Apache Kafka, and distributed systems

Introduction: The End of Manual AML in Nigerian Banking
On March 10, 2026, the Central Bank of Nigeria issued Circular BSD/DIR/PUB/LAB/019/002 — "Baseline Standards for Automated Anti-Money Laundering (AML) Solutions for Financial Institutions in Nigeria." The circular's opening line sets the tone: manual AML/CFT/CPF controls are no longer sufficient to manage evolving risks.
This is not a gentle recommendation. It is a directive backed by the full weight of CBN's supervisory authority, and it arrives in a context that makes non-compliance existential:
- ₦15 billion in fines: In 2024, CBN penalized 29 banks for AML/CTF violations. Zenith Bank alone absorbed $9.6 million in penalties. The root causes cited were consistent: inadequate customer due diligence, weak transaction monitoring systems, and insufficient internal controls.
- FATF grey list exit: Nigeria was removed from the FATF list of jurisdictions under increased monitoring in October 2025, after more than two years of remedial action. Maintaining this status requires demonstrating sustained improvement — and CBN is making clear that automated systems are how that improvement will be measured.
- 11.2 billion transactions: NIBSS processed 11.2 billion transactions in 2024, totalling ₦1.07 quadrillion. Monitoring this volume manually is not difficult — it is impossible.
- Personal liability: The circular explicitly states that penalties may affect both institutions and accountable individuals — meaning compliance officers and senior management carry personal regulatory exposure.
The 10 Baseline Standards
The circular establishes ten mandatory capabilities that every automated AML platform must deliver:
┌─────────────────────────────────────────────────────────────────────────────┐
│ CBN BASELINE STANDARDS FOR AUTOMATED AML SOLUTIONS │
│ Circular BSD/DIR/PUB/LAB/019/002 (March 10, 2026) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STANDARD │ REQUIREMENT │ DATA ENGINEERING CAPABILITY │
│ ─────────┼────────────────────────────────┼─────────────────────────── │
│ 1 │ Customer ID & Verification │ BVN/NIN API integration, │
│ │ │ identity resolution pipeline │
│ 2 │ Risk-Based Customer Profiling │ Dynamic scoring engine, │
│ │ │ ML risk models │
│ 3 │ Sanctions Screening │ Real-time list matching, │
│ │ │ fuzzy name algorithms │
│ 4 │ PEP & Adverse Media │ Continuous monitoring feeds, │
│ │ │ NLP media scanning │
│ 5 │ Transaction Monitoring │ Streaming analytics (Kafka/ │
│ │ │ Flink), anomaly detection │
│ 6 │ Case Management │ Workflow orchestration, │
│ │ │ investigation data platform │
│ 7 │ Regulatory Reporting │ Automated STR/CTR pipelines, │
│ │ │ NFIU submission integration │
│ 8 │ Audit & Governance │ Immutable logs, RBAC, MFA, │
│ │ │ NDPA-compliant storage │
│ 9 │ AI/ML Model Governance │ MLOps pipeline, validation │
│ │ │ framework, bias monitoring │
│ 10 │ Vendor Management │ Third-party risk assessment, │
│ │ │ SLA monitoring, exit planning │
│ │
│ COMPLIANCE DEADLINES │
│ ├── June 10, 2026 ......... Implementation roadmap due (ALL institutions) │
│ ├── September 10, 2027 .... Full compliance (Deposit Money Banks) │
│ └── March 10, 2028 ........ Full compliance (all other institutions) │
│ │
│ COVERED INSTITUTIONS │
│ ├── Deposit Money Banks (commercial banks) │
│ ├── Fintechs and Payment Service Providers │
│ ├── Mobile Money Operators │
│ ├── International Money Transfer Operators │
│ ├── Microfinance banks, mortgage institutions, finance companies │
│ └── New licence applicants (must demonstrate compliance or present plan) │
│ │
│ ENFORCEMENT │
│ ├── Remedial directives and administrative sanctions │
│ ├── Financial penalties under MLPPA 2022 and CBN Act │
│ ├── Personal liability for executives and compliance officers │
│ └── Monitored via off-site surveillance, on-site examinations, │
│ and thematic regulatory reviews │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
The Legal Foundation
This circular does not exist in isolation. It sits atop a legal framework that has been steadily tightening:
- Money Laundering (Prevention and Prohibition) Act, 2022 (MLPPA): The primary AML legislation. Prescribes fines up to ₦10 million for individuals and ₦25 million+ for corporate entities. Requires Suspicious Transaction Reports (STRs) within 24 hours and Currency Transaction Reports (CTRs) for transactions exceeding ₦5 million (individuals) or ₦10 million (corporates).
- Terrorism (Prevention and Prohibition) Act, 2022: Counter-terrorism financing obligations.
- Nigeria Data Protection Act (NDPA) 2023: Data protection requirements for financial crime data processing.
- CBN AML/CFT/CPF Regulations 2022: The regulatory framework that this circular operationalizes through automation requirements.
- NFIU Act: Establishes the Nigerian Financial Intelligence Unit and its reporting requirements.
- FATF 40 Recommendations: The international standards that underpin Nigeria's entire AML framework and that GIABA (the regional FATF-style body) evaluates in mutual assessments.
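The MLPPA obligations above reduce to concrete logic in the reporting service. A minimal sketch, using the thresholds and the 24-hour window as stated in the Act (the function names and `customer_type` values are illustrative, not from any NFIU specification):

```python
from datetime import datetime, timedelta
from decimal import Decimal

# MLPPA 2022 CTR thresholds (per the Act; constants are illustrative)
CTR_THRESHOLD_INDIVIDUAL = Decimal("5000000")   # ₦5 million
CTR_THRESHOLD_CORPORATE = Decimal("10000000")   # ₦10 million
STR_DEADLINE_HOURS = 24                         # STR due within 24 hours

def ctr_required(amount: Decimal, customer_type: str) -> bool:
    """A CTR is required when a currency transaction exceeds the threshold."""
    threshold = (CTR_THRESHOLD_CORPORATE if customer_type == "corporate"
                 else CTR_THRESHOLD_INDIVIDUAL)
    return amount > threshold

def str_deadline(detected_at: datetime) -> datetime:
    """Latest permissible filing time for an STR detected at `detected_at`."""
    return detected_at + timedelta(hours=STR_DEADLINE_HOURS)
```

Encoding these as named constants rather than inline literals matters here: the thresholds are statutory, and auditors will ask where they live and who can change them.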
This guide provides a complete data engineering blueprint for building an automated AML platform that meets all 10 baseline standards. We cover architecture, implementation patterns, code examples, and the phased roadmap that CBN expects on your desk by June 2026.
System Architecture: End-to-End Automated AML Platform
An AML platform that meets CBN's baseline standards is fundamentally a real-time data processing system that ingests financial transactions, enriches them with customer and risk context, screens them against sanctions and PEP lists, monitors them for suspicious patterns, generates and manages investigation cases, and produces regulatory reports — all with tamper-proof audit trails and strict access controls.
High-Level Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ AUTOMATED AML PLATFORM ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DATA SOURCES │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Core Banking │ │ Card Systems │ │ E-Channels │ │ Mobile Money │ │
│ │ System (CBS) │ │ (POS, ATM) │ │ (NIP, USSD) │ │ Platform │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │ │
│ └────────────────┼────────────────┼────────────────┘ │
│ ▼ │
│ INGESTION LAYER │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Apache Kafka (Transaction Stream) │ │
│ │ Topics: txn.cards | txn.transfers | txn.deposits | txn.lending │ │
│ │ txn.mobile_money | txn.fx | customer.events │ │
│ └──────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ SCREENING │ │ MONITORING │ │ ENRICHMENT │ │
│ │ SERVICE │ │ ENGINE │ │ SERVICE │ │
│ │ │ │ │ │ │ │
│ │ Sanctions │ │ Rule Engine │ │ Customer Risk │ │
│ │ PEP Lists │ │ ML Models │ │ Profile Lookup │ │
│ │ Adverse │ │ Behavioral │ │ BVN/NIN Data │ │
│ │ Media │ │ Analytics │ │ Account History│ │
│ └──────┬───────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └────────────────────┼──────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ ALERT MANAGEMENT │ │
│ │ Alert scoring → Deduplication → Prioritization → Routing │ │
│ └──────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ CASE MANAGEMENT │ │ REGULATORY │ │
│ │ │ │ REPORTING │ │
│ │ Investigation │──────────► │ │ │
│ │ workflows │ │ STR generation │ │
│ │ Evidence capture │ │ CTR automation │ │
│ │ SLA tracking │ │ NFIU submission │ │
│ │ Escalation rules │ │ CBN returns │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ DATA LAYER │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ PostgreSQL │ Elasticsearch │ Redis │ MinIO/S3 │ │
│ │ (Customers, │ (Transaction │ (Sanctions │ (Evidence, │ │
│ │ Cases, Audit) │ Search, Logs) │ Cache, │ Reports, │ │
│ │ │ │ Sessions) │ Archives) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ CROSS-CUTTING CONCERNS │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Audit Trail (Immutable) │ RBAC + MFA │ Encryption │ Monitoring │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Event-Driven Architecture with Kafka
The core design principle is event-driven: every financial transaction is published to Kafka as an event, and downstream services (screening, monitoring, enrichment) consume these events independently. This ensures:
- Decoupling: Core banking systems are not slowed by AML processing. Transactions complete normally; AML checks run asynchronously (or synchronously for blocking checks like sanctions screening).
- Scalability: Consumer groups can be scaled independently. Transaction monitoring during Ramadan or salary payment periods can handle 10x normal volume by adding consumer instances.
- Replayability: Kafka's retention allows replaying historical transactions through updated rules or models — essential when CBN issues new typology guidance.
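The decoupling and replay properties above depend on how events are keyed and serialized at the producer side. A minimal sketch (topic names follow the ingestion-layer layout above; `build_record` is an illustrative helper, not part of any Kafka client API). Keying by originator account means Kafka's default partitioner routes every event for one account to the same partition, which preserves per-account ordering for the monitoring engine:

```python
import json
from decimal import Decimal

# Channel -> topic routing (subset of the topics shown in the diagram above)
CHANNEL_TOPICS = {
    "nip": "txn.transfers",
    "card_pos": "txn.cards",
    "mobile_money": "txn.mobile_money",
    "fx": "txn.fx",
}

def build_record(event: dict) -> tuple[str, bytes, bytes]:
    """Map a normalized transaction event to (topic, key, value) for Kafka.

    Key = originator account ID, so all of one account's events land in
    one partition and are consumed in order.
    """
    topic = CHANNEL_TOPICS[event["channel"]]
    key = event["originator_account_id"].encode("utf-8")
    value = json.dumps(event, default=str).encode("utf-8")  # Decimal -> str
    return topic, key, value
```

With a client such as kafka-python, the tuple would be passed straight to `KafkaProducer.send(topic, key=key, value=value)`; serializing `Decimal` amounts as strings avoids the float-precision loss that naive JSON encoding introduces.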
# Transaction event schema published to Kafka
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from enum import Enum
class TransactionChannel(Enum):
NIP = "nip" # NIBSS Instant Payment
CARD_POS = "card_pos" # Point of Sale
CARD_ATM = "card_atm" # ATM withdrawal
CARD_WEB = "card_web" # Online card payment
USSD = "ussd" # USSD banking
MOBILE_APP = "mobile_app"
MOBILE_MONEY = "mobile_money"
BRANCH = "branch" # Over-the-counter
FX = "fx" # Foreign exchange
LENDING = "lending" # Loan disbursement/repayment
@dataclass
class TransactionEvent:
"""
Canonical transaction event published to Kafka.
All source systems (CBS, card switch, mobile money) normalize
their transactions to this schema before publishing.
"""
transaction_id: str
timestamp: datetime
channel: TransactionChannel
    # Originator
    originator_account_id: str
    originator_customer_id: str
    originator_name: str
    originator_bvn: str | None
    originator_institution_code: str
    # Beneficiary
    beneficiary_account_id: str | None
    beneficiary_customer_id: str | None
    beneficiary_name: str | None
    beneficiary_bvn: str | None
    beneficiary_institution_code: str | None
# Transaction details
amount: Decimal
currency: str # NGN, USD, GBP, EUR
transaction_type: str # credit, debit, transfer, fx_buy, fx_sell
narration: str
# Location context
originator_country: str
beneficiary_country: str | None
device_id: str | None
ip_address: str | None
geo_location: str | None
# Metadata
is_cross_border: bool
is_cash: bool
    source_system: str              # Which system originated this event
Microservices Decomposition
The platform is decomposed into focused services, each aligned with one or more CBN baseline standards:
| Service | CBN Standards | Responsibility |
|---|---|---|
| Customer Service | 1, 2 | Customer profiles, KYC data, risk scoring |
| Screening Service | 3, 4 | Sanctions, PEP, adverse media checks |
| Monitoring Engine | 5 | Transaction monitoring, anomaly detection |
| Alert Manager | 5, 6 | Alert scoring, dedup, routing |
| Case Management | 6 | Investigation workflows, evidence |
| Reporting Service | 7 | STR/CTR generation, NFIU submission |
| Audit Service | 8 | Immutable logs, access control |
| ML Platform | 9 | Model training, validation, serving |
| Admin Portal | 8, 10 | Configuration, user management, vendor oversight |
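One way to make the decomposition concrete is a declarative map of which topics each service consumes, used when provisioning Kafka consumer groups. The structure below is a sketch (service and topic names follow the tables and diagram above; the subscription lists themselves are illustrative):

```python
# Each service owns its own consumer group, so it scales and fails
# independently of the others (the decoupling property noted earlier).
SERVICE_SUBSCRIPTIONS: dict[str, dict] = {
    "screening-service": {
        "group": "aml.screening",
        "topics": ["txn.transfers", "txn.fx", "txn.cards"],
    },
    "monitoring-engine": {
        "group": "aml.monitoring",
        "topics": ["txn.transfers", "txn.cards", "txn.deposits",
                   "txn.mobile_money"],
    },
    "enrichment-service": {
        "group": "aml.enrichment",
        "topics": ["txn.transfers", "customer.events"],
    },
}

def consumers_of(topic: str) -> list[str]:
    """Which services consume a given topic (useful for impact analysis)."""
    return [name for name, sub in SERVICE_SUBSCRIPTIONS.items()
            if topic in sub["topics"]]
```

A map like this doubles as documentation: when CBN examiners ask which component screens FX transactions, the answer is one lookup away.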
Customer Identification and Risk-Based Profiling
CBN Standards 1 and 2 require that AML solutions integrate with national identity databases (BVN, NIN) for customer identification and implement dynamic, risk-based customer profiling that goes beyond simple transaction data.
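Standard 1 ultimately reduces to a verifiable question: does the onboarded identity agree with the national record? A minimal field-agreement scorer is sketched below. The `bvn_record` field names are assumptions for illustration; the real NIBSS BVN response schema is available only to licensed institutions, and the 0.4/0.3/0.2/0.1 weighting is a policy choice, not a regulatory prescription:

```python
from datetime import date

def identity_match_score(onboarded: dict, bvn_record: dict) -> float:
    """Score agreement between onboarding data and a BVN record (0.0-1.0).

    Weights: surname 0.4, first name 0.3, date of birth 0.2, phone 0.1.
    """
    score = 0.0
    if onboarded["surname"].strip().lower() == bvn_record["surname"].strip().lower():
        score += 0.4
    if onboarded["first_name"].strip().lower() == bvn_record["first_name"].strip().lower():
        score += 0.3
    if onboarded["date_of_birth"] == bvn_record["date_of_birth"]:
        score += 0.2
    if onboarded.get("phone") and onboarded["phone"] == bvn_record.get("phone"):
        score += 0.1
    return round(score, 2)
```

A profile's `bvn_verified` flag might then be set only when the score clears an institution-defined threshold (say 0.9), with anything lower routed to manual review.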
BVN/NIN Integration Architecture
Every customer onboarded by a Nigerian financial institution must be verified against the Bank Verification Number (BVN) database maintained by NIBSS and, increasingly, the National Identity Number (NIN) database maintained by NIMC. The AML platform must maintain a synchronized customer identity store:
from dataclasses import dataclass, field
from datetime import datetime, date
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
VERY_HIGH = "very_high"
PROHIBITED = "prohibited"
@dataclass
class CustomerRiskProfile:
"""
Comprehensive customer risk profile combining identity verification,
static risk factors, and dynamic behavioral scoring.
"""
customer_id: str
bvn: str
nin: str | None
# Identity verification status
bvn_verified: bool = False
nin_verified: bool = False
identity_match_score: float = 0.0
# Static risk factors (assessed at onboarding, updated periodically)
customer_type: str = "individual" # individual, corporate, trust
occupation_risk: RiskLevel = RiskLevel.MEDIUM
industry_risk: RiskLevel = RiskLevel.MEDIUM # For corporates
country_risk: RiskLevel = RiskLevel.LOW
product_risk: RiskLevel = RiskLevel.MEDIUM
channel_risk: RiskLevel = RiskLevel.MEDIUM
is_pep: bool = False
pep_level: str | None = None # domestic, foreign, international_org
is_sanctioned: bool = False
# Dynamic risk factors (updated continuously from transaction behavior)
transaction_volume_score: float = 0.0 # 0-100
transaction_velocity_score: float = 0.0 # 0-100
geographic_dispersion_score: float = 0.0 # 0-100
channel_anomaly_score: float = 0.0 # 0-100
peer_group_deviation: float = 0.0 # 0-100
cash_intensity_score: float = 0.0 # 0-100
# Overall risk
composite_risk_score: float = 0.0 # 0-100
risk_level: RiskLevel = RiskLevel.MEDIUM
last_risk_assessment: datetime | None = None
next_review_date: date | None = None
# Risk assessment triggers
risk_override_by: str | None = None # Manual override by analyst
risk_override_reason: str | None = None
risk_change_history: list = field(default_factory=list)
class CustomerRiskEngine:
"""
Dynamic risk scoring engine that combines static and behavioral
factors into a composite risk score. Re-evaluates on triggers:
- New transaction exceeding thresholds
- Sanctions/PEP list update matches
- Account activity after dormancy
- Periodic review schedule
"""
# Weight configuration for composite risk calculation
RISK_WEIGHTS = {
'customer_type': 0.05,
'occupation_risk': 0.08,
'industry_risk': 0.07,
'country_risk': 0.10,
'product_risk': 0.05,
'channel_risk': 0.05,
'pep_status': 0.12,
'sanctions_status': 0.15, # Highest weight — regulatory imperative
'transaction_volume': 0.08,
'transaction_velocity': 0.08,
'geographic_dispersion': 0.05,
'channel_anomaly': 0.05,
'peer_group_deviation': 0.04,
'cash_intensity': 0.03,
}
# Review frequency based on risk level
REVIEW_SCHEDULE = {
RiskLevel.LOW: 365, # Annual review
RiskLevel.MEDIUM: 180, # Semi-annual
RiskLevel.HIGH: 90, # Quarterly
RiskLevel.VERY_HIGH: 30, # Monthly
}
def calculate_composite_score(self, profile: CustomerRiskProfile) -> float:
"""
Calculate weighted composite risk score from all risk factors.
Returns score on 0-100 scale.
"""
factor_scores = {
'customer_type': self._score_customer_type(profile.customer_type),
'occupation_risk': self._risk_level_to_score(profile.occupation_risk),
'industry_risk': self._risk_level_to_score(profile.industry_risk),
'country_risk': self._risk_level_to_score(profile.country_risk),
'product_risk': self._risk_level_to_score(profile.product_risk),
'channel_risk': self._risk_level_to_score(profile.channel_risk),
'pep_status': 90.0 if profile.is_pep else 10.0,
'sanctions_status': 100.0 if profile.is_sanctioned else 0.0,
'transaction_volume': profile.transaction_volume_score,
'transaction_velocity': profile.transaction_velocity_score,
'geographic_dispersion': profile.geographic_dispersion_score,
'channel_anomaly': profile.channel_anomaly_score,
'peer_group_deviation': profile.peer_group_deviation,
'cash_intensity': profile.cash_intensity_score,
}
composite = sum(
factor_scores[factor] * weight
for factor, weight in self.RISK_WEIGHTS.items()
)
return min(composite, 100.0)
def determine_risk_level(self, score: float, profile: CustomerRiskProfile) -> RiskLevel:
"""
Map composite score to risk level.
Sanctioned customers are always PROHIBITED regardless of score.
"""
if profile.is_sanctioned:
return RiskLevel.PROHIBITED
if score >= 80:
return RiskLevel.VERY_HIGH
elif score >= 60:
return RiskLevel.HIGH
elif score >= 35:
return RiskLevel.MEDIUM
else:
return RiskLevel.LOW
def _risk_level_to_score(self, level: RiskLevel) -> float:
return {
RiskLevel.LOW: 15.0,
RiskLevel.MEDIUM: 40.0,
RiskLevel.HIGH: 70.0,
RiskLevel.VERY_HIGH: 90.0,
RiskLevel.PROHIBITED: 100.0,
}[level]
def _score_customer_type(self, customer_type: str) -> float:
return {
'individual': 20.0,
'sole_proprietor': 35.0,
'corporate': 45.0,
'trust': 60.0,
'ngo': 50.0,
'pfa': 30.0,
        }.get(customer_type, 40.0)
Customer Risk Profile Database Schema
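Before persisting scores, one invariant of the engine above is worth pinning down in a unit test: the weights sum to 1.0, so a weighted average of 0-100 factor scores stays on the 0-100 scale without ever needing the clamp in the normal case. A self-contained sketch (weights copied from `RISK_WEIGHTS` above; the `composite` helper restates the engine's weighted sum):

```python
RISK_WEIGHTS = {
    'customer_type': 0.05, 'occupation_risk': 0.08, 'industry_risk': 0.07,
    'country_risk': 0.10, 'product_risk': 0.05, 'channel_risk': 0.05,
    'pep_status': 0.12, 'sanctions_status': 0.15,
    'transaction_volume': 0.08, 'transaction_velocity': 0.08,
    'geographic_dispersion': 0.05, 'channel_anomaly': 0.05,
    'peer_group_deviation': 0.04, 'cash_intensity': 0.03,
}

def composite(factor_scores: dict) -> float:
    """Weighted composite on the 0-100 scale, as in the engine above."""
    return min(sum(factor_scores[f] * w for f, w in RISK_WEIGHTS.items()),
               100.0)

# Weights sum to 1.0: all-zero factors -> 0, all-100 factors -> 100.
assert abs(sum(RISK_WEIGHTS.values()) - 1.0) < 1e-9
```

Asserting this in CI catches silent drift when weights are retuned, which model validators (Standard 9) will expect evidence of.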
-- Customer risk profile with full audit history
CREATE TABLE customer_risk_profiles (
customer_id VARCHAR(50) PRIMARY KEY,
bvn VARCHAR(11) NOT NULL,
nin VARCHAR(11),
-- Identity verification
bvn_verified BOOLEAN DEFAULT FALSE,
nin_verified BOOLEAN DEFAULT FALSE,
bvn_verified_at TIMESTAMP WITH TIME ZONE,
nin_verified_at TIMESTAMP WITH TIME ZONE,
-- Static risk factors
customer_type VARCHAR(30) NOT NULL DEFAULT 'individual',
occupation_risk VARCHAR(20) DEFAULT 'medium',
industry_risk VARCHAR(20) DEFAULT 'medium',
country_risk VARCHAR(20) DEFAULT 'low',
product_risk VARCHAR(20) DEFAULT 'medium',
channel_risk VARCHAR(20) DEFAULT 'medium',
is_pep BOOLEAN DEFAULT FALSE,
pep_level VARCHAR(30),
pep_source VARCHAR(100),
is_sanctioned BOOLEAN DEFAULT FALSE,
sanctions_list VARCHAR(100),
-- Dynamic scores (updated by streaming pipeline)
transaction_volume_score NUMERIC(5,2) DEFAULT 0,
transaction_velocity_score NUMERIC(5,2) DEFAULT 0,
geographic_dispersion_score NUMERIC(5,2) DEFAULT 0,
channel_anomaly_score NUMERIC(5,2) DEFAULT 0,
peer_group_deviation NUMERIC(5,2) DEFAULT 0,
cash_intensity_score NUMERIC(5,2) DEFAULT 0,
-- Composite risk
composite_risk_score NUMERIC(5,2) DEFAULT 0,
risk_level VARCHAR(20) DEFAULT 'medium',
last_risk_assessment TIMESTAMP WITH TIME ZONE,
next_review_date DATE,
-- Override
risk_override_by VARCHAR(100),
risk_override_reason TEXT,
risk_override_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_crp_risk_level ON customer_risk_profiles(risk_level);
CREATE INDEX idx_crp_review_date ON customer_risk_profiles(next_review_date);
CREATE INDEX idx_crp_pep ON customer_risk_profiles(is_pep) WHERE is_pep = TRUE;
CREATE INDEX idx_crp_sanctioned ON customer_risk_profiles(is_sanctioned)
WHERE is_sanctioned = TRUE;
-- Risk score change history (for audit and trend analysis)
CREATE TABLE risk_score_history (
id BIGSERIAL PRIMARY KEY,
customer_id VARCHAR(50) REFERENCES customer_risk_profiles(customer_id),
previous_score NUMERIC(5,2),
new_score NUMERIC(5,2),
previous_level VARCHAR(20),
new_level VARCHAR(20),
trigger_type VARCHAR(50), -- 'transaction', 'list_update', 'periodic', 'manual'
trigger_details JSONB,
assessed_by VARCHAR(50), -- 'system' or analyst ID
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_rsh_customer ON risk_score_history(customer_id, created_at);
Real-Time Sanctions and PEP Screening
CBN Standards 3 and 4 demand real-time screening against sanctions lists (CBN, OFAC, UN, EU) with automatic transaction blocking on confirmed matches, plus continuous monitoring of politically exposed persons (PEP) registers and adverse media sources. This is where most Nigerian banks have historically struggled — and where the ₦15 billion in 2024 fines originated.
Sanctions List Ingestion Pipeline
Sanctions lists are published by multiple authorities in different formats and update frequencies. The ingestion pipeline normalizes all lists into a unified screening database:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SanctionsListConfig:
"""Configuration for each sanctions list source."""
name: str
authority: str
url: str
format: str # 'xml', 'csv', 'json', 'api'
update_frequency: str # 'daily', 'weekly', 'realtime'
blocking_action: str # 'auto_block', 'alert_only', 'enhanced_monitoring'
# Nigerian financial institutions must screen against all of these
SANCTIONS_LISTS = [
SanctionsListConfig(
name="CBN AML/CFT Sanctions List",
authority="CBN",
url="https://www.cbn.gov.ng/sanctions/",
format="csv",
update_frequency="weekly",
blocking_action="auto_block"
),
SanctionsListConfig(
name="OFAC SDN List",
authority="US Treasury / OFAC",
url="https://www.treasury.gov/ofac/downloads/sdn.xml",
format="xml",
update_frequency="daily",
blocking_action="auto_block"
),
SanctionsListConfig(
name="UN Security Council Consolidated List",
authority="United Nations",
url="https://scsanctions.un.org/resources/xml/en/consolidated.xml",
format="xml",
update_frequency="daily",
blocking_action="auto_block"
),
SanctionsListConfig(
name="EU Consolidated Sanctions List",
authority="European Union",
url="https://webgate.ec.europa.eu/fsd/fsf/public/files/xmlFullSanctionsList_1_1/content",
format="xml",
update_frequency="daily",
blocking_action="auto_block"
),
]
class SanctionsListIngestionPipeline:
"""
Ingests, normalizes, and indexes sanctions lists from multiple authorities.
Runs on a scheduled basis and immediately on notification of list updates.
"""
async def ingest_all_lists(self):
"""Ingest all configured sanctions lists."""
for config in SANCTIONS_LISTS:
try:
raw_data = await self._fetch_list(config)
normalized_entries = self._normalize(raw_data, config)
await self._update_screening_database(normalized_entries, config)
await self._notify_screening_service(config.name)
await self.audit.log({
'action': 'sanctions_list_ingestion',
'list_name': config.name,
'entries_count': len(normalized_entries),
'status': 'success',
'timestamp': datetime.utcnow().isoformat()
})
except Exception as e:
await self.alert_service.critical(
f"Sanctions list ingestion failed: {config.name}: {e}"
)
def _normalize(self, raw_data, config: SanctionsListConfig) -> list:
"""
Normalize entries from different list formats into a unified schema.
Each entry has: names (including aliases), identifiers, dates,
nationalities, and list metadata.
"""
entries = []
for raw_entry in self._parse_format(raw_data, config.format):
entry = {
'list_name': config.name,
'authority': config.authority,
'entity_type': raw_entry.get('type', 'individual'),
'primary_name': raw_entry['name'],
'aliases': raw_entry.get('aliases', []),
'date_of_birth': raw_entry.get('dob'),
'nationalities': raw_entry.get('nationalities', []),
'identifiers': raw_entry.get('identifiers', {}),
'sanctions_programs': raw_entry.get('programs', []),
'listing_date': raw_entry.get('listing_date'),
'remarks': raw_entry.get('remarks', ''),
'blocking_action': config.blocking_action,
# Pre-computed matching tokens for fast screening
'name_tokens': self._generate_name_tokens(
raw_entry['name'],
raw_entry.get('aliases', [])
),
}
entries.append(entry)
return entries
def _generate_name_tokens(self, primary_name: str, aliases: list) -> dict:
"""
Pre-compute matching tokens for each name variant.
Supports exact, normalized, phonetic, and transliterated matching.
"""
all_names = [primary_name] + aliases
tokens = {
'exact': [],
'normalized': [],
'soundex': [],
'metaphone': [],
}
for name in all_names:
# Exact (lowercase, stripped)
tokens['exact'].append(name.lower().strip())
# Normalized (remove titles, punctuation, collapse spaces)
normalized = self._normalize_name(name)
tokens['normalized'].append(normalized)
# Phonetic encoding for sound-alike matching
tokens['soundex'].append(self._soundex(normalized))
tokens['metaphone'].append(self._double_metaphone(normalized))
        return tokens
Fuzzy Name Matching for Nigerian Names
Name matching in Nigeria presents unique challenges that off-the-shelf Western-centric screening solutions handle poorly:
class NigerianNameMatcher:
"""
Specialized name matching for Nigerian naming patterns.
Nigerian naming challenges:
1. Yoruba names with prefixes: Adewale vs Ade Wale vs Wale
2. Igbo names with Anglicized variants: Chukwuemeka vs Emeka
3. Hausa names with Arabic transliterations: Mohammed/Muhammad/Muhammed
4. Compound names: Oluwaseun, Oluwadamilola (Olu + component)
5. Name ordering: surname-first vs given-first inconsistency
6. Married names: women may use maiden name, married name, or both
7. Professional titles embedded: Chief, Alhaji, Dr often part of name
"""
# Common Nigerian name prefixes and their variants
YORUBA_PREFIXES = {
'oluwa': ['olu'],
'ade': ['adewale', 'adebayo', 'adeyemi'],
'ayo': ['ayodele', 'ayomide'],
}
# Common Arabic-origin name transliterations (Hausa/Islamic names)
ARABIC_VARIANTS = {
'mohammed': ['muhammad', 'muhammed', 'mohamed', 'mohamad'],
'abdullahi': ['abdullah', 'abdulahi'],
'abubakar': ['abubacar', 'abu-bakar'],
'usman': ['othman', 'uthman', 'osman'],
'ibrahim': ['ibraheem', 'ebrahim'],
}
# Titles to strip before matching
TITLES = [
'chief', 'alhaji', 'alhaja', 'dr', 'prof', 'engr', 'arc',
'barr', 'hon', 'senator', 'prince', 'princess', 'pastor',
'reverend', 'imam', 'justice', 'otunba', 'oba', 'olori',
]
def match_score(self, query_name: str, list_name: str) -> float:
"""
Calculate match score between a transaction party name
and a sanctions list entry name.
Returns 0.0 (no match) to 1.0 (exact match).
"""
# Strip titles and normalize
query_clean = self._strip_titles(self._normalize(query_name))
list_clean = self._strip_titles(self._normalize(list_name))
# Strategy 1: Exact normalized match
if query_clean == list_clean:
return 1.0
# Strategy 2: Token-sorted match (handles name ordering)
query_tokens = sorted(query_clean.split())
list_tokens = sorted(list_clean.split())
if query_tokens == list_tokens:
return 0.98
# Strategy 3: Transliteration match (Arabic-origin names)
query_canonical = self._canonicalize_arabic_names(query_clean)
list_canonical = self._canonicalize_arabic_names(list_clean)
if sorted(query_canonical.split()) == sorted(list_canonical.split()):
return 0.95
# Strategy 4: Subset match (partial name match)
query_set = set(query_tokens)
list_set = set(list_tokens)
if query_set and list_set:
overlap = len(query_set & list_set)
max_len = max(len(query_set), len(list_set))
subset_score = overlap / max_len
if subset_score >= 0.7:
return subset_score * 0.90
        # Strategy 5: Phonetic match
        query_phonetic = self._double_metaphone(query_clean)
        list_phonetic = self._double_metaphone(list_clean)
        if query_phonetic and list_phonetic and query_phonetic == list_phonetic:
            return 0.85
        # Strategy 6: Edit distance (catch typos and minor variations)
        from difflib import SequenceMatcher
        similarity = SequenceMatcher(None, query_clean, list_clean).ratio()
        return similarity
def _strip_titles(self, name: str) -> str:
"""Remove Nigerian honorary and professional titles."""
tokens = name.lower().split()
return ' '.join(t for t in tokens if t not in self.TITLES)
def _normalize(self, name: str) -> str:
"""Normalize whitespace, punctuation, and case."""
import re
name = name.lower().strip()
name = re.sub(r'[^\w\s]', '', name) # Remove punctuation
name = re.sub(r'\s+', ' ', name) # Collapse whitespace
return name
def _canonicalize_arabic_names(self, name: str) -> str:
"""Replace Arabic-origin name variants with canonical forms."""
tokens = name.split()
canonical_tokens = []
for token in tokens:
canonical = token
for base_form, variants in self.ARABIC_VARIANTS.items():
if token in variants or token == base_form:
canonical = base_form
break
canonical_tokens.append(canonical)
        return ' '.join(canonical_tokens)
Real-Time Screening Pipeline
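Before wiring the matcher into the real-time path, its ordering- and transliteration-invariance can be sanity-checked in isolation. Below is a condensed, self-contained sketch of strategies 2 and 3 from the matcher above (normalization trimmed to the essentials; the full class adds title stripping, phonetics, and edit distance):

```python
import re

# Subset of the Arabic-origin variants table from the matcher above
ARABIC_VARIANTS = {
    'mohammed': ['muhammad', 'muhammed', 'mohamed', 'mohamad'],
    'usman': ['othman', 'uthman', 'osman'],
}

def normalize(name: str) -> str:
    name = re.sub(r'[^\w\s]', '', name.lower().strip())
    return re.sub(r'\s+', ' ', name)

def canonicalize(name: str) -> str:
    out = []
    for tok in normalize(name).split():
        for base, variants in ARABIC_VARIANTS.items():
            if tok == base or tok in variants:
                tok = base
                break
        out.append(tok)
    return ' '.join(out)

def quick_score(query: str, listed: str) -> float:
    q, l = normalize(query), normalize(listed)
    if q == l:
        return 1.0
    if sorted(q.split()) == sorted(l.split()):
        return 0.98                      # Strategy 2: name ordering
    if sorted(canonicalize(q).split()) == sorted(canonicalize(l).split()):
        return 0.95                      # Strategy 3: transliteration
    return 0.0
```

Tests like these belong in the screening service's regression suite: every false negative found in production (a missed spelling variant, a new transliteration) should become a new fixture here.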
The screening service evaluates every transaction party against the unified sanctions/PEP database. For CBN compliance, confirmed sanctions matches must trigger automatic transaction blocking:
class RealTimeScreeningService:
"""
Screens transaction parties against sanctions and PEP lists in real time.
Two operating modes:
1. PRE-TRANSACTION: Blocks transaction if confirmed sanctions match
(for wire transfers, FX, cross-border payments)
2. POST-TRANSACTION: Alerts on matches for review
(for high-volume, low-value channels like POS)
"""
# Thresholds for different match actions
BLOCK_THRESHOLD = 0.92 # Auto-block and alert
ALERT_THRESHOLD = 0.75 # Generate alert for manual review
DISMISS_THRESHOLD = 0.50 # Below this, no action
async def screen_transaction(
self,
transaction: TransactionEvent,
) -> ScreeningResult:
"""Screen originator and beneficiary against all active lists."""
results = []
# Screen originator
originator_matches = await self._screen_party(
name=transaction.originator_name,
identifiers={
'bvn': transaction.originator_bvn,
'account': transaction.originator_account_id,
},
country=transaction.originator_country,
)
results.extend(originator_matches)
# Screen beneficiary (if available)
if transaction.beneficiary_name:
beneficiary_matches = await self._screen_party(
name=transaction.beneficiary_name,
identifiers={
'bvn': transaction.beneficiary_bvn,
'account': transaction.beneficiary_account_id,
},
country=transaction.beneficiary_country,
)
results.extend(beneficiary_matches)
# Determine action
highest_score = max((r.score for r in results), default=0.0)
if highest_score >= self.BLOCK_THRESHOLD:
action = ScreeningAction.BLOCK
await self._block_transaction(transaction, results)
elif highest_score >= self.ALERT_THRESHOLD:
action = ScreeningAction.ALERT
await self._generate_screening_alert(transaction, results)
else:
action = ScreeningAction.PASS
return ScreeningResult(
transaction_id=transaction.transaction_id,
action=action,
matches=results,
screened_at=datetime.utcnow(),
)
async def _screen_party(self, name: str, identifiers: dict, country: str) -> list:
"""
Screen a party name against all lists.
Uses Redis-cached list data for performance.
"""
matcher = NigerianNameMatcher()
matches = []
# First: check by identifiers (exact match — fastest)
for id_type, id_value in identifiers.items():
if id_value:
exact_match = await self.redis.get(f"sanctions:id:{id_type}:{id_value}")
if exact_match:
matches.append(ScreeningMatch(
score=1.0,
match_type='identifier',
list_entry=json.loads(exact_match),
))
# Second: name-based matching against all list entries
# Uses pre-built index for performance
candidate_entries = await self._get_name_candidates(name)
for entry in candidate_entries:
score = matcher.match_score(name, entry['primary_name'])
# Also check aliases
for alias in entry.get('aliases', []):
alias_score = matcher.match_score(name, alias)
score = max(score, alias_score)
if score >= self.DISMISS_THRESHOLD:
matches.append(ScreeningMatch(
score=score,
match_type='name',
list_entry=entry,
))
return sorted(matches, key=lambda m: m.score, reverse=True)

False Positive Management
False positives are the single biggest operational challenge in sanctions screening. Common Nigerian names like Mohammed Ibrahim or Usman Abdullahi match dozens of sanctions list entries. Without effective false positive management, compliance teams drown in alerts:
class FalsePositiveManager:
"""
Manages false positive suppression to reduce alert fatigue
while maintaining regulatory compliance.
Key principle: never suppress a TRUE match. False positive
rules must be reviewed quarterly and audited annually.
"""
async def evaluate_match(
self,
match: ScreeningMatch,
customer: CustomerRiskProfile,
) -> MatchEvaluation:
"""
Evaluate a screening match considering customer context
and historical false positive patterns.
"""
# Check if this customer was previously cleared for this list entry
prior_clearance = await self._check_prior_clearance(
customer.customer_id,
match.list_entry['id']
)
if prior_clearance and prior_clearance.still_valid:
return MatchEvaluation(
action='suppress',
reason=f'Previously cleared on {prior_clearance.cleared_date}',
clearance_id=prior_clearance.id,
requires_periodic_review=True,
next_review=prior_clearance.next_review_date,
)
# Apply secondary verification: compare additional data points
secondary_score = self._secondary_verification(match, customer)
if secondary_score < 0.3:
# Strong evidence this is NOT the sanctioned party
return MatchEvaluation(
action='auto_clear',
reason='Secondary verification indicates different individual',
details={
'dob_mismatch': not match.list_entry.get('date_of_birth')
or match.list_entry['date_of_birth'] != customer.date_of_birth,
'nationality_mismatch': customer.nationality
not in match.list_entry.get('nationalities', []),
'identifier_mismatch': True, # placeholder; mirrors the identifier check in _secondary_verification
},
requires_analyst_review=True, # Auto-clear still needs sign-off
)
# Genuine potential match — escalate
return MatchEvaluation(
action='escalate',
reason='Potential true match — requires investigation',
priority='high' if match.score >= 0.92 else 'medium',
)
def _secondary_verification(
self,
match: ScreeningMatch,
customer: CustomerRiskProfile,
) -> float:
"""
Score how likely this match is a TRUE positive by comparing
additional data points beyond the name.
"""
signals = []
# Date of birth comparison
list_dob = match.list_entry.get('date_of_birth')
if list_dob and customer.date_of_birth:
signals.append(1.0 if list_dob == customer.date_of_birth else 0.0)
# Nationality comparison
list_nationalities = match.list_entry.get('nationalities', [])
if list_nationalities and customer.nationality:
signals.append(
1.0 if customer.nationality in list_nationalities else 0.0
)
# Identifier comparison (passport, national ID)
list_ids = match.list_entry.get('identifiers', {})
if list_ids:
id_match = any(
customer_id == list_ids.get(id_type)
for id_type, customer_id in [
('bvn', customer.bvn),
('nin', customer.nin),
('passport', customer.passport_number),
]
if customer_id
)
signals.append(1.0 if id_match else 0.0)
return sum(signals) / len(signals) if signals else 0.5

Transaction Monitoring Engine
CBN Standard 5 is the heart of the AML platform: real-time or near-real-time transaction monitoring across all channels — cards, e-channels, deposits, and lending — with pattern detection for suspicious activity. With NIBSS processing 11.2 billion transactions in 2024, this is a streaming data engineering challenge at significant scale.
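Before designing the pipeline, it is worth sizing it. The annual NIBSS volume works out to a modest average transaction rate, but capacity must be planned for bursts. A back-of-envelope sketch (the 20x peak-to-average burst factor is an assumption, not a NIBSS figure):

```python
# Back-of-envelope throughput sizing for the monitoring stream.
# The 11.2 billion annual NIBSS transaction count is from the text;
# the 20x peak-to-average burst factor is an assumption.
ANNUAL_TXNS = 11_200_000_000
SECONDS_PER_YEAR = 365 * 24 * 3600

avg_tps = ANNUAL_TXNS / SECONDS_PER_YEAR
peak_tps = avg_tps * 20  # paydays, month-end, festive-season bursts

print(f"average: {avg_tps:,.0f} TPS, assumed peak: {peak_tps:,.0f} TPS")
```

The takeaway: average throughput is modest for a streaming engine; per-transaction enrichment lookups and screening latency, not raw event rate, dominate the engineering effort.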
Streaming Architecture with Apache Flink
The transaction monitoring engine consumes the Kafka transaction stream and evaluates every transaction against both rule-based scenarios and ML-based anomaly detection:
# Conceptual Flink-style transaction monitoring pipeline
# (Implemented in Python for clarity; production may use Java/Scala Flink)
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
@dataclass
class MonitoringAlert:
alert_id: str
rule_id: str
rule_name: str
severity: str # 'low', 'medium', 'high', 'critical'
customer_id: str
transaction_ids: list # One or more triggering transactions
alert_score: float # 0-100
description: str
typology: str # AML typology classification
channel: str
details: dict
created_at: datetime
class TransactionMonitoringEngine:
"""
Real-time transaction monitoring engine.
Evaluates every transaction against:
1. Rule-based scenarios (configurable, immediate)
2. Behavioral analytics (windowed aggregation)
3. ML anomaly detection (model-scored)
"""
async def evaluate_transaction(
self,
txn: TransactionEvent,
customer_profile: CustomerRiskProfile,
account_history: AccountHistory,
) -> list[MonitoringAlert]:
"""
Evaluate a single transaction against all active monitoring rules.
Returns list of alerts (may be empty if transaction is normal).
"""
alerts = []
# Layer 1: Threshold-based rules (instant evaluation)
alerts.extend(
await self._evaluate_threshold_rules(txn, customer_profile)
)
# Layer 2: Pattern-based rules (windowed aggregation)
alerts.extend(
await self._evaluate_pattern_rules(txn, customer_profile, account_history)
)
# Layer 3: ML-based anomaly detection
alerts.extend(
await self._evaluate_ml_models(txn, customer_profile, account_history)
)
# Deduplicate and score combined alerts
return self._consolidate_alerts(alerts)
async def _evaluate_threshold_rules(
self,
txn: TransactionEvent,
profile: CustomerRiskProfile,
) -> list[MonitoringAlert]:
"""
Immediate threshold checks — no historical data needed.
"""
alerts = []
# CTR threshold (MLPPA 2022: ₦5M individual, ₦10M corporate)
ctr_threshold = (
Decimal('10000000') if profile.customer_type == 'corporate'
else Decimal('5000000')
)
if txn.is_cash and txn.amount >= ctr_threshold:
alerts.append(MonitoringAlert(
alert_id=self._generate_id(),
rule_id='THR-001',
rule_name='CTR Threshold Exceeded',
severity='medium',
customer_id=txn.originator_customer_id,
transaction_ids=[txn.transaction_id],
alert_score=60.0,
description=f'Cash transaction of ₦{txn.amount:,.2f} exceeds '
f'CTR threshold of ₦{ctr_threshold:,.2f}',
typology='currency_transaction_report',
channel=txn.channel.value,
details={'amount': str(txn.amount), 'threshold': str(ctr_threshold)},
created_at=datetime.utcnow(),
))
# Cross-border transaction by high-risk customer
if txn.is_cross_border and profile.risk_level in (
RiskLevel.HIGH, RiskLevel.VERY_HIGH
):
alerts.append(MonitoringAlert(
alert_id=self._generate_id(),
rule_id='THR-005',
rule_name='Cross-Border Transaction by High-Risk Customer',
severity='high',
customer_id=txn.originator_customer_id,
transaction_ids=[txn.transaction_id],
alert_score=75.0,
description=f'Cross-border {txn.transaction_type} of '
f'₦{txn.amount:,.2f} to {txn.beneficiary_country} '
f'by {profile.risk_level.value}-risk customer',
typology='cross_border_high_risk',
channel=txn.channel.value,
details={
'destination_country': txn.beneficiary_country,
'customer_risk_level': profile.risk_level.value,
},
created_at=datetime.utcnow(),
))
return alerts
async def _evaluate_pattern_rules(
self,
txn: TransactionEvent,
profile: CustomerRiskProfile,
history: AccountHistory,
) -> list[MonitoringAlert]:
"""
Pattern-based rules that require windowed transaction history.
These detect structuring, smurfing, rapid movement, and other
multi-transaction typologies.
"""
alerts = []
# STRUCTURING DETECTION
# Multiple cash transactions just below CTR threshold within 24 hours
recent_cash = history.get_transactions(
hours=24,
is_cash=True,
transaction_type='credit'
)
ctr_threshold = Decimal('5000000')
just_below = [
t for t in recent_cash
if ctr_threshold * Decimal('0.7') <= t.amount < ctr_threshold
]
if len(just_below) >= 3:
total = sum(t.amount for t in just_below)
if total >= ctr_threshold:
alerts.append(MonitoringAlert(
alert_id=self._generate_id(),
rule_id='PAT-001',
rule_name='Possible Structuring (Cash)',
severity='high',
customer_id=txn.originator_customer_id,
transaction_ids=[t.transaction_id for t in just_below],
alert_score=80.0,
description=f'{len(just_below)} cash deposits totalling '
f'₦{total:,.2f} in 24h, each below CTR threshold',
typology='structuring',
channel=txn.channel.value,
details={
'transaction_count': len(just_below),
'total_amount': str(total),
'threshold': str(ctr_threshold),
},
created_at=datetime.utcnow(),
))
# RAPID MOVEMENT
# Large inflow immediately followed by outflow (within 2 hours)
recent_credits = history.get_transactions(
hours=2, transaction_type='credit'
)
recent_debits = history.get_transactions(
hours=2, transaction_type='debit'
)
large_credit = sum(t.amount for t in recent_credits)
large_debit = sum(t.amount for t in recent_debits)
if (large_credit > Decimal('2000000') and
large_debit > large_credit * Decimal('0.8')):
alerts.append(MonitoringAlert(
alert_id=self._generate_id(),
rule_id='PAT-003',
rule_name='Rapid Movement of Funds',
severity='high',
customer_id=txn.originator_customer_id,
transaction_ids=[
t.transaction_id
for t in recent_credits + recent_debits
],
alert_score=78.0,
description=f'₦{large_credit:,.2f} received and '
f'₦{large_debit:,.2f} moved out within 2 hours',
typology='rapid_movement',
channel=txn.channel.value,
details={
'total_inflow': str(large_credit),
'total_outflow': str(large_debit),
'retention_ratio': str(
(large_credit - large_debit) / large_credit
),
},
created_at=datetime.utcnow(),
))
# DORMANT ACCOUNT ACTIVATION
# Account with no transactions for 90+ days suddenly receives large transfer
if (history.days_since_last_transaction() > 90
and txn.amount > Decimal('1000000')):
alerts.append(MonitoringAlert(
alert_id=self._generate_id(),
rule_id='PAT-006',
rule_name='Dormant Account Activation',
severity='medium',
customer_id=txn.originator_customer_id,
transaction_ids=[txn.transaction_id],
alert_score=65.0,
description=f'Account dormant for {history.days_since_last_transaction()} '
f'days, now receiving ₦{txn.amount:,.2f}',
typology='dormant_activation',
channel=txn.channel.value,
details={
'dormancy_days': history.days_since_last_transaction(),
'amount': str(txn.amount),
},
created_at=datetime.utcnow(),
))
return alerts

Nigeria-Specific AML Typologies
Nigerian financial crime has distinctive patterns that generic AML platforms often miss. The monitoring engine must include scenarios for:
┌─────────────────────────────────────────────────────────────────────────────┐
│ NIGERIA-SPECIFIC AML TYPOLOGIES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. POS AGENT NETWORK LAUNDERING │
│ ├── Cash deposited across multiple POS agents in same LGA │
│ ├── Aggregated by coordinating account │
│ ├── Often disguised as merchant payments │
│ └── Detection: cluster analysis on POS → account flow patterns │
│ │
│ 2. BUREAU DE CHANGE (BDC) ROUND-TRIPPING │
│ ├── Naira → Dollar → Naira cycles via parallel market │
│ ├── Exploits official/parallel exchange rate differential │
│ ├── Often involves multiple bank accounts │
│ └── Detection: FX transaction velocity + counterparty analysis │
│ │
│ 3. MOBILE MONEY LAYERING │
│ ├── Funds split across multiple mobile money wallets │
│ ├── Small transfers below reporting thresholds │
│ ├── Aggregated and withdrawn as cash │
│ └── Detection: network graph analysis of wallet-to-wallet flows │
│ │
│ 4. TRADE-BASED LAUNDERING (IMPORT/EXPORT) │
│ ├── Over/under-invoicing of goods (especially via Form M) │
│ ├── Phantom shipments with legitimate-looking documentation │
│ ├── Misclassification of goods to justify payment amounts │
│ └── Detection: trade price benchmarking + customs data correlation │
│ │
│ 5. SALARY ACCOUNT ABUSE │
│ ├── Ghost employees on payroll (government or corporate) │
│ ├── Salary accounts used as pass-through for illicit funds │
│ ├── Multiple "salary" credits from different organizations │
│ └── Detection: payroll pattern analysis + employer verification │
│ │
│ 6. REAL ESTATE LAYERING │
│ ├── Property purchases via structured cash deposits │
│ ├── Nominee buyers and shell companies │
│ ├── Under-declared property values │
│ └── Detection: large cash-to-real-estate flow patterns │
│ │
│ 7. CRYPTOCURRENCY ON/OFF-RAMP │
│ ├── P2P crypto trades via bank transfer (despite CBN's earlier ban) │
│ ├── Multiple small transfers to/from known crypto trader accounts │
│ ├── Narration keywords: "coin", "btc", "usdt", "binance" │
│ └── Detection: narration NLP + counterparty graph analysis │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

ML-Based Anomaly Detection
Rule-based monitoring catches known patterns. ML models catch the unknown — transactions that do not match any specific rule but are statistically unusual for a customer's behavioral profile:
import numpy as np
from datetime import datetime, timedelta
from decimal import Decimal
class BehavioralAnomalyDetector:
"""
ML-based anomaly detection using customer behavioral profiles.
Each customer has a learned "normal" transaction pattern.
Deviations from this pattern are scored as anomalies.
Approach: Isolation Forest + statistical Z-score hybrid.
"""
def extract_features(
self,
txn: TransactionEvent,
profile: CustomerRiskProfile,
history: AccountHistory,
) -> dict:
"""
Extract features for anomaly scoring.
Features compare current transaction against customer's
historical behavioral baseline.
"""
# Transaction-level features
features = {
'amount_zscore': self._amount_zscore(txn.amount, history),
'time_of_day_unusual': self._time_unusualness(
txn.timestamp, history
),
'channel_frequency': self._channel_frequency(
txn.channel, history
),
'beneficiary_is_new': 1.0 if self._is_new_beneficiary(
txn.beneficiary_account_id, history
) else 0.0,
'country_is_new': 1.0 if (
txn.beneficiary_country and
txn.beneficiary_country not in history.get_countries()
) else 0.0,
# Velocity features (rolling windows)
'txn_count_1h': history.transaction_count(hours=1),
'txn_count_24h': history.transaction_count(hours=24),
'txn_count_7d': history.transaction_count(hours=168),
'amount_sum_24h': float(history.amount_sum(hours=24)),
'amount_sum_7d': float(history.amount_sum(hours=168)),
# Peer group comparison
'peer_amount_percentile': self._peer_percentile(
txn.amount, profile
),
'peer_velocity_percentile': self._peer_velocity_percentile(
history.transaction_count(hours=24), profile
),
# Risk amplifiers
'customer_risk_score': profile.composite_risk_score,
'is_cross_border': 1.0 if txn.is_cross_border else 0.0,
'is_cash': 1.0 if txn.is_cash else 0.0,
}
return features
def _amount_zscore(self, amount: Decimal, history: AccountHistory) -> float:
"""
How many standard deviations is this amount from the
customer's typical transaction size?
"""
historical_amounts = history.get_amounts(days=90)
if len(historical_amounts) < 5:
return 0.0 # Insufficient history
mean = np.mean(historical_amounts)
std = np.std(historical_amounts)
if std == 0:
return 0.0
return (float(amount) - mean) / std
def _time_unusualness(
self,
timestamp: datetime,
history: AccountHistory,
) -> float:
"""
How unusual is this transaction time compared to the
customer's typical transaction hours?
A transaction at 3 AM from a customer who only transacts
during business hours is highly unusual.
"""
hour = timestamp.hour
historical_hours = history.get_transaction_hours(days=90)
if not historical_hours:
return 0.5 # No history
hour_counts = np.bincount(historical_hours, minlength=24)
total = sum(hour_counts)
if total == 0:
return 0.5
hour_frequency = hour_counts[hour] / total
return 1.0 - hour_frequency # Higher = more unusual

AI/ML Model Governance
CBN Standard 9 imposes rigorous governance requirements on AI and ML models used in AML systems. This is not a generic "use AI responsibly" statement — it requires independent annual validation covering accuracy, performance drift, fairness audits, bias testing, and human review protocols. This standard reflects global regulatory trends (the EU AI Act, US OCC model risk management guidance) now arriving in Nigeria.
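A drift metric used throughout the governance code in this section is the Population Stability Index (PSI), which compares a feature's binned distribution today against its distribution at model training or last-validation time. A minimal, self-contained sketch (function name and smoothing constant are illustrative):

```python
import math

def population_stability_index(baseline: list, current: list) -> float:
    """PSI between two binned distributions (bucket proportions).

    Rule of thumb: PSI < 0.10 stable, 0.10-0.25 moderate shift,
    > 0.25 significant drift (the threshold used in this guide).
    """
    eps = 1e-6  # smoothing so empty buckets don't divide by zero
    psi = 0.0
    for b, c in zip(baseline, current):
        b, c = max(b, eps), max(c, eps)
        psi += (c - b) * math.log(c / b)
    return psi
```

Identical distributions score 0; the more probability mass that migrates between buckets, the higher the PSI.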
Model Validation Framework
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ModelValidationReport:
"""
Annual model validation report structure aligned with
CBN Standard 9 requirements.
"""
model_id: str
model_name: str
model_version: str
validation_date: datetime
validator: str # Independent validator (not model developer)
validation_type: str # 'annual', 'significant_change', 'ad_hoc'
# Accuracy metrics
accuracy: float
precision: float
recall: float
f1_score: float
auc_roc: float
# Alert effectiveness
true_positive_rate: float
false_positive_rate: float
alert_to_str_conversion_rate: float # Nigerian filings are STRs (NFIU)
# Performance drift assessment
baseline_accuracy: float # Accuracy at last validation
accuracy_drift: float # Current - baseline
feature_drift_detected: bool
data_drift_detected: bool
concept_drift_detected: bool
# Fairness and bias audit
demographic_parity_scores: dict # By customer segment
equalized_odds_scores: dict
bias_findings: list
# Explainability assessment
top_features_stable: bool
shap_analysis_complete: bool
alert_explanation_quality: str # 'high', 'medium', 'low'
# Recommendations
findings: list
recommendations: list
risk_rating: str # 'low', 'medium', 'high'
approved_for_production: bool
class ModelGovernanceFramework:
"""
Manages the full lifecycle of AML ML models in compliance
with CBN Standard 9.
"""
# Performance drift thresholds that trigger retraining
DRIFT_THRESHOLDS = {
'accuracy_drop': 0.05, # 5% drop from baseline
'precision_drop': 0.08, # 8% drop
'false_positive_increase': 0.10, # 10% increase
'feature_drift_psi': 0.25, # Population Stability Index
}
async def monitor_model_performance(
self,
model_id: str,
) -> PerformanceReport:
"""
Continuous performance monitoring — runs daily.
Triggers retraining alerts when drift exceeds thresholds.
"""
# Get current performance metrics
current_metrics = await self._calculate_current_metrics(model_id)
baseline_metrics = await self._get_baseline_metrics(model_id)
drift_signals = []
# Check accuracy drift
accuracy_drop = baseline_metrics.accuracy - current_metrics.accuracy
if accuracy_drop > self.DRIFT_THRESHOLDS['accuracy_drop']:
drift_signals.append({
'type': 'accuracy_drift',
'baseline': baseline_metrics.accuracy,
'current': current_metrics.accuracy,
'drop': accuracy_drop,
})
# Check feature drift (Population Stability Index)
for feature in current_metrics.feature_distributions:
psi = self._calculate_psi(
baseline_metrics.feature_distributions[feature],
current_metrics.feature_distributions[feature],
)
if psi > self.DRIFT_THRESHOLDS['feature_drift_psi']:
drift_signals.append({
'type': 'feature_drift',
'feature': feature,
'psi': psi,
})
# Check false positive rate increase
fp_increase = (
current_metrics.false_positive_rate -
baseline_metrics.false_positive_rate
)
if fp_increase > self.DRIFT_THRESHOLDS['false_positive_increase']:
drift_signals.append({
'type': 'false_positive_drift',
'baseline_fpr': baseline_metrics.false_positive_rate,
'current_fpr': current_metrics.false_positive_rate,
})
if drift_signals:
await self.alert_service.send(
severity='high',
message=f'Model {model_id} performance drift detected',
details=drift_signals,
recommended_action='Schedule model retraining and validation'
)
return PerformanceReport(
model_id=model_id,
metrics=current_metrics,
drift_signals=drift_signals,
retraining_recommended=len(drift_signals) > 0,
)
async def run_bias_audit(self, model_id: str) -> BiasAuditReport:
"""
Bias audit ensuring model does not discriminate unfairly.
CBN requires fairness audits to ensure AML models do not
disproportionately flag transactions based on:
- Customer geography (e.g., North vs South)
- Transaction size segment (small business vs corporate)
- Account age (new customers vs established)
- Channel usage pattern
"""
model = await self._load_model(model_id)
test_data = await self._get_bias_test_dataset(model_id)
segments = {
'geographic_zone': ['north_central', 'north_east', 'north_west',
'south_east', 'south_south', 'south_west'],
'customer_segment': ['retail', 'sme', 'commercial', 'corporate'],
'account_age': ['new_0_6m', 'established_6_24m', 'mature_24m_plus'],
'primary_channel': ['branch', 'mobile', 'internet', 'pos', 'ussd'],
}
bias_findings = []
for segment_name, segment_values in segments.items():
segment_metrics = {}
for value in segment_values:
subset = test_data[test_data[segment_name] == value]
if len(subset) < 100:
continue # Insufficient data for reliable metrics
predictions = model.predict(subset)
segment_metrics[value] = {
'alert_rate': float(predictions.mean()),
'precision': self._precision(subset, predictions),
'recall': self._recall(subset, predictions),
'sample_size': len(subset),
}
# Check for significant disparities
alert_rates = [m['alert_rate'] for m in segment_metrics.values()]
if alert_rates:
max_rate = max(alert_rates)
min_rate = min(alert_rates)
disparity_ratio = max_rate / max(min_rate, 0.001)
if disparity_ratio > 3.0: # More than 3x difference
bias_findings.append({
'segment': segment_name,
'disparity_ratio': disparity_ratio,
'details': segment_metrics,
'severity': 'high' if disparity_ratio > 5.0 else 'medium',
})
return BiasAuditReport(
model_id=model_id,
audit_date=datetime.utcnow(),
segments_tested=list(segments.keys()),
findings=bias_findings,
overall_risk='high' if any(
f['severity'] == 'high' for f in bias_findings
) else 'low',
)

Explainability: Why Did This Alert Fire?
CBN Standard 9 requires that investigators understand why an alert was generated. Black-box models are not acceptable — every alert must come with an explanation:
class AlertExplainer:
"""
Generates human-readable explanations for ML-generated alerts.
Uses SHAP (SHapley Additive exPlanations) values to attribute
alert decisions to specific features.
"""
def explain_alert(
self,
alert: MonitoringAlert,
model_features: dict,
shap_values: dict,
) -> AlertExplanation:
"""
Generate a compliance-officer-readable explanation of
why the ML model flagged this transaction.
"""
# Sort features by absolute SHAP contribution
feature_contributions = sorted(
shap_values.items(),
key=lambda x: abs(x[1]),
reverse=True,
)
# Top contributing factors
top_factors = []
for feature_name, shap_value in feature_contributions[:5]:
factor = {
'feature': feature_name,
'value': model_features[feature_name],
'contribution': shap_value,
'direction': 'increases risk' if shap_value > 0 else 'decreases risk',
'explanation': self._generate_explanation(
feature_name,
model_features[feature_name],
shap_value,
),
}
top_factors.append(factor)
# Natural language summary
summary = self._generate_summary(top_factors, alert)
return AlertExplanation(
alert_id=alert.alert_id,
summary=summary,
top_factors=top_factors,
model_confidence=alert.alert_score / 100.0,
all_contributions=dict(feature_contributions),
)
def _generate_explanation(
self,
feature: str,
value,
shap_value: float,
) -> str:
"""Map technical features to investigator-friendly language."""
explanations = {
'amount_zscore': (
f'Transaction amount is {abs(value):.1f} standard deviations '
f'{"above" if value > 0 else "below"} this customer\'s '
f'typical transaction size'
),
'time_of_day_unusual': (
f'Transaction occurred at an unusual time for this customer '
f'(unusualness score: {value:.0%})'
),
'beneficiary_is_new': (
'Funds sent to a beneficiary this customer has never '
'transacted with before'
),
'txn_count_24h': (
f'Customer has made {int(value)} transactions in the last '
f'24 hours, which is {"higher" if shap_value > 0 else "lower"} '
f'than typical'
),
'peer_amount_percentile': (
f'Transaction amount is in the {value:.0f}th percentile '
f'compared to similar customers'
),
'is_cross_border': (
'This is a cross-border transaction' if value
else 'This is a domestic transaction'
),
'customer_risk_score': (
f'Customer has an elevated risk score of {value:.0f}/100'
),
}
return explanations.get(feature, f'{feature} = {value}')

Case Management and Investigation Workflows
CBN Standard 6 requires enterprise-grade case management tools that automatically generate, assign, track, and resolve investigation cases from alerts. This transforms what is often a spreadsheet-and-email process at Nigerian banks into a structured workflow.
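The SLA deadlines that drive this workflow can be computed once, when the case record is created, and stored on the case. A minimal sketch (timeline values follow the lifecycle below; function names are assumptions):

```python
from datetime import datetime, timedelta

# Timeline values from the alert-to-case lifecycle; helper names are
# illustrative, not part of the CBN circular.
TRIAGE_SLA_HOURS = 4
INVESTIGATION_DAYS = {"standard": 5, "high": 2}  # business days

def add_business_days(start: datetime, days: int) -> datetime:
    """Advance by N business days, skipping Saturdays and Sundays."""
    current, remaining = start, days
    while remaining > 0:
        current += timedelta(days=1)
        if current.weekday() < 5:  # Mon=0 .. Fri=4
            remaining -= 1
    return current

def case_sla_deadlines(created_at: datetime, severity: str) -> dict:
    """Deadlines stamped on a case at creation time."""
    tier = "high" if severity in ("high", "critical") else "standard"
    return {
        "sla_triage_deadline": created_at + timedelta(hours=TRIAGE_SLA_HOURS),
        "sla_investigation_deadline": add_business_days(
            created_at, INVESTIGATION_DAYS[tier]
        ),
    }
```

Persisting the deadlines rather than recomputing them keeps the `sla_breached` flag and the partial index on open cases cheap to evaluate.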
Alert-to-Case Lifecycle
┌─────────────────────────────────────────────────────────────────────────────┐
│ ALERT-TO-CASE LIFECYCLE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ ALERT │───►│ TRIAGE │───►│ INVESTIGATION│───►│ RESOLUTION │ │
│ │ Generated│ │ │ │ │ │ │ │
│ └─────────┘ └──────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │ │
│ │ ┌────┴────┐ ┌────┴────┐ ┌────┴────┐ │
│ │ │ Dismiss │ │ Escalate│ │ Close │ │
│ │ │ (false │ │ to SAR │ │ (no │ │
│ │ │ positive│ │ team │ │ further │ │
│ │ │ + reason│ │ │ │ action) │ │
│ │ └─────────┘ └────┬────┘ └─────────┘ │
│ │ │ │
│ │ ┌────┴────┐ │
│ │ │ FILE │ │
│ │ │ STR │──► NFIU Submission │
│ │ └─────────┘ │
│ │ │
│ SLA TIMELINES │
│ ├── Alert to Triage: ≤ 4 hours (auto-assigned) │
│ ├── Triage to Decision: ≤ 24 hours │
│ ├── Investigation: ≤ 5 business days (standard) │
│ │ ≤ 2 business days (high priority) │
│ ├── STR Filing: ≤ 24 hours from suspicious determination │
│ │ (MLPPA 2022 requirement) │
│ └── CTR Filing: Same business day │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Case Management Data Model
-- Investigation cases
CREATE TABLE investigation_cases (
case_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
case_number VARCHAR(30) NOT NULL UNIQUE, -- e.g., 'AML-2026-001234'
-- Case origin
trigger_type VARCHAR(30) NOT NULL, -- 'alert', 'manual', 'regulatory'
alert_ids UUID[], -- Linked alerts
-- Customer
customer_id VARCHAR(50) NOT NULL,
customer_name VARCHAR(300),
customer_risk_level VARCHAR(20),
-- Case details
typology VARCHAR(100), -- AML typology classification
severity VARCHAR(20) NOT NULL, -- 'low', 'medium', 'high', 'critical'
total_suspicious_amount NUMERIC(18,2),
transaction_count INTEGER,
date_range_start DATE,
date_range_end DATE,
narrative TEXT, -- Investigation narrative
-- Assignment
assigned_to VARCHAR(100), -- Analyst user ID
assigned_team VARCHAR(50),
assigned_at TIMESTAMP WITH TIME ZONE,
-- Status
status VARCHAR(30) NOT NULL DEFAULT 'new',
-- 'new', 'triaging', 'investigating', 'escalated',
-- 'pending_str', 'str_filed', 'closed_no_action',
-- 'closed_false_positive', 'closed_resolved'
-- Outcome
outcome VARCHAR(50),
outcome_reason TEXT,
str_filed BOOLEAN DEFAULT FALSE,
str_reference VARCHAR(100),
str_filed_at TIMESTAMP WITH TIME ZONE,
-- SLA tracking
sla_triage_deadline TIMESTAMP WITH TIME ZONE,
sla_investigation_deadline TIMESTAMP WITH TIME ZONE,
sla_breached BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
closed_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX idx_cases_status ON investigation_cases(status);
CREATE INDEX idx_cases_assigned ON investigation_cases(assigned_to, status);
CREATE INDEX idx_cases_customer ON investigation_cases(customer_id);
CREATE INDEX idx_cases_sla ON investigation_cases(sla_investigation_deadline)
WHERE status NOT IN ('closed_no_action', 'closed_false_positive', 'closed_resolved');
-- Case evidence attachments
CREATE TABLE case_evidence (
id BIGSERIAL PRIMARY KEY,
case_id UUID REFERENCES investigation_cases(case_id),
evidence_type VARCHAR(50) NOT NULL,
-- 'transaction_detail', 'screening_hit', 'customer_document',
-- 'analyst_note', 'external_intelligence', 'regulatory_request'
title VARCHAR(300) NOT NULL,
description TEXT,
content JSONB, -- Structured evidence data
file_path VARCHAR(500), -- For document attachments
added_by VARCHAR(100) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Case activity log (audit trail)
CREATE TABLE case_activities (
id BIGSERIAL PRIMARY KEY,
case_id UUID REFERENCES investigation_cases(case_id),
activity_type VARCHAR(50) NOT NULL,
-- 'created', 'assigned', 'note_added', 'status_changed',
-- 'escalated', 'str_filed', 'closed', 'reopened'
description TEXT NOT NULL,
actor VARCHAR(100) NOT NULL,
previous_value JSONB,
new_value JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_ca_case ON case_activities(case_id, created_at);

Automated Regulatory Reporting
CBN Standard 7 requires automated generation and submission of regulatory reports — primarily Suspicious Transaction Reports (STRs) to the Nigerian Financial Intelligence Unit (NFIU) and Currency Transaction Reports (CTRs) as mandated by the MLPPA 2022.
STR Generation Pipeline
Under the MLPPA 2022, financial institutions must file an STR within 24 hours of determining that a transaction is suspicious. The scrutiny period (from detection to determination) must not exceed 72 hours. This means the clock starts ticking the moment an alert is generated:
from datetime import datetime, timedelta
from decimal import Decimal

class STRGenerationPipeline:
"""
Automated pipeline for generating and filing Suspicious Transaction
Reports with the Nigerian Financial Intelligence Unit (NFIU).
MLPPA 2022 timeline:
- Alert generated → Scrutiny period ≤ 72 hours
- Suspicion confirmed → STR filed ≤ 24 hours
- Total: ≤ 96 hours from alert to filing
"""
async def generate_str(
self,
case: InvestigationCase,
analyst_determination: AnalystDetermination, # determination record (analyst_id, decision, timestamp)
) -> STRReport:
"""
Generate an STR from a completed investigation case.
Automatically enriches with required NFIU fields.
"""
# Gather all case data
customer = await self.db.get_customer_profile(case.customer_id)
        transactions = await self.db.get_case_transactions(case.case_id)
        evidence = await self.db.get_case_evidence(case.case_id)
        screening_hits = await self.db.get_case_screening_hits(case.case_id)

        str_report = STRReport(
            # Filing institution details
            institution_name=self.config.institution_name,
            institution_code=self.config.cbn_institution_code,
            filing_officer=analyst_determination.analyst_id,
            filing_date=datetime.utcnow(),
            # Subject information
            subject_name=customer.full_name,
            subject_bvn=customer.bvn,
            subject_nin=customer.nin,
            subject_address=customer.address,
            subject_phone=customer.phone,
            subject_occupation=customer.occupation,
            subject_account_numbers=customer.account_numbers,
            subject_account_open_date=customer.account_open_date,
            # Transaction details
            transactions=[
                STRTransaction(
                    date=t.timestamp,
                    amount=t.amount,
                    currency=t.currency,
                    type=t.transaction_type,
                    channel=t.channel,
                    counterparty=t.beneficiary_name,
                    counterparty_account=t.beneficiary_account,
                    counterparty_bank=t.beneficiary_institution,
                    narration=t.narration,
                )
                for t in transactions
            ],
            # Suspicion details
            total_suspicious_amount=case.total_suspicious_amount,
            suspicion_indicators=self._extract_indicators(case, evidence),
            typology=case.typology,
            narrative=self._generate_narrative(case, customer, transactions),
            # Screening results
            sanctions_hits=[h for h in screening_hits if h.type == 'sanctions'],
            pep_hits=[h for h in screening_hits if h.type == 'pep'],
            # Internal reference
            case_number=case.case_number,
            alert_ids=case.alert_ids,
        )

        # Validate STR completeness before filing
        validation = self._validate_str(str_report)
        if not validation.is_complete:
            raise STRValidationError(
                f'STR incomplete: {validation.missing_fields}'
            )

        # File with NFIU
        filing_result = await self._submit_to_nfiu(str_report)

        # Record filing in case management and audit log
        await self.db.update_case_str_status(
            case_id=case.case_id,
            str_filed=True,
            str_reference=filing_result.reference_number,
            str_filed_at=datetime.utcnow(),
        )
        await self.audit.log({
            'action': 'str_filed',
            'case_id': str(case.case_id),
            'str_reference': filing_result.reference_number,
            'total_amount': str(case.total_suspicious_amount),
            'filed_by': analyst_determination.analyst_id,
            'filed_within_deadline': self._check_deadline_compliance(case),
        })

        return str_report

    def _check_deadline_compliance(self, case: InvestigationCase) -> bool:
        """
        Verify that the STR was filed within the MLPPA 2022 deadline:
        24 hours from suspicion determination.
        """
        determination_time = case.escalated_at or case.updated_at
        filing_time = datetime.utcnow()
        elapsed = filing_time - determination_time
        return elapsed <= timedelta(hours=24)
from decimal import Decimal

class CTRAutomationPipeline:
    """
    Automated Currency Transaction Report generation.

    MLPPA 2022 thresholds:
    - Individual: ₦5,000,000
    - Corporate: ₦10,000,000

    CTRs are generated automatically — no investigation needed.
    """

    INDIVIDUAL_THRESHOLD = Decimal('5000000')
    CORPORATE_THRESHOLD = Decimal('10000000')

    async def process_cash_transaction(
        self,
        txn: TransactionEvent,
        customer: CustomerRiskProfile,
    ):
        """
        Evaluate whether a cash transaction triggers CTR filing.

        Also checks for structured transactions that aggregate
        above the threshold within a business day.
        """
        if not txn.is_cash:
            return

        threshold = (
            self.CORPORATE_THRESHOLD
            if customer.customer_type == 'corporate'
            else self.INDIVIDUAL_THRESHOLD
        )

        # Single transaction above threshold
        if txn.amount >= threshold:
            await self._generate_ctr(txn, customer, trigger='single_transaction')
            return

        # Aggregated cash transactions today
        today_cash_total = await self.db.get_daily_cash_total(
            customer_id=txn.originator_customer_id,
            date=txn.timestamp.date(),
        )
        if today_cash_total + txn.amount >= threshold:
            # Aggregate exceeds threshold — file CTR for all today's cash txns
            today_transactions = await self.db.get_daily_cash_transactions(
                customer_id=txn.originator_customer_id,
                date=txn.timestamp.date(),
            )
            await self._generate_ctr(
                txn,
                customer,
                trigger='daily_aggregate',
                related_transactions=today_transactions,
            )

Audit Trails, Governance, and NDPA Compliance
CBN Standard 8 requires tamper-proof audit logs, role-based access controls, multi-factor authentication, and secure data transmission — all while complying with the Nigeria Data Protection Act (NDPA) 2023. This is the governance backbone that makes everything else defensible during a CBN examination.
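Of the Standard 8 controls, multi-factor authentication is the one teams most often treat as a black box. Mechanically, the TOTP codes produced by authenticator apps are just RFC 4226 HOTP computed over a time counter. The sketch below is illustrative only, not a production recommendation (privileged AML actions should delegate MFA to a hardened identity provider); all function names here are our own:

```python
import hashlib
import hmac
import struct
import time

def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """RFC 4226 HMAC-based one-time password with dynamic truncation."""
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)

def totp(secret: bytes, step: int = 30, digits: int = 6) -> str:
    """RFC 6238 time-based OTP: HOTP over the current 30-second window."""
    return hotp(secret, int(time.time()) // step, digits)
```

Because verification is this cheap, there is no performance excuse for skipping a second factor on sensitive operations such as STR filing, risk overrides, or sanctions-hit dismissal.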
Tamper-Proof Audit Logging
import hashlib
import json
from datetime import datetime

class ImmutableAuditLog:
    """
    Append-only, hash-chained audit log that provides tamper evidence.

    Each log entry includes a hash of the previous entry, creating
    a blockchain-like chain that makes retroactive modification detectable.

    Storage: Elasticsearch (for search) + S3/MinIO (for immutable archive).
    """

    def __init__(self, db, archive_store):
        self.db = db
        self.archive = archive_store

    async def log(self, entry: dict) -> str:
        """
        Write an audit log entry with hash chain.
        Returns the entry hash for verification.
        """
        # Get hash of the previous entry
        previous_hash = await self._get_latest_hash()

        audit_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'previous_hash': previous_hash,
            'entry_data': entry,
            'entry_type': entry.get('action', 'unknown'),
            'actor': entry.get('actor', 'system'),
            'ip_address': entry.get('ip_address'),
            'session_id': entry.get('session_id'),
        }

        # Calculate hash of this entry (includes previous hash → chain)
        entry_json = json.dumps(audit_entry, sort_keys=True)
        entry_hash = hashlib.sha256(entry_json.encode()).hexdigest()
        audit_entry['entry_hash'] = entry_hash

        # Write to primary store (Elasticsearch — searchable)
        await self.db.index_audit_entry(audit_entry)

        # Write to immutable archive (S3/MinIO — append-only bucket policy)
        await self.archive.write(
            key=f"audit/{datetime.utcnow().strftime('%Y/%m/%d')}/{entry_hash}.json",
            data=entry_json,
        )
        return entry_hash

    async def verify_chain_integrity(
        self,
        start_date: datetime,
        end_date: datetime,
    ) -> ChainVerification:
        """
        Verify the integrity of the audit chain for a date range.
        Used during CBN examinations and internal audits.
        """
        entries = await self.db.get_audit_entries(start_date, end_date)
        broken_links = []
        verified_count = 0

        for i, entry in enumerate(entries):
            # Verify this entry's hash
            expected_data = {
                k: v for k, v in entry.items() if k != 'entry_hash'
            }
            expected_hash = hashlib.sha256(
                json.dumps(expected_data, sort_keys=True).encode()
            ).hexdigest()
            if expected_hash != entry['entry_hash']:
                broken_links.append({
                    'position': i,
                    'entry_hash': entry['entry_hash'],
                    'expected_hash': expected_hash,
                    'reason': 'hash_mismatch',
                })
                continue

            # Verify chain link
            if i > 0 and entry['previous_hash'] != entries[i-1]['entry_hash']:
                broken_links.append({
                    'position': i,
                    'reason': 'chain_break',
                })
                continue

            verified_count += 1

        return ChainVerification(
            total_entries=len(entries),
            verified_entries=verified_count,
            broken_links=broken_links,
            chain_intact=len(broken_links) == 0,
        )

Data Protection Under NDPA 2023
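A recurring tension in this area is that CBN requires 10-year retention while NDPA 2023 requires data minimization. One common reconciliation, shown here as a hedged stdlib-only sketch with a hypothetical key-handling arrangement, is keyed pseudonymization: analytics and model-training datasets carry a stable pseudonym instead of the raw BVN or NIN, and only the compliance enclave holds the key:

```python
import hashlib
import hmac

def pseudonymize(identifier: str, key: bytes) -> str:
    """
    Keyed one-way pseudonym for BVN/NIN-style identifiers.

    HMAC-SHA256 keeps the mapping stable (joins across datasets still
    work) but irreversible without the key, which should live in a
    secrets manager such as Vault, never alongside the data.
    """
    return hmac.new(key, identifier.encode(), hashlib.sha256).hexdigest()

# Illustrative: same key → same pseudonym; a rotated key is unlinkable
token = pseudonymize("22212345678", key=b"compliance-enclave-key")
```

Note that an unkeyed hash is not sufficient here: BVNs are 11 digits, so a plain SHA-256 of the value can be reversed by exhaustive search in minutes.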
AML data processing involves sensitive personal data — transaction histories, investigation records, and suspicious activity determinations carry significant privacy implications. The NDPA 2023 applies:
┌─────────────────────────────────────────────────────────────────────────────┐
│ NDPA COMPLIANCE FOR AML DATA │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DATA ELEMENT │ CLASSIFICATION │ RETENTION │
│ ────────────────────────────────────────────────────────────────── │
│ Customer identity (BVN/NIN) │ Sensitive personal│ Account lifetime │
│ Transaction records │ Personal data │ 10 years (CBN) │
│ Risk scores and profiles │ Derived personal │ Account lifetime │
│ Sanctions screening results │ Compliance data │ 10 years │
│ Investigation case files │ Sensitive personal│ 10 years post-close │
│ STR/CTR filings │ Regulatory data │ 10 years post-filing │
│ Alert details │ Derived personal │ 7 years │
│ Audit logs │ System data │ 10 years │
│ │
│ KEY NDPA OBLIGATIONS │
│ ├── Lawful basis: Legal obligation (MLPPA 2022) — no consent needed │
│ │ for core AML processing, but consent needed for enhanced profiling │
│ ├── Purpose limitation: AML/CFT/CPF compliance only — cannot repurpose │
│ │ investigation data for marketing or credit scoring │
│ ├── Data minimization: Collect only what MLPPA and CBN standards require │
│ ├── Security: Encryption at rest (AES-256) and in transit (TLS 1.3) │
│ ├── Access controls: RBAC with MFA, least-privilege principle │
│ ├── Data breach: Notify NDPC within 72 hours of breach discovery │
│ ├── Cross-border transfers: AML data shared with foreign FIUs must │
│ │ comply with NDPA cross-border transfer provisions │
│ └── DPIA: Required for large-scale processing of financial crime data │
│ │
│ LEGAL BASIS HIERARCHY │
│ ├── MLPPA 2022 → Mandates AML processing (overrides consent requirement) │
│ ├── CBN Regulations → Specifies retention periods and technical controls │
│ ├── NDPA 2023 → Sets data protection standards that AML processing │
│ │ must still satisfy (security, access control, breach notification) │
│ └── NFIU Guidelines → STR/CTR filing requirements and formats │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Role-Based Access Control Design
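The schema below stores role capabilities as explicit boolean flags rather than free-form permission strings, which makes enforcement in the application layer trivial to reason about. A hedged sketch of such a guard (names are illustrative, mirroring the schema's columns, with the role table held in memory for brevity):

```python
# Illustrative in-memory mirror of the aml_roles capability flags
ROLE_FLAGS = {
    "analyst_l1": {"can_file_str": False, "can_dismiss_sanctions_hit": False},
    "mlro": {"can_file_str": True, "can_dismiss_sanctions_hit": True},
}

class PermissionDenied(Exception):
    pass

def require(flag: str):
    """Decorator: reject the call unless the caller's role grants `flag`."""
    def decorator(fn):
        def wrapper(role_id: str, *args, **kwargs):
            if not ROLE_FLAGS.get(role_id, {}).get(flag, False):
                raise PermissionDenied(f"role {role_id!r} lacks {flag!r}")
            return fn(role_id, *args, **kwargs)
        return wrapper
    return decorator

@require("can_file_str")
def file_str(role_id: str, case_id: str) -> str:
    # In the real platform this would invoke the STR filing pipeline;
    # every attempt, allowed or denied, should also hit the audit log.
    return f"STR filed for {case_id}"
```

In production the denied attempts matter as much as the allowed ones: a pattern of an L1 analyst repeatedly probing STR filing is itself an insider-risk signal worth surfacing.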
-- RBAC schema for AML platform
CREATE TABLE aml_roles (
    role_id VARCHAR(50) PRIMARY KEY,
    role_name VARCHAR(100) NOT NULL,
    description TEXT,
    max_data_classification VARCHAR(20), -- 'public', 'internal', 'confidential', 'restricted'
    can_view_nin BOOLEAN DEFAULT FALSE,
    can_export_data BOOLEAN DEFAULT FALSE,
    can_file_str BOOLEAN DEFAULT FALSE,
    can_override_risk BOOLEAN DEFAULT FALSE,
    can_dismiss_sanctions_hit BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Standard AML platform roles
INSERT INTO aml_roles (role_id, role_name, description, max_data_classification,
                       can_view_nin, can_export_data, can_file_str, can_override_risk,
                       can_dismiss_sanctions_hit) VALUES
('analyst_l1', 'L1 Analyst', 'Alert triage and initial investigation',
 'confidential', FALSE, FALSE, FALSE, FALSE, FALSE),
('analyst_l2', 'L2 Analyst', 'Full investigation and STR preparation',
 'confidential', TRUE, FALSE, FALSE, FALSE, FALSE),
('analyst_l3', 'Senior Analyst', 'Complex cases, STR filing, risk override',
 'restricted', TRUE, TRUE, TRUE, TRUE, FALSE),
('mlro', 'Money Laundering Reporting Officer', 'Final STR approval and filing',
 'restricted', TRUE, TRUE, TRUE, TRUE, TRUE),
('compliance_head', 'Head of Compliance', 'Full access, policy management',
 'restricted', TRUE, TRUE, TRUE, TRUE, TRUE),
('auditor', 'Internal Auditor', 'Read-only access to all data and audit logs',
 'restricted', TRUE, TRUE, FALSE, FALSE, FALSE),
('system_admin', 'System Administrator', 'Platform configuration, no case access',
 'internal', FALSE, FALSE, FALSE, FALSE, FALSE),
('cbn_examiner', 'CBN Examiner', 'Temporary read access during examination',
 'restricted', TRUE, TRUE, FALSE, FALSE, FALSE);

Implementation Roadmap and Cost Estimates
The 90-day implementation roadmap submission (due June 10, 2026) is the first compliance gate. It tells CBN that your institution has a credible plan. Getting this right matters — institutions that miss it or submit vague plans signal to regulators that they are not taking the directive seriously.
Phased Implementation
┌─────────────────────────────────────────────────────────────────────────────┐
│ IMPLEMENTATION ROADMAP (18-MONTH PLAN FOR BANKS) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PHASE 1: FOUNDATION (Months 1-6) │
│ ├── Core infrastructure: Kafka cluster, PostgreSQL, Elasticsearch │
│ ├── Data ingestion: Connect to core banking, card systems, e-channels │
│ ├── Sanctions screening: Real-time list ingestion and name matching │
│ ├── CTR automation: Threshold monitoring and automated filing │
│ ├── Basic transaction monitoring: Top 10 rule-based scenarios │
│ ├── Audit logging: Immutable log infrastructure │
│ ├── RBAC + MFA: Access control framework │
│ └── DELIVERABLE: Basic screening and monitoring operational │
│ │
│ PHASE 2: INTELLIGENCE (Months 7-12) │
│ ├── ML model development: Anomaly detection, behavioral profiling │
│ ├── Case management system: Full investigation workflow │
│ ├── STR automation: End-to-end filing pipeline │
│ ├── PEP screening and adverse media monitoring │
│ ├── Enhanced typology scenarios (Nigeria-specific patterns) │
│ ├── Customer risk scoring engine: Dynamic, multi-factor │
│ ├── Model governance framework: Validation, bias testing │
│ └── DELIVERABLE: Full monitoring and investigation capability │
│ │
│ PHASE 3: OPTIMIZATION (Months 13-18) │
│ ├── ML model tuning: Reduce false positives, improve detection │
│ ├── Advanced analytics: Network analysis, entity resolution │
│ ├── Regulatory reporting optimization: Automated CBN returns │
│ ├── Vendor management framework │
│ ├── First independent model validation │
│ ├── Staff training and change management │
│ ├── CBN examination readiness drills │
│ └── DELIVERABLE: Fully compliant, optimized AML platform │
│ │
│ KEY MILESTONES │
│ ├── Month 3: Implementation roadmap submitted to CBN ✓ │
│ ├── Month 6: Basic screening and CTR automation live │
│ ├── Month 9: Case management and STR pipeline live │
│ ├── Month 12: ML models in production with governance framework │
│ ├── Month 15: First independent model validation completed │
│ └── Month 18: Full compliance achieved │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Technology Stack
┌─────────────────────────────────────────────────────────────────────────────┐
│ RECOMMENDED TECHNOLOGY STACK │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ COMPONENT │ TECHNOLOGY │ PURPOSE │
│ ─────────────────────────────────────────────────────────────────── │
│ Event Streaming │ Apache Kafka │ Transaction ingestion │
│ Stream Processing │ Apache Flink │ Real-time monitoring │
│ Primary Database │ PostgreSQL 16 │ Cases, customers, rules │
│ Search & Analytics │ Elasticsearch 8 │ Transaction search, │
│ │ │ audit log search │
│ Cache │ Redis 7 │ Sanctions cache, │
│ │ │ session management │
│ Object Storage │ MinIO (S3-compatible) │ Evidence, archives, │
│ │ │ immutable audit logs │
│ ML Platform │ MLflow + Python │ Model training, │
│ │ (scikit-learn, XGBoost) │ validation, serving │
│ API Layer │ FastAPI (Python) │ Microservices APIs │
│ Frontend │ Next.js / React │ Analyst dashboards │
│ Workflow Engine │ Temporal │ Case management, │
│ │ │ SLA orchestration │
│ Monitoring │ Prometheus + Grafana │ System health, │
│ │ │ alert volume metrics │
│ Log Aggregation │ Loki │ Application logs │
│ Secrets Management │ HashiCorp Vault │ API keys, encryption │
│ Container Orchestration│ Kubernetes │ Service deployment │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Infrastructure Sizing and Cost Estimates
┌─────────────────────────────────────────────────────────────────────────────┐
│ INFRASTRUCTURE SIZING BY INSTITUTION TIER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ COMPONENT │ Microfinance / │ Mid-Tier Bank │ Tier-1 DMB │
│ │ Fintech │ / Large PSP │ (Zenith, GTBank, │
│ │ (1M txn/month) │ (50M txn/month)│ Access, UBA etc.) │
│ │ │ │ (500M+ txn/month) │
│ ──────────────────────────────────────────────────────────────── │
│ Kafka │ 3 brokers (4GB) │ 5 brokers │ 12+ brokers │
│ │ │ (16GB each) │ (32GB each) │
│ Flink │ 2 task managers │ 6 task managers│ 20+ task managers │
│ PostgreSQL │ 4 vCPU, 16GB │ 16 vCPU, 64GB │ 64 vCPU, 256GB │
│ │ 500GB SSD │ 2TB SSD │ 10TB SSD + replicas │
│ Elasticsearch │ 3 nodes (8GB) │ 6 nodes (32GB) │ 15+ nodes (64GB) │
│ Redis │ 4GB │ 16GB cluster │ 64GB cluster │
│ ML Serving │ 2 vCPU (CPU) │ 4 GPU instances│ 8+ GPU instances │
│ │
│ MONTHLY CLOUD COST ESTIMATES │
│ Microfinance / Fintech: $2,000 – $5,000/month │
│ Mid-Tier Bank / Large PSP: $8,000 – $20,000/month │
│ Tier-1 Deposit Money Bank: $40,000 – $100,000/month │
│ │
│ DEVELOPMENT COST ESTIMATES │
│ Phase 1 (Foundation): $150,000 – $300,000 │
│ Phase 2 (Intelligence): $200,000 – $400,000 │
│ Phase 3 (Optimization): $100,000 – $200,000 │
│ Total (18-month build): $450,000 – $900,000 │
│ │
│ ANNUAL OPERATING COSTS (post-build) │
│ Cloud infrastructure: $24,000 – $1,200,000/year (by tier) │
│ ML model retraining: $20,000 – $50,000/year │
│ Sanctions list feeds: $10,000 – $50,000/year │
│ Independent model validation: $30,000 – $80,000/year │
│ Staff (analysts, engineers): Institution-dependent │
│ │
│ BUILD vs BUY vs HYBRID │
│ ├── Build: Full control, highest cost, 12-18 months │
│ │ Best for: Tier-1 banks with unique requirements │
│ ├── Buy (NICE Actimize, SAS, Oracle FCCM): Fastest, vendor lock-in │
│ │ Best for: Mid-tier banks wanting proven platforms │
│ ├── Buy (Regional): Emerging Nigerian AML-tech startups │
│ │ Best for: Fintechs wanting cost-effective, Nigeria-aware solutions │
│ └── Hybrid: Buy core platform, build Nigeria-specific customizations │
│ Best for: Most institutions (balances speed, cost, and fit) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

The 90-Day Roadmap Deliverable
The implementation roadmap due to CBN by June 10, 2026 should include, at minimum:
- Current state assessment: Honest evaluation of existing AML capabilities, gaps against the 10 baseline standards, and identified risks
- Target architecture: System design showing how each standard will be met, including technology choices and integration points
- Implementation timeline: Phased plan with milestones, mapped to the 18-month (or 24-month) compliance deadline
- Resource plan: Team composition, vendor selection status, budget allocation
- Risk register: Key implementation risks (NIMC integration challenges, data quality issues, staff training gaps) and mitigations
- Governance structure: Who owns AML technology? Reporting lines between compliance, IT, and the board
- Quick wins: What can be deployed in the first 90 days to demonstrate momentum (even before full platform build)
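Treating the roadmap itself as a structured artifact makes the submission harder to get wrong. A minimal completeness check is sketched below; the section keys are our own shorthand for the bullets above, not a CBN-prescribed format:

```python
# Hypothetical section keys mirroring the roadmap components listed above
REQUIRED_SECTIONS = [
    "current_state_assessment",
    "target_architecture",
    "implementation_timeline",
    "resource_plan",
    "risk_register",
    "governance_structure",
    "quick_wins",
]

def missing_sections(roadmap: dict) -> list[str]:
    """Return any required sections that are absent or empty."""
    return [s for s in REQUIRED_SECTIONS if not roadmap.get(s)]

draft = {
    "current_state_assessment": "Gap analysis vs the 10 baseline standards",
    "target_architecture": "Kafka/Flink streaming core per Phase 1",
}
gaps = missing_sections(draft)  # five sections still to write
```

Wiring a check like this into the document pipeline that assembles the submission is a cheap way to guarantee that nothing mandatory is dropped between the compliance, IT, and board reviews.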
Conclusion: From ₦15 Billion in Fines to Automated Compliance
CBN Circular BSD/DIR/PUB/LAB/019/002 marks a decisive moment for Nigerian financial services. The message is unmistakable: the era of manual AML compliance — spreadsheet-based transaction reviews, delayed STR filings, reactive sanctions screening — is over. What replaces it is a data engineering challenge of the first order.
The 10 baseline standards describe a modern, integrated data platform:
- Real-time streaming ingests billions of transactions from every channel
- ML models detect patterns that rule-based systems miss — and CBN demands these models be validated, explainable, and fair
- Automated pipelines file STRs within the 24-hour MLPPA deadline and generate CTRs without human intervention
- Tamper-proof audit trails provide the evidence base that withstands CBN examination
- Identity verification through BVN and NIN integration anchors every customer interaction to a verified identity
The institutions that build these platforms well will do more than avoid fines. They will gain operational intelligence — real-time visibility into transaction flows, customer behavior, and risk exposure — that transforms compliance from a cost center into a strategic capability. The same infrastructure that detects money laundering also powers fraud detection, credit risk assessment, and customer insight.
The June 2026 roadmap deadline is 90 days away. The 18-month compliance clock has already started. For institutions still operating with manual processes, the question is not whether to automate — the CBN has answered that. The question is how quickly you can build a platform that meets the standard.
This article analyzes the data engineering requirements of CBN Circular BSD/DIR/PUB/LAB/019/002 — the Baseline Standards for Automated AML Solutions. The architectures and code examples presented are reference designs based on established financial crime technology patterns. Financial institutions should engage qualified data engineering firms, legal counsel, and compliance experts for implementation. Gemut Analytics provides AML platform architecture, real-time data pipeline engineering, ML model development, and regulatory compliance consulting for financial institutions navigating these requirements.
Key Takeaways
- ✓ CBN Circular BSD/DIR/PUB/LAB/019/002 establishes 10 mandatory baseline standards for automated AML systems — from customer identification and sanctions screening to AI/ML model governance and vendor management
- ✓ Real-time transaction monitoring across cards, e-channels, deposits, and lending requires event-driven streaming architecture (Kafka/Flink) processing billions of transactions against configurable rule engines and ML models
- ✓ Sanctions screening must cover CBN, OFAC, UN, and EU lists with fuzzy name matching that handles Nigerian naming patterns — and must automatically block confirmed matches in real time
- ✓ AI/ML models for anomaly detection require annual independent validation covering accuracy, performance drift, fairness audits, bias testing, and explainability — investigators must understand why an alert fired
- ✓ Automated STR filing within MLPPA's 24-hour deadline and CTR generation at ₦5M/₦10M thresholds transforms what was a manual scramble into a pipeline problem
- ✓ The 90-day implementation roadmap submission (due June 2026) is the first compliance gate — institutions that miss it signal to CBN that they are not taking the directive seriously