Navigating Data Compliance in Nigeria: Building NDPR, CBN, and NCC-Ready Data Infrastructure
A practical guide to building compliant data infrastructure in Nigeria, covering NDPR data protection requirements, CBN financial data guidelines, NCC telecom regulations, and automated compliance pipelines with audit trails.
Nigeria's regulatory landscape for data is rapidly evolving. The Nigeria Data Protection Regulation (NDPR) and its successor NDPA, CBN guidelines for financial institutions, NCC requirements for telecom operators, and FIRS tax reporting mandates create a complex compliance environment. Most Nigerian businesses still rely on manual processes for regulatory reporting — spreadsheets, email chains, and quarterly fire drills before submission deadlines. This guide demonstrates how to engineer compliance directly into your data infrastructure, transforming regulatory requirements from a burden into an automated, auditable, and continuously monitored capability.
Prerequisites
- Understanding of data pipeline and data warehouse concepts
- Familiarity with data governance principles
- Basic knowledge of SQL and Python

Introduction
Nigeria is Africa's largest economy, its most populous nation, and an increasingly digital-first market. With over 200 million citizens, a booming fintech ecosystem, and rapid telecom expansion, Nigerian organisations generate and process vast quantities of personal, financial, and operational data every day. Regulatory bodies have responded with a growing web of data protection and reporting requirements — but most organisations are still catching up.
The reality on the ground is sobering. At many Nigerian banks, quarterly CBN returns are assembled manually from spreadsheets exported across departments. Fintech startups discover NDPR consent requirements only after a NITDA audit notice arrives. Telecom operators scramble to reconcile NCC subscriber data reports against billing systems that were never designed with regulatory output in mind. The result is late submissions, inaccurate reports, regulatory penalties, and — worst of all — a false sense of compliance that crumbles under scrutiny.
This guide takes a different approach. Instead of treating compliance as a checkbox exercise bolted onto existing systems, we demonstrate how to engineer compliance into data infrastructure from the ground up. Every pipeline, every data model, every transformation becomes a compliance asset — automated, auditable, and continuously monitored.
┌─────────────────────────────────────────────────────────────────────────────┐
│ NIGERIAN REGULATORY DATA LANDSCAPE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ NITDA / NDPC CBN NCC FIRS │
│ ─────────────── ─────────── ────────── ────────── │
│ NDPR / NDPA Banking Regs Telecom Regs Tax Reporting │
│ Data Protection AML/CFT Subscriber VAT / CIT │
│ Consent Mgmt KYC / CDD Data Transfer │
│ Cross-border Transaction Quality of Pricing │
│ Transfers Reporting Service Withholding │
│ Breach Notify Credit Bureau Type Approval Tax │
│ Reporting │
│ │
│ SEC Nigeria │
│ ─────────── │
│ Capital Markets │
│ Reporting │
│ Investor Data │
│ Protection │
│ │
├─────────────────────────────────────────────────────────────────────────────┤
│ ALL REGULATORS: Increasing focus on audit trails, data lineage, │
│ automated reporting, and demonstrable compliance (not just policies) │
└─────────────────────────────────────────────────────────────────────────────┘
The stakes are significant. NDPR violations can result in fines of up to 2% of annual gross revenue or NGN 10 million (whichever is greater). CBN penalties for late or inaccurate returns can include monetary fines, operational restrictions, and even licence revocation in extreme cases. NCC enforcement actions can include fines of up to NGN 5 million per violation. This is not a theoretical risk — NITDA has actively investigated and sanctioned organisations since NDPR's introduction in 2019.
Nigeria's Regulatory Data Landscape
Before building compliant infrastructure, you must understand exactly what each regulator requires. Nigerian data regulations are not monolithic — they come from multiple bodies, apply to different sectors, and impose distinct obligations. Let us examine each in detail.
NDPR and NDPA: The Data Protection Foundation
The Nigeria Data Protection Regulation (NDPR), issued by NITDA in January 2019, was Nigeria's first comprehensive data protection framework. In June 2023, President Bola Tinubu signed the Nigeria Data Protection Act (NDPA) into law, establishing the Nigeria Data Protection Commission (NDPC) as an independent regulatory body. The NDPA supersedes the NDPR but largely builds upon its principles.
Key requirements for data engineers:
| Requirement | NDPR/NDPA Reference | Technical Implication |
|---|---|---|
| Lawful basis for processing | NDPA Part III, Sec 25-30 | Consent management system with audit trail |
| Purpose limitation | NDPA Part III, Sec 26 | Data flow tagging and enforcement at pipeline level |
| Data minimisation | NDPA Part III, Sec 27 | Column-level access control, selective ingestion |
| Storage limitation | NDPA Part III, Sec 28 | Automated retention and deletion policies |
| Right to erasure | NDPA Part IV, Sec 37 | Soft-delete propagation across all downstream systems |
| Right to data portability | NDPA Part IV, Sec 38 | Standardised export formats, API endpoints |
| Data breach notification | NDPA Part V, Sec 44 | 72-hour detection and notification pipeline |
| Cross-border transfer | NDPA Part VI, Sec 47-49 | Data residency enforcement, transfer impact assessments |
| Data Protection Impact Assessment | NDPA Part III, Sec 31 | Automated DPIA workflows for new data processing |
| Record of processing activities | NDPA Part III, Sec 32 | Metadata catalog with processing purpose annotations |
The NDPR mandates that organisations processing personal data of Nigerian residents above prescribed data-subject thresholds engage a licensed Data Protection Compliance Organisation (DPCO) to conduct an annual audit of their data processing activities. The NDPA reinforces this by requiring data controllers of major importance to designate a Data Protection Officer (DPO).
CBN Guidelines for Financial Institutions
The Central Bank of Nigeria (CBN) imposes extensive data and reporting requirements on banks, microfinance banks, payment service providers, and other financial institutions. Key frameworks include:
- CBN AML/CFT Regulations 2022: Mandate customer due diligence (CDD), enhanced due diligence (EDD), suspicious transaction reporting (STR), and currency transaction reporting (CTR) with strict data retention requirements
- CBN Risk-Based Supervision Framework: Requires granular transaction data for supervisory review, including real-time access to certain categories of transactions
- CBN Guidelines on Electronic Banking 2003 (revised): Data security requirements for electronic channels, including encryption, access control, and audit logging
- CBN Credit Bureau Reporting: Licensed credit bureaus (CRC, First Central, CreditRegistry) require regular data submissions from financial institutions
- CBN Regulation on Open Banking 2023: API-based data sharing with consent management, rate limiting, and comprehensive logging
Reporting obligations:
| Report | Frequency | Submission Window | Key Data Points |
|---|---|---|---|
| Bank Returns (Form M) | Monthly | 15th of following month | Balance sheet, P&L, asset quality |
| Suspicious Transaction Report | Per event | 24-72 hours | Transaction details, parties, suspicion basis |
| Currency Transaction Report | Daily | Next business day | Transactions above NGN 5M (individual) / NGN 10M (corporate) |
| Credit Bureau Returns | Monthly | 30 days after month-end | Loan performance, repayment history |
| Financial Stability Returns | Quarterly | 30 days after quarter-end | Capital adequacy, liquidity ratios |
| Foreign Exchange Returns | Daily | Same day | FX transactions, positions, rates |
NCC Requirements for Telecom Operators
The Nigerian Communications Commission (NCC) regulates telecommunications operators and imposes requirements around:
- SIM Registration Regulations: NIN-SIM linkage data, subscriber identity verification data must be maintained and accessible to the NCC
- Quality of Service (QoS) Reporting: Monthly QoS metrics including call drop rates, network availability, and data throughput
- Type Approval Data: Device and equipment certification data
- Consumer Complaint Data: Quarterly reporting on consumer complaints, resolution rates, and turnaround times
- Interconnect Data: Call detail records for interconnect billing and dispute resolution
NCC's Consumer Code of Practice also requires telecom operators to maintain data privacy standards for subscriber information, which must align with NDPR/NDPA requirements.
FIRS Tax Reporting
The Federal Inland Revenue Service (FIRS) requires:
- VAT Returns: Monthly, due 21 days after the month-end
- Company Income Tax (CIT): Annual returns with supporting schedules
- Transfer Pricing Documentation: For companies with related-party transactions above NGN 300 million
- Withholding Tax Remittance: Monthly, due 21 days after deduction
- FIRS TaxPro Max Integration: Electronic filing with structured data requirements
SEC Nigeria
The Securities and Exchange Commission Nigeria regulates capital market operators with requirements including:
- Quarterly and annual financial returns for broker-dealers, fund managers, and registrars
- Investor data protection aligned with NDPR/NDPA
- Trade reporting for equities, fixed income, and derivatives
- Beneficial ownership reporting under the Companies and Allied Matters Act (CAMA) 2020
Building NDPR-Compliant Data Pipelines
NDPR/NDPA compliance is not a policy document — it is a set of engineering constraints that must be embedded into every data pipeline. Let us examine the key technical components.
Consent Management Architecture
Consent is the most common lawful basis for processing personal data under NDPR. Every record in your data warehouse that originates from personal data must be traceable to a specific, informed, and freely given consent.
┌──────────────────────────────────────────────────────────────────────────┐
│ CONSENT MANAGEMENT ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌────────────────┐ ┌──────────────────┐ │
│ │ User │───>│ Consent │───>│ Consent │ │
│ │ Interface│ │ Collection │ │ Event Store │ │
│ │ (Web/App)│ │ Service │ │ (Immutable Log) │ │
│ └──────────┘ └────────────────┘ └────────┬─────────┘ │
│ │ │
│ ┌───────────────────────┤ │
│ │ │ │
│ v v │
│ ┌───────────────┐ ┌──────────────────┐ │
│ │ Consent │ │ Consent State │ │
│ │ Audit Log │ │ Materialized │ │
│ │ (Append-only)│ │ View │ │
│ └───────────────┘ └────────┬─────────┘ │
│ │ │
│ ┌─────────────────────┤ │
│ │ │ │
│ v v │
│ ┌───────────────┐ ┌──────────────────┐ │
│ │ Pipeline │ │ Data Access │ │
│ │ Gate │ │ Policy Engine │ │
│ │ (Pre-ingest) │ │ (Query-time) │ │
│ └───────────────┘ └──────────────────┘ │
│ │
│ Flow: User grants consent -> Event stored immutably -> Materialised │
│ view updated -> Pipelines check consent before processing -> │
│ Query engine enforces purpose limitation at read time │
└──────────────────────────────────────────────────────────────────────────┘
Data model for consent management:
-- Consent Events: Immutable log of all consent actions
CREATE TABLE consent_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
data_subject_id UUID NOT NULL,
consent_type VARCHAR(50) NOT NULL, -- 'marketing', 'analytics', 'third_party_sharing', etc.
action VARCHAR(20) NOT NULL, -- 'grant', 'revoke', 'modify'
purpose TEXT NOT NULL,
lawful_basis VARCHAR(30) NOT NULL, -- 'consent', 'contract', 'legal_obligation', 'legitimate_interest'
collected_via VARCHAR(50) NOT NULL, -- 'web_form', 'mobile_app', 'api', 'in_person'
consent_text_hash VARCHAR(64) NOT NULL, -- SHA-256 of the exact consent text shown
ip_address INET,
user_agent TEXT,
geo_jurisdiction VARCHAR(10) NOT NULL, -- 'NG', 'EU', 'US', etc.
event_timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ,
metadata JSONB,
-- Immutability: no UPDATE or DELETE allowed via application layer
CONSTRAINT valid_action CHECK (action IN ('grant', 'revoke', 'modify')),
CONSTRAINT valid_basis CHECK (lawful_basis IN (
'consent', 'contract', 'legal_obligation',
'vital_interest', 'public_interest', 'legitimate_interest'
))
);
-- Materialised view: Current consent state per subject
CREATE MATERIALIZED VIEW current_consent_state AS
SELECT DISTINCT ON (data_subject_id, consent_type)
data_subject_id,
consent_type,
action AS current_status,
purpose,
lawful_basis,
event_timestamp AS last_updated,
expires_at,
CASE
WHEN action = 'grant' AND (expires_at IS NULL OR expires_at > NOW())
THEN TRUE
ELSE FALSE
END AS is_active
FROM consent_events
ORDER BY data_subject_id, consent_type, event_timestamp DESC;
-- Index for pipeline gate lookups
CREATE INDEX idx_consent_state_lookup
ON consent_events (data_subject_id, consent_type, event_timestamp DESC);
Pipeline Consent Gate
Every pipeline that processes personal data must check the consent state before proceeding. This is implemented as a pipeline gate — a mandatory step that runs before any transformation or loading.
"""
consent_gate.py — NDPR Consent Enforcement for Data Pipelines
This module provides a pipeline gate that verifies consent status
before allowing personal data to flow through processing stages.
"""
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
import logging
import psycopg2
from psycopg2.extras import RealDictCursor
logger = logging.getLogger(__name__)
class ConsentStatus(Enum):
GRANTED = "grant"
REVOKED = "revoke"
EXPIRED = "expired"
NOT_FOUND = "not_found"
@dataclass
class ConsentCheckResult:
subject_id: str
consent_type: str
status: ConsentStatus
checked_at: datetime
purpose: Optional[str] = None
expires_at: Optional[datetime] = None
class NDPRConsentGate:
"""
Pipeline gate enforcing NDPR consent requirements.
Checks consent status for each data subject before allowing
their data to proceed through the pipeline. Records all checks
in an audit log for regulatory evidence.
"""
def __init__(self, db_connection_string: str):
self.conn = psycopg2.connect(db_connection_string)
self._ensure_audit_table()
def _ensure_audit_table(self):
"""Create the consent check audit table if it does not exist."""
with self.conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS consent_check_audit (
check_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_name VARCHAR(100) NOT NULL,
pipeline_run_id VARCHAR(100) NOT NULL,
data_subject_id UUID NOT NULL,
consent_type VARCHAR(50) NOT NULL,
check_result VARCHAR(20) NOT NULL,
action_taken VARCHAR(30) NOT NULL,
checked_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
""")
self.conn.commit()
def check_consent(
self,
subject_id: str,
consent_type: str,
pipeline_name: str,
pipeline_run_id: str,
) -> ConsentCheckResult:
"""
Check whether a data subject has active consent for the specified type.
Returns ConsentCheckResult and logs the check to the audit table.
NDPR Article 2.3 requires demonstrable consent — this audit trail
provides that evidence.
"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT action, purpose, expires_at, event_timestamp
FROM consent_events
WHERE data_subject_id = %s
AND consent_type = %s
ORDER BY event_timestamp DESC
LIMIT 1;
""", (subject_id, consent_type))
row = cur.fetchone()
now = datetime.now(timezone.utc)  # timezone-aware, comparable with TIMESTAMPTZ values
if row is None:
status = ConsentStatus.NOT_FOUND
elif row["action"] == "revoke":
status = ConsentStatus.REVOKED
elif row["expires_at"] and row["expires_at"] < now:
status = ConsentStatus.EXPIRED
else:
status = ConsentStatus.GRANTED
result = ConsentCheckResult(
subject_id=subject_id,
consent_type=consent_type,
status=status,
checked_at=now,
purpose=row["purpose"] if row else None,
expires_at=row["expires_at"] if row else None,
)
# Determine action based on consent status
action = "allow" if status == ConsentStatus.GRANTED else "block"
# Record the check in audit log
cur.execute("""
INSERT INTO consent_check_audit
(pipeline_name, pipeline_run_id, data_subject_id,
consent_type, check_result, action_taken)
VALUES (%s, %s, %s, %s, %s, %s);
""", (
pipeline_name, pipeline_run_id, subject_id,
consent_type, status.value, action,
))
self.conn.commit()
if status != ConsentStatus.GRANTED:
logger.warning(
"Consent check FAILED for subject=%s type=%s status=%s pipeline=%s",
subject_id, consent_type, status.value, pipeline_name,
)
return result
def filter_batch(
self,
subject_ids: list[str],
consent_type: str,
pipeline_name: str,
pipeline_run_id: str,
) -> list[str]:
"""
Filter a batch of subject IDs, returning only those with active consent.
Optimised for pipeline batch processing — performs a single query
rather than N individual checks.
"""
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT DISTINCT ON (data_subject_id)
data_subject_id,
action,
expires_at
FROM consent_events
WHERE data_subject_id = ANY(%s)
AND consent_type = %s
ORDER BY data_subject_id, event_timestamp DESC;
""", (subject_ids, consent_type))
rows = {str(r["data_subject_id"]): r for r in cur.fetchall()}
now = datetime.now(timezone.utc)  # timezone-aware, comparable with TIMESTAMPTZ values
allowed = []
blocked = []
for sid in subject_ids:
row = rows.get(sid)
if (
row
and row["action"] == "grant"
and (not row["expires_at"] or row["expires_at"] > now)
):
allowed.append(sid)
else:
blocked.append(sid)
# Bulk audit log insert
audit_records = [
(pipeline_name, pipeline_run_id, sid, consent_type, "grant", "allow")
for sid in allowed
] + [
(pipeline_name, pipeline_run_id, sid, consent_type, "blocked", "block")
for sid in blocked
]
from psycopg2.extras import execute_values
execute_values(cur, """
INSERT INTO consent_check_audit
(pipeline_name, pipeline_run_id, data_subject_id,
consent_type, check_result, action_taken)
VALUES %s;
""", audit_records)
self.conn.commit()
logger.info(
"Batch consent check: %d allowed, %d blocked for type=%s pipeline=%s",
len(allowed), len(blocked), consent_type, pipeline_name,
)
return allowed
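Below is a minimal usage sketch showing how the gate slots into a pipeline step. The run_marketing_ingestion function, the DATABASE_URL environment variable, and the record shape are illustrative assumptions rather than part of the module above:
import os
import uuid

from consent_gate import NDPRConsentGate

def run_marketing_ingestion(batch: list[dict]) -> list[dict]:
    """Process only the records whose subjects hold active marketing consent."""
    gate = NDPRConsentGate(os.environ["DATABASE_URL"])
    run_id = str(uuid.uuid4())
    allowed_ids = set(
        gate.filter_batch(
            subject_ids=[record["data_subject_id"] for record in batch],
            consent_type="marketing",
            pipeline_name="marketing_ingestion",
            pipeline_run_id=run_id,
        )
    )
    # Blocked records are simply dropped here; consent_check_audit already records why.
    return [record for record in batch if record["data_subject_id"] in allowed_ids]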
Automated Data Retention and Deletion
NDPR/NDPA requires that personal data not be retained longer than necessary for its stated purpose. Manual deletion processes are error-prone and unauditable. Instead, implement automated retention policies that execute on schedule.
"""
retention_manager.py — Automated NDPR-Compliant Data Retention
Implements scheduled data retention enforcement with full audit trail.
Supports both hard delete and anonymisation strategies depending on
whether aggregate data must be preserved.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class RetentionAction(Enum):
HARD_DELETE = "hard_delete"
ANONYMIZE = "anonymize"
ARCHIVE = "archive"
@dataclass
class RetentionPolicy:
"""Defines a retention policy for a specific data category."""
policy_name: str
table_name: str
data_category: str # 'personal', 'financial', 'health', 'telecom'
retention_days: int
action: RetentionAction
timestamp_column: str # Column used to determine data age
subject_id_column: str # Column identifying the data subject
regulatory_basis: str # e.g., 'NDPR Art 2.1(c)', 'CBN AML Reg 12.3'
anonymize_columns: Optional[list[str]] = None # Columns to anonymise
exempt_conditions: Optional[str] = None # SQL WHERE clause for exemptions
# Define retention policies per regulatory requirement
RETENTION_POLICIES = [
RetentionPolicy(
policy_name="ndpr_marketing_consent",
table_name="marketing_contacts",
data_category="personal",
retention_days=365,
action=RetentionAction.HARD_DELETE,
timestamp_column="last_consent_date",
subject_id_column="contact_id",
regulatory_basis="NDPA Part III, Sec 28 — Storage Limitation",
),
RetentionPolicy(
policy_name="cbn_transaction_records",
table_name="transaction_history",
data_category="financial",
retention_days=2555, # 7 years per CBN AML/CFT Regulations
action=RetentionAction.ARCHIVE,
timestamp_column="transaction_date",
subject_id_column="customer_id",
regulatory_basis="CBN AML/CFT Reg 2022, Sec 12.3",
),
RetentionPolicy(
policy_name="ncc_cdr_records",
table_name="call_detail_records",
data_category="telecom",
retention_days=1825, # 5 years per NCC requirements
action=RetentionAction.ANONYMIZE,
timestamp_column="call_timestamp",
subject_id_column="subscriber_msisdn",
regulatory_basis="NCC Consumer Code of Practice, Sec 8",
anonymize_columns=["subscriber_msisdn", "caller_name", "location_data"],
),
RetentionPolicy(
policy_name="ndpr_analytics_data",
table_name="user_behavior_events",
data_category="personal",
retention_days=180,
action=RetentionAction.ANONYMIZE,
timestamp_column="event_timestamp",
subject_id_column="user_id",
regulatory_basis="NDPA Part III, Sec 28 — Storage Limitation",
anonymize_columns=["user_id", "ip_address", "device_id", "email"],
),
]
class RetentionManager:
"""Execute and audit data retention policies."""
def __init__(self, db_connection):
self.conn = db_connection
def execute_policy(self, policy: RetentionPolicy, dry_run: bool = False) -> dict:
"""
Execute a single retention policy.
Returns a summary of actions taken (or that would be taken in dry_run mode).
All actions are recorded in the retention_audit_log table.
"""
cutoff_date = datetime.utcnow() - timedelta(days=policy.retention_days)
with self.conn.cursor() as cur:
# Count affected records
count_sql = f"""
SELECT COUNT(*) FROM {policy.table_name}
WHERE {policy.timestamp_column} < %s
{f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''}
"""
cur.execute(count_sql, (cutoff_date,))
affected_count = cur.fetchone()[0]
if dry_run:
logger.info(
"[DRY RUN] Policy '%s': %d records would be affected (action: %s)",
policy.policy_name, affected_count, policy.action.value,
)
return {"policy": policy.policy_name, "affected": affected_count, "executed": False}
if affected_count == 0:
logger.info("Policy '%s': No records to process.", policy.policy_name)
return {"policy": policy.policy_name, "affected": 0, "executed": True}
# Execute the retention action
if policy.action == RetentionAction.HARD_DELETE:
action_sql = f"""
DELETE FROM {policy.table_name}
WHERE {policy.timestamp_column} < %s
{f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''}
"""
cur.execute(action_sql, (cutoff_date,))
elif policy.action == RetentionAction.ANONYMIZE:
set_clauses = ", ".join(
f"{col} = 'ANONYMIZED'" for col in (policy.anonymize_columns or [])
)
if set_clauses:
action_sql = f"""
UPDATE {policy.table_name}
SET {set_clauses}
WHERE {policy.timestamp_column} < %s
{f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''}
"""
cur.execute(action_sql, (cutoff_date,))
elif policy.action == RetentionAction.ARCHIVE:
# Move to archive table, then delete from source
archive_table = f"{policy.table_name}_archive"
action_sql = f"""
INSERT INTO {archive_table}
SELECT * FROM {policy.table_name}
WHERE {policy.timestamp_column} < %s
{f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''};
DELETE FROM {policy.table_name}
WHERE {policy.timestamp_column} < %s
{f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''};
"""
cur.execute(action_sql, (cutoff_date, cutoff_date))
# Record in audit log
cur.execute("""
INSERT INTO retention_audit_log
(policy_name, table_name, action, records_affected,
cutoff_date, regulatory_basis, executed_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW());
""", (
policy.policy_name, policy.table_name,
policy.action.value, affected_count,
cutoff_date, policy.regulatory_basis,
))
self.conn.commit()
logger.info(
"Policy '%s': %d records processed (action: %s)",
policy.policy_name, affected_count, policy.action.value,
)
return {"policy": policy.policy_name, "affected": affected_count, "executed": True}
Right to Erasure Implementation
NDPA Section 37 grants data subjects the right to request erasure of their personal data. In a modern data stack with multiple downstream systems, this requires cascading erasure across all systems that hold the subject's data.
┌──────────────────────────────────────────────────────────────────────────┐
│ RIGHT TO ERASURE FLOW │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. REQUEST 2. DISCOVERY 3. EXECUTION 4. VERIFY │
│ ────────── ──────────── ──────────── ────────── │
│ │
│ ┌─────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │
│ │ Subject │───>│ Data Map │───>│ Erasure │─>│ Completion │ │
│ │ Request │ │ Discovery │ │ Orchestrator │ │ Verifier │ │
│ └─────────┘ └──────┬───────┘ └──────┬───────┘ └────────────┘ │
│ │ │ │
│ v v │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Systems: │ │ Actions: │ │
│ │ - PostgreSQL │ │ - DELETE │ │
│ │ - Data Lake │ │ - Anonymise │ │
│ │ - Redis │ │ - Overwrite │ │
│ │ - Elastic │ │ - Purge keys │ │
│ │ - Backups │ │ - Redact │ │
│ │ - 3rd Party │ │ - Notify 3P │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ SLA: 30 days per NDPA (72 hours recommended for automated systems) │
└──────────────────────────────────────────────────────────────────────────┘
"""
erasure_orchestrator.py — NDPA Right to Erasure Orchestration
Coordinates erasure across all systems that hold personal data for a
given data subject. Maintains a complete audit trail of all actions taken.
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Callable
import logging
import uuid
logger = logging.getLogger(__name__)
class ErasureStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
PARTIALLY_COMPLETED = "partially_completed"
FAILED = "failed"
@dataclass
class ErasureTarget:
"""A single system or table from which data must be erased."""
system_name: str
description: str
erasure_function: Callable
is_critical: bool = True # If True, failure blocks overall completion
status: ErasureStatus = ErasureStatus.PENDING
records_affected: int = 0
error_message: str = ""
completed_at: datetime = None
@dataclass
class ErasureRequest:
"""An erasure request from a data subject."""
request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
data_subject_id: str = ""
requested_at: datetime = field(default_factory=datetime.utcnow)
reason: str = ""
targets: list[ErasureTarget] = field(default_factory=list)
status: ErasureStatus = ErasureStatus.PENDING
completed_at: datetime = None
class ErasureOrchestrator:
"""
Orchestrates right-to-erasure requests across all registered systems.
Implements the NDPA Section 37 requirement with:
- Comprehensive data discovery across all registered systems
- Parallel erasure execution where possible
- Verification of erasure completion
- Full audit trail for regulatory evidence
"""
def __init__(self, db_connection):
self.conn = db_connection
self.targets_registry: list[ErasureTarget] = []
def register_target(self, target: ErasureTarget):
"""Register a system as containing personal data subject to erasure."""
self.targets_registry.append(target)
logger.info("Registered erasure target: %s", target.system_name)
def execute_erasure(self, request: ErasureRequest) -> ErasureRequest:
"""
Execute an erasure request across all registered targets.
Processes each target, records results, and determines overall status.
Per NDPA, we must complete within 30 days — but automated systems
should target 72 hours or less.
"""
request.status = ErasureStatus.IN_PROGRESS
self._log_request_event(request, "erasure_started")
all_succeeded = True
any_succeeded = False
for target in request.targets:
try:
target.status = ErasureStatus.IN_PROGRESS
result = target.erasure_function(request.data_subject_id)
target.records_affected = result.get("records_affected", 0)
target.status = ErasureStatus.COMPLETED
target.completed_at = datetime.utcnow()
any_succeeded = True
logger.info(
"Erasure completed: system=%s subject=%s records=%d",
target.system_name, request.data_subject_id, target.records_affected,
)
except Exception as e:
target.status = ErasureStatus.FAILED
target.error_message = str(e)
if target.is_critical:
all_succeeded = False
logger.error(
"Erasure FAILED: system=%s subject=%s error=%s",
target.system_name, request.data_subject_id, str(e),
)
self._log_target_result(request, target)
# Determine overall status
if all_succeeded:
request.status = ErasureStatus.COMPLETED
elif any_succeeded:
request.status = ErasureStatus.PARTIALLY_COMPLETED
else:
request.status = ErasureStatus.FAILED
request.completed_at = datetime.utcnow()
self._log_request_event(request, "erasure_completed")
return request
def _log_request_event(self, request: ErasureRequest, event_type: str):
"""Log erasure request events to audit table."""
with self.conn.cursor() as cur:
cur.execute("""
INSERT INTO erasure_audit_log
(request_id, data_subject_id, event_type, status, event_timestamp)
VALUES (%s, %s, %s, %s, NOW());
""", (request.request_id, request.data_subject_id, event_type, request.status.value))
self.conn.commit()
def _log_target_result(self, request: ErasureRequest, target: ErasureTarget):
"""Log individual target erasure results."""
with self.conn.cursor() as cur:
cur.execute("""
INSERT INTO erasure_target_audit
(request_id, system_name, status, records_affected,
error_message, completed_at)
VALUES (%s, %s, %s, %s, %s, %s);
""", (
request.request_id, target.system_name, target.status.value,
target.records_affected, target.error_message, target.completed_at,
))
self.conn.commit()
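The sketch below shows how a system is registered and a request executed. The user_profiles table, the connection strings, and the sample UUID are hypothetical; each real target would wrap its own deletion or anonymisation logic:
import psycopg2

warehouse_conn = psycopg2.connect("dbname=compliance_warehouse")  # illustrative DSN
audit_conn = psycopg2.connect("dbname=compliance_audit")          # illustrative DSN

def erase_from_warehouse(subject_id: str) -> dict:
    """Hypothetical warehouse erasure: anonymise the subject's profile rows in place."""
    with warehouse_conn.cursor() as cur:
        cur.execute(
            "UPDATE user_profiles SET email = 'ERASED', phone_number = 'ERASED' "
            "WHERE user_id = %s;",
            (subject_id,),
        )
        affected = cur.rowcount
    warehouse_conn.commit()
    return {"records_affected": affected}

orchestrator = ErasureOrchestrator(audit_conn)
orchestrator.register_target(ErasureTarget(
    system_name="warehouse.user_profiles",
    description="Conformed-zone user profile table",
    erasure_function=erase_from_warehouse,
))

request = ErasureRequest(
    data_subject_id="7a3f9c2e-1234-4c5d-9e6f-0a1b2c3d4e5f",  # illustrative UUID
    reason="Erasure requested via support portal",
    targets=list(orchestrator.targets_registry),
)
completed = orchestrator.execute_erasure(request)
print(completed.status.value, [(t.system_name, t.status.value) for t in completed.targets])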
Data Protection Impact Assessment Automation
NDPA Section 31 requires a Data Protection Impact Assessment (DPIA) for processing activities that are "likely to result in a high risk to the rights and freedoms of data subjects." Rather than treating this as a one-off paper exercise, implement automated DPIA workflows that trigger when new data processing activities are registered.
# dpia_policy.yaml — Automated DPIA Trigger Configuration
#
# Defines conditions under which a DPIA is automatically required.
# Based on NDPA Part III, Sec 31 and NDPC Implementation Framework.
dpia_triggers:
automatic_required:
- condition: "data_category IN ('health', 'biometric', 'genetic')"
reason: "Processing of special category data"
ndpa_reference: "Part III, Sec 30(1)(a)"
- condition: "processing_scale == 'large' AND data_category == 'personal'"
reason: "Large-scale processing of personal data"
ndpa_reference: "Part III, Sec 30(1)(b)"
- condition: "involves_profiling == true"
reason: "Systematic profiling with legal or significant effects"
ndpa_reference: "Part III, Sec 30(1)(c)"
- condition: "cross_border_transfer == true AND destination NOT IN approved_jurisdictions"
reason: "Transfer to jurisdiction without adequate protection"
ndpa_reference: "Part VI, Sec 47"
- condition: "new_technology == true AND data_category == 'personal'"
reason: "Use of new technology for personal data processing"
ndpa_reference: "Part III, Sec 30(1)(d)"
risk_scoring:
high_risk_threshold: 12
factors:
- name: "data_sensitivity"
weight: 3
scores:
public: 1
personal: 2
sensitive: 4
special_category: 5
- name: "processing_scale"
weight: 2
scores:
individual: 1
small: 2
medium: 3
large: 4
national: 5
- name: "data_subject_vulnerability"
weight: 2
scores:
general_public: 1
employees: 2
customers: 3
children: 5
patients: 5
approved_jurisdictions:
- "NG" # Nigeria
- "GH" # Ghana (adequate protection)
- "ZA" # South Africa (POPIA)
- "EU" # European Union (GDPR — mutual adequacy)
- "GB" # United Kingdom (UK GDPR)
- "CA" # Canada (PIPEDA)
# Note: USA does not have federal adequacy — requires additional safeguards
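To show how the risk-scoring half of this policy can be evaluated in code, here is a small sketch that loads the YAML above (assuming PyYAML is installed) and scores a proposed processing activity against the high-risk threshold; the activity dictionary is illustrative:
import yaml

def dpia_risk_score(policy_path: str, activity: dict) -> tuple[int, bool]:
    """Return (score, dpia_required) for a proposed processing activity."""
    with open(policy_path) as fh:
        policy = yaml.safe_load(fh)
    scoring = policy["dpia_triggers"]["risk_scoring"]
    score = 0
    for factor in scoring["factors"]:
        value = activity.get(factor["name"])  # e.g. 'sensitive', 'large', 'customers'
        score += factor["weight"] * factor["scores"].get(value, 0)
    return score, score >= scoring["high_risk_threshold"]

score, required = dpia_risk_score("dpia_policy.yaml", {
    "data_sensitivity": "sensitive",            # weight 3 x score 4 = 12
    "processing_scale": "large",                # weight 2 x score 4 = 8
    "data_subject_vulnerability": "customers",  # weight 2 x score 3 = 6
})
print(f"Risk score {score}: DPIA {'required' if required else 'not required'}")
With these example inputs the score is 26, well above the threshold of 12, so a DPIA workflow would be triggered before the processing activity goes live.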
CBN Data Requirements for Financial Institutions
Financial institutions in Nigeria face the most demanding regulatory data requirements. The CBN expects not only periodic returns but increasingly real-time or near-real-time access to transaction data. Let us build the infrastructure to meet these requirements.
Transaction Reporting Pipeline
┌──────────────────────────────────────────────────────────────────────────┐
│ CBN TRANSACTION REPORTING PIPELINE │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ Core │──>│ CDC / Event │──>│ Stream │──>│ Compliance │ │
│ │ Banking │ │ Stream │ │ Processing │ │ Data Store │ │
│ │ System │ │ (Debezium) │ │ (Flink/Spark)│ │ (Warehouse) │ │
│ └──────────┘ └─────────────┘ └──────┬───────┘ └──────┬──────┘ │
│ │ │ │
│ ┌─────────────────────┘ │ │
│ │ │ │
│ v v │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Real-time │ │ Scheduled │ │
│ │ Alerts │ │ Returns │ │
│ │ ───────── │ │ ────────── │ │
│ │ STR Triggers │ │ Monthly Form │ │
│ │ CTR Triggers │ │ Credit Bureau│ │
│ │ AML Patterns │ │ FX Returns │ │
│ │ Sanctions │ │ Fin Stability│ │
│ └──────────────┘ └──────────────┘ │
│ │ │ │
│ v v │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Compliance │ │ CBN Portal │ │
│ │ Officer │ │ Submission │ │
│ │ Dashboard │ │ (Automated) │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ All flows instrumented with: data lineage, audit trail, checksums │
└──────────────────────────────────────────────────────────────────────────┘
AML/KYC Data Flow Implementation
CBN's AML/CFT Regulations 2022 require financial institutions to maintain comprehensive KYC records and flag suspicious transactions. Here is a data model and detection pipeline:
-- KYC/CDD data model aligned with CBN AML/CFT Regulations 2022
CREATE TABLE kyc_records (
customer_id UUID PRIMARY KEY,
kyc_tier VARCHAR(10) NOT NULL, -- 'tier1', 'tier2', 'tier3' per CBN tiered KYC
bvn VARCHAR(11), -- Bank Verification Number
nin VARCHAR(11), -- National Identification Number
full_name VARCHAR(200) NOT NULL,
date_of_birth DATE,
nationality VARCHAR(3),
address TEXT,
phone_number VARCHAR(20),
email VARCHAR(100),
id_document_type VARCHAR(30), -- 'national_id', 'passport', 'drivers_licence', 'voters_card'
id_document_number VARCHAR(50),
id_verified BOOLEAN DEFAULT FALSE,
id_verified_at TIMESTAMPTZ,
risk_rating VARCHAR(10) NOT NULL, -- 'low', 'medium', 'high'
pep_status BOOLEAN DEFAULT FALSE, -- Politically Exposed Person
sanctions_checked BOOLEAN DEFAULT FALSE,
sanctions_checked_at TIMESTAMPTZ,
cdd_completed_at TIMESTAMPTZ,
edd_required BOOLEAN DEFAULT FALSE, -- Enhanced Due Diligence
edd_completed_at TIMESTAMPTZ,
next_review_date DATE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Transaction monitoring for STR/CTR
CREATE TABLE transaction_monitoring (
txn_id UUID PRIMARY KEY,
customer_id UUID NOT NULL REFERENCES kyc_records(customer_id),
txn_date TIMESTAMPTZ NOT NULL,
txn_type VARCHAR(30) NOT NULL, -- 'transfer', 'deposit', 'withdrawal', 'fx_purchase'
txn_amount NUMERIC(18,2) NOT NULL,
txn_currency VARCHAR(3) NOT NULL DEFAULT 'NGN',
counterparty_name VARCHAR(200),
counterparty_acct VARCHAR(30),
counterparty_bank VARCHAR(100),
channel VARCHAR(20), -- 'branch', 'atm', 'pos', 'mobile', 'internet'
narrative TEXT,
risk_score NUMERIC(5,2), -- Calculated by ML model
alert_generated BOOLEAN DEFAULT FALSE,
alert_type VARCHAR(30), -- 'ctr', 'str', 'pattern', 'sanctions'
reviewed_by VARCHAR(100),
review_outcome VARCHAR(30), -- 'cleared', 'escalated', 'filed_str'
reviewed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Currency Transaction Report (CTR) — auto-generated for transactions above threshold
CREATE VIEW ctr_candidates AS
SELECT
tm.txn_id,
tm.customer_id,
kr.full_name,
kr.bvn,
tm.txn_date,
tm.txn_type,
tm.txn_amount,
tm.txn_currency,
tm.channel,
tm.narrative,
CASE
WHEN kr.kyc_tier = 'tier1' AND tm.txn_amount > 300000 THEN 'OVER_TIER1_LIMIT'
WHEN kr.kyc_tier = 'tier2' AND tm.txn_amount > 500000 THEN 'OVER_TIER2_LIMIT'
-- Check the larger CTR threshold first so the corporate branch is reachable;
-- in practice the individual/corporate split should key off customer type.
WHEN tm.txn_amount > 10000000 THEN 'CORPORATE_CTR_THRESHOLD'
WHEN tm.txn_amount > 5000000 THEN 'INDIVIDUAL_CTR_THRESHOLD'
END AS ctr_reason
FROM transaction_monitoring tm
JOIN kyc_records kr ON tm.customer_id = kr.customer_id
WHERE tm.txn_amount > 300000 -- Lowest relevant threshold
AND tm.txn_date >= CURRENT_DATE - INTERVAL '1 day';
-- Suspicious pattern detection: structuring (smurfing)
CREATE VIEW structuring_alerts AS
SELECT
customer_id,
txn_date::DATE AS txn_day,
COUNT(*) AS txn_count,
SUM(txn_amount) AS total_amount,
AVG(txn_amount) AS avg_amount,
MAX(txn_amount) AS max_amount,
MIN(txn_amount) AS min_amount,
array_agg(DISTINCT channel) AS channels_used
FROM transaction_monitoring
WHERE txn_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY customer_id, txn_date::DATE
HAVING
-- Multiple transactions just below CTR threshold
(COUNT(*) >= 3 AND MAX(txn_amount) < 5000000 AND SUM(txn_amount) > 5000000)
OR
-- Rapid-fire transactions in short window
(COUNT(*) >= 5 AND SUM(txn_amount) > 2000000)
ORDER BY total_amount DESC;
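These views only surface candidates; a small job still has to convert them into alerts with a filing clock attached. A sketch, assuming the tables above plus a hypothetical compliance_alerts queue table that the compliance officer dashboard reads from:
from datetime import datetime, timedelta, timezone

CTR_FILING_WINDOW_HOURS = 24  # CTR submission window is the next business day; simplified here

def raise_ctr_alerts(conn) -> int:
    """Flag CTR candidates as alerts and enqueue them for compliance review."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT txn_id, customer_id, ctr_reason
            FROM ctr_candidates
            WHERE ctr_reason IS NOT NULL;
        """)
        candidates = cur.fetchall()
        due_at = datetime.now(timezone.utc) + timedelta(hours=CTR_FILING_WINDOW_HOURS)
        for txn_id, customer_id, ctr_reason in candidates:
            cur.execute("""
                UPDATE transaction_monitoring
                SET alert_generated = TRUE, alert_type = 'ctr'
                WHERE txn_id = %s AND alert_generated = FALSE;
            """, (txn_id,))
            if cur.rowcount:  # only enqueue transactions not already alerted
                cur.execute("""
                    INSERT INTO compliance_alerts
                        (txn_id, customer_id, alert_type, reason, due_at)
                    VALUES (%s, %s, 'ctr', %s, %s);
                """, (txn_id, customer_id, ctr_reason, due_at))
    conn.commit()
    return len(candidates)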
Automated CBN Returns Generation
"""
cbn_returns.py — Automated CBN Regulatory Returns Generation
Generates, validates, and prepares CBN returns for submission.
Includes reconciliation checks to ensure data accuracy before filing.
"""
from dataclasses import dataclass
from datetime import datetime, date
from typing import Optional
import logging
import hashlib
logger = logging.getLogger(__name__)
@dataclass
class CBNReturn:
"""Represents a single CBN regulatory return."""
return_type: str # 'monthly_form_m', 'credit_bureau', 'fx_returns', etc.
reporting_period: str # '2024-04', '2024-Q1', etc.
institution_code: str # CBN institution code
generated_at: datetime = None
record_count: int = 0
checksum: str = ""
validation_passed: bool = False
validation_errors: list = None
file_path: str = ""
class CBNReturnsGenerator:
"""
Generates CBN regulatory returns from the compliance data warehouse.
Each return type has:
- A SQL query to extract the required data
- Validation rules matching CBN format specifications
- Reconciliation checks against source systems
- Checksum generation for integrity verification
"""
def __init__(self, warehouse_connection, source_connection):
self.warehouse = warehouse_connection
self.source = source_connection
def generate_monthly_returns(self, reporting_month: str, institution_code: str) -> CBNReturn:
"""
Generate CBN Monthly Returns (Form M).
Extracts balance sheet, P&L, and asset quality data from the warehouse,
validates against CBN format specifications, and reconciles with source.
"""
cbr = CBNReturn(
return_type="monthly_form_m",
reporting_period=reporting_month,
institution_code=institution_code,
generated_at=datetime.utcnow(),
)
with self.warehouse.cursor() as cur:
# Extract balance sheet data
cur.execute("""
SELECT
account_category,
account_subcategory,
cbn_line_item_code,
SUM(balance_ngn) AS balance,
COUNT(DISTINCT account_id) AS account_count
FROM regulatory_balance_sheet
WHERE reporting_month = %s
AND institution_code = %s
GROUP BY account_category, account_subcategory, cbn_line_item_code
ORDER BY cbn_line_item_code;
""", (reporting_month, institution_code))
rows = cur.fetchall()
cbr.record_count = len(rows)
# Validate
cbr.validation_errors = self._validate_monthly_returns(rows, reporting_month)
cbr.validation_passed = len(cbr.validation_errors) == 0
# Reconciliation check
reconciliation = self._reconcile_with_source(reporting_month, institution_code)
if not reconciliation["balanced"]:
cbr.validation_errors.append(
f"Reconciliation variance: {reconciliation['variance_ngn']:,.2f} NGN"
)
cbr.validation_passed = False
# Generate checksum
content = str(rows).encode("utf-8")
cbr.checksum = hashlib.sha256(content).hexdigest()
if cbr.validation_passed:
logger.info(
"Monthly returns generated successfully: period=%s records=%d checksum=%s",
reporting_month, cbr.record_count, cbr.checksum[:16],
)
else:
logger.warning(
"Monthly returns generated with errors: period=%s errors=%d",
reporting_month, len(cbr.validation_errors),
)
return cbr
def _validate_monthly_returns(self, rows: list, reporting_month: str) -> list[str]:
"""Validate monthly returns against CBN format specifications."""
errors = []
# Check required line items are present
required_codes = [
"BS001", "BS002", "BS003", # Assets
"BS101", "BS102", "BS103", # Liabilities
"PL001", "PL002", "PL003", # Income
"PL101", "PL102", # Expenses
]
present_codes = {row[2] for row in rows} # cbn_line_item_code
missing = set(required_codes) - present_codes
if missing:
errors.append(f"Missing required CBN line items: {missing}")
# Check total assets = total liabilities + equity
# (simplified — real implementation would sum specific line items)
return errors
def _reconcile_with_source(self, reporting_month: str, institution_code: str) -> dict:
"""
Reconcile warehouse data against source core banking system.
This is critical: CBN auditors will check whether your returns
match your actual books. Any unexplained variance is a red flag.
"""
with self.warehouse.cursor() as cur:
cur.execute("""
SELECT SUM(balance_ngn) AS warehouse_total
FROM regulatory_balance_sheet
WHERE reporting_month = %s AND institution_code = %s;
""", (reporting_month, institution_code))
warehouse_total = cur.fetchone()[0] or 0
with self.source.cursor() as cur:
cur.execute("""
SELECT SUM(balance_lcy) AS source_total
FROM gl_balances
WHERE period = %s;
""", (reporting_month,))
source_total = cur.fetchone()[0] or 0
variance = abs(warehouse_total - source_total)
# Tolerance: allow NGN 1.00 for rounding
balanced = variance <= 1.00
return {
"warehouse_total": warehouse_total,
"source_total": source_total,
"variance_ngn": variance,
"balanced": balanced,
}
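A sketch of the generate-then-gate workflow around this class. The connection strings and institution code are placeholders; the point is that a failed validation or reconciliation blocks submission rather than merely logging a warning:
import psycopg2

warehouse = psycopg2.connect("dbname=compliance_warehouse")    # illustrative DSN
core_banking = psycopg2.connect("dbname=core_banking_mirror")  # illustrative DSN

generator = CBNReturnsGenerator(warehouse, core_banking)
cbr = generator.generate_monthly_returns(
    reporting_month="2024-04",
    institution_code="XYZ001",  # placeholder institution code
)

if cbr.validation_passed:
    # Hand off to the submission step (portal upload, SFTP, etc.)
    print(f"Ready for submission: {cbr.record_count} records, checksum {cbr.checksum[:16]}")
else:
    # Never submit a return that failed validation or reconciliation.
    for error in cbr.validation_errors:
        print(f"BLOCKED: {error}")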
Automating Regulatory Reporting with Data Engineering
Manual regulatory reporting is the single biggest compliance risk for Nigerian organisations, not because people are careless, but because manual processes do not scale, do not self-verify, and do not leave adequate audit trails. Let us build the automation layer.
Compliance Pipeline Architecture
┌──────────────────────────────────────────────────────────────────────────────┐
│ COMPLIANCE PIPELINE ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ SOURCE SYSTEMS INGESTION COMPLIANCE WAREHOUSE │
│ ────────────── ───────── ──────────────────── │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌────────────────────────┐ │
│ │Core Bank │────>│ │ │ RAW ZONE │ │
│ └──────────┘ │ │ │ - Immutable landing │ │
│ ┌──────────┐ │ CDC / │ │ - Full audit trail │ │
│ │CRM │────>│ Batch │────>│ - Source checksums │ │
│ └──────────┘ │ Ingestion │ └────────┬───────────────┘ │
│ ┌──────────┐ │ │ │ │
│ │Mobile App│────>│ │ v │
│ └──────────┘ └──────────────┘ ┌────────────────────────┐ │
│ ┌──────────┐ │ CONFORMED ZONE │ │
│ │3rd Party │──────────────────────────│ - Standardised schemas│ │
│ │APIs │ │ - PII tagged/masked │ │
│ └──────────┘ │ - Consent-gated │ │
│ └────────┬───────────────┘ │
│ │ │
│ ┌──────────────────┼───────────────┐ │
│ │ │ │ │
│ v v v │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ NDPR │ │ CBN │ │ NCC │ │
│ │ Compliance │ │ Returns │ │ Reports │ │
│ │ ────────── │ │ ────────── │ │ ────────── │ │
│ │ Consent logs │ │ Form M │ │ QoS metrics │ │
│ │ DPIA records │ │ STR/CTR │ │ Subscriber │ │
│ │ Erasure logs │ │ Credit Bureau │ │ Complaints │ │
│ │ Breach data │ │ FX Returns │ │ SIM data │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └──────────────────┼───────────────┘ │
│ │ │
│ v │
│ ┌──────────────────┐ │
│ │ AUDIT & LINEAGE │ │
│ │ ──────────────── │ │
│ │ - Data lineage │ │
│ │ - Transformation │ │
│ │ history │ │
│ │ - Access logs │ │
│ │ - Quality scores │ │
│ └──────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
Data Lineage for Regulatory Evidence
When a regulator asks "where did this number come from?", you need to answer in seconds, not days. Data lineage tracking captures the complete transformation history of every data point in your compliance warehouse.
-- Data lineage tracking schema
CREATE TABLE data_lineage_events (
lineage_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_name VARCHAR(100) NOT NULL,
pipeline_run_id VARCHAR(100) NOT NULL,
step_name VARCHAR(100) NOT NULL,
step_order INT NOT NULL,
-- Source information
source_system VARCHAR(100) NOT NULL,
source_table VARCHAR(200),
source_query_hash VARCHAR(64), -- SHA-256 of the extraction query
source_row_count BIGINT,
source_checksum VARCHAR(64), -- Checksum of source data
-- Destination information
destination_table VARCHAR(200) NOT NULL,
destination_row_count BIGINT,
destination_checksum VARCHAR(64),
-- Transformation information
transformation_type VARCHAR(50), -- 'extract', 'transform', 'load', 'validate'
transformation_desc TEXT,
transformation_code_hash VARCHAR(64), -- Hash of the transformation code version
-- Row-level tracking
rows_inserted BIGINT DEFAULT 0,
rows_updated BIGINT DEFAULT 0,
rows_deleted BIGINT DEFAULT 0,
rows_rejected BIGINT DEFAULT 0,
-- Quality metrics
null_count BIGINT DEFAULT 0,
duplicate_count BIGINT DEFAULT 0,
validation_errors JSONB,
-- Timing
started_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
duration_ms BIGINT,
-- Context
triggered_by VARCHAR(100), -- 'schedule', 'manual', 'event'
git_commit_hash VARCHAR(40), -- Code version that produced this
environment VARCHAR(20), -- 'production', 'staging'
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Index for tracing a specific record's lineage
CREATE INDEX idx_lineage_pipeline_run
ON data_lineage_events (pipeline_run_id, step_order);
CREATE INDEX idx_lineage_destination
ON data_lineage_events (destination_table, created_at DESC);
-- View: Trace lineage from a regulatory report back to source
CREATE VIEW regulatory_report_lineage AS
SELECT
dle.pipeline_name,
dle.pipeline_run_id,
dle.step_name,
dle.step_order,
dle.source_system,
dle.source_table,
dle.destination_table,
dle.transformation_desc,
dle.source_row_count,
dle.destination_row_count,
dle.rows_rejected,
dle.source_checksum,
dle.destination_checksum,
dle.started_at,
dle.completed_at,
dle.git_commit_hash
FROM data_lineage_events dle
WHERE dle.destination_table LIKE 'regulatory_%'
ORDER BY dle.pipeline_run_id, dle.step_order;
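The schema only pays off if every pipeline step actually writes to it. Below is a helper sketch a step can call on completion; the column names match the table above, while the caller supplies counts, timings, and descriptions:
from datetime import datetime, timezone
from typing import Optional

from psycopg2.extras import Json

def record_lineage_event(conn, *, pipeline_name: str, pipeline_run_id: str,
                         step_name: str, step_order: int, source_system: str,
                         destination_table: str, source_row_count: int,
                         destination_row_count: int, started_at: datetime,
                         transformation_desc: str = "",
                         validation_errors: Optional[dict] = None) -> None:
    """Persist one lineage event for a completed pipeline step."""
    completed_at = datetime.now(timezone.utc)
    duration_ms = int((completed_at - started_at).total_seconds() * 1000)
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO data_lineage_events
                (pipeline_name, pipeline_run_id, step_name, step_order,
                 source_system, destination_table, source_row_count,
                 destination_row_count, transformation_desc, validation_errors,
                 started_at, completed_at, duration_ms)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """, (pipeline_name, pipeline_run_id, step_name, step_order,
              source_system, destination_table, source_row_count,
              destination_row_count, transformation_desc,
              Json(validation_errors or {}),
              started_at, completed_at, duration_ms))
    conn.commit()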
Reconciliation Framework
Every regulatory report must be reconciled against source systems before submission. Discrepancies must be investigated, documented, and either resolved or explained.
"""
reconciliation.py — Automated Reconciliation for Regulatory Reports
Implements multi-level reconciliation checks:
1. Row count reconciliation (source vs destination)
2. Aggregate reconciliation (sum/count/avg checks)
3. Hash-based reconciliation (row-level integrity)
4. Cross-system reconciliation (warehouse vs source)
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import logging
import hashlib
logger = logging.getLogger(__name__)
class ReconciliationStatus(Enum):
PASS = "pass"
FAIL = "fail"
WARNING = "warning" # Within tolerance but not exact
@dataclass
class ReconciliationCheck:
check_name: str
check_type: str # 'row_count', 'aggregate', 'hash', 'cross_system'
source_value: float
target_value: float
tolerance: float # Absolute tolerance (e.g., 1.0 for rounding)
tolerance_pct: float # Percentage tolerance, in percent (e.g., 0.1 for 0.1%)
status: ReconciliationStatus = ReconciliationStatus.PASS
variance: float = 0.0
variance_pct: float = 0.0
notes: str = ""
@dataclass
class ReconciliationReport:
report_name: str
reporting_period: str
checks: list[ReconciliationCheck]
overall_status: ReconciliationStatus = ReconciliationStatus.PASS
generated_at: datetime = None
signed_off_by: Optional[str] = None
signed_off_at: Optional[datetime] = None
class ReconciliationEngine:
"""
Automated reconciliation engine for regulatory reports.
Runs configurable checks and produces an auditable reconciliation
report that can be attached to regulatory submissions as evidence
of data accuracy.
"""
def __init__(self, db_connection):
self.conn = db_connection
def run_reconciliation(
self,
report_name: str,
reporting_period: str,
checks: list[dict],
) -> ReconciliationReport:
"""
Run a set of reconciliation checks and produce a report.
Each check dict contains:
- name: Human-readable check name
- type: 'row_count', 'aggregate', 'hash', 'cross_system'
- source_query: SQL to get source value
- target_query: SQL to get target value
- tolerance: Absolute tolerance
- tolerance_pct: Percentage tolerance
"""
results = []
for check_config in checks:
with self.conn.cursor() as cur:
cur.execute(check_config["source_query"], (reporting_period,))
source_val = cur.fetchone()[0] or 0
cur.execute(check_config["target_query"], (reporting_period,))
target_val = cur.fetchone()[0] or 0
variance = abs(source_val - target_val)
variance_pct = (variance / source_val * 100) if source_val != 0 else 0
tolerance = check_config.get("tolerance", 0)
tolerance_pct = check_config.get("tolerance_pct", 0)
if variance <= tolerance or variance_pct <= tolerance_pct:
status = ReconciliationStatus.PASS
elif variance_pct <= tolerance_pct * 2:
status = ReconciliationStatus.WARNING
else:
status = ReconciliationStatus.FAIL
results.append(ReconciliationCheck(
check_name=check_config["name"],
check_type=check_config["type"],
source_value=source_val,
target_value=target_val,
tolerance=tolerance,
tolerance_pct=tolerance_pct,
status=status,
variance=variance,
variance_pct=variance_pct,
))
# Determine overall status
if any(c.status == ReconciliationStatus.FAIL for c in results):
overall = ReconciliationStatus.FAIL
elif any(c.status == ReconciliationStatus.WARNING for c in results):
overall = ReconciliationStatus.WARNING
else:
overall = ReconciliationStatus.PASS
report = ReconciliationReport(
report_name=report_name,
reporting_period=reporting_period,
checks=results,
overall_status=overall,
generated_at=datetime.utcnow(),
)
# Persist reconciliation report for audit
self._persist_report(report)
if overall == ReconciliationStatus.FAIL:
logger.error(
"Reconciliation FAILED for %s period=%s — DO NOT SUBMIT",
report_name, reporting_period,
)
else:
logger.info(
"Reconciliation %s for %s period=%s (%d checks passed)",
overall.value, report_name, reporting_period,
sum(1 for c in results if c.status == ReconciliationStatus.PASS),
)
return report
def _persist_report(self, report: ReconciliationReport):
"""Save reconciliation report for audit trail."""
with self.conn.cursor() as cur:
for check in report.checks:
cur.execute("""
INSERT INTO reconciliation_audit
(report_name, reporting_period, check_name, check_type,
source_value, target_value, variance, variance_pct,
status, generated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
""", (
report.report_name, report.reporting_period,
check.check_name, check.check_type,
check.source_value, check.target_value,
check.variance, check.variance_pct,
check.status.value, report.generated_at,
))
self.conn.commit()
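A sketch of what a check configuration looks like in practice, using the illustrative table names from earlier in this guide; note that both queries run against the single connection handed to the engine, so cross-system checks assume the source data has been mirrored or federated into it:
import psycopg2

MONTHLY_RETURNS_CHECKS = [
    {
        "name": "Balance sheet total vs general ledger",
        "type": "cross_system",
        "source_query": "SELECT SUM(balance_lcy) FROM gl_balances WHERE period = %s;",
        "target_query": (
            "SELECT SUM(balance_ngn) FROM regulatory_balance_sheet "
            "WHERE reporting_month = %s;"
        ),
        "tolerance": 1.00,      # NGN 1 for rounding
        "tolerance_pct": 0.01,  # 0.01 percent
    },
]

warehouse = psycopg2.connect("dbname=compliance_warehouse")  # illustrative DSN
engine = ReconciliationEngine(warehouse)
report = engine.run_reconciliation("cbn_monthly_returns", "2024-04", MONTHLY_RETURNS_CHECKS)
if report.overall_status is not ReconciliationStatus.PASS:
    raise RuntimeError("Reconciliation did not pass; halt submission")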
Scheduling and Orchestration
Regulatory returns have strict deadlines. An orchestration layer ensures returns are generated, validated, reconciled, and ready for submission well before due dates.
"""
compliance_scheduler.py — Regulatory Return Scheduling
Defines the schedule for all Nigerian regulatory returns with
appropriate lead times, escalation paths, and retry logic.
Compatible with Apache Airflow, Dagster, or Prefect.
"""
# Schedule definitions (conceptual); map these into your orchestrator's DAGs or jobs
REGULATORY_SCHEDULE = {
"cbn_monthly_returns": {
"description": "CBN Monthly Returns (Form M)",
"cron": "0 6 10 * *", # 10th of each month at 6 AM WAT
"deadline_day": 15, # Must be submitted by 15th
"lead_time_days": 5, # Start generation 5 days before deadline
"steps": [
"extract_from_core_banking",
"transform_to_cbn_format",
"reconcile_with_source",
"validate_cbn_format",
"generate_submission_file",
"notify_compliance_team",
],
"escalation": {
"warning_days_before_deadline": 3,
"critical_days_before_deadline": 1,
"notify": ["[email protected]", "[email protected]"],
},
"retry_policy": {
"max_retries": 3,
"retry_delay_minutes": 30,
},
},
"cbn_daily_fx_returns": {
"description": "CBN Daily Foreign Exchange Returns",
"cron": "0 18 * * 1-5", # 6 PM WAT on business days
"deadline": "same_day",
"steps": [
"extract_fx_transactions",
"validate_fx_rates",
"reconcile_fx_positions",
"generate_fx_return",
"submit_to_cbn_portal",
],
"escalation": {
"notify_on_failure": ["[email protected]", "[email protected]"],
},
"retry_policy": {
"max_retries": 5,
"retry_delay_minutes": 15,
},
},
"cbn_str_filing": {
"description": "CBN Suspicious Transaction Report",
"trigger": "event_based", # Triggered by AML detection pipeline
"deadline_hours": 72, # Must file within 72 hours of detection
"steps": [
"enrich_transaction_context",
"generate_str_narrative",
"attach_supporting_docs",
"route_to_compliance_officer",
"file_with_nfiu", # Nigerian Financial Intelligence Unit
],
"escalation": {
"warning_hours": 24,
"critical_hours": 48,
"notify": ["[email protected]", "[email protected]", "[email protected]"],
},
},
"ndpr_annual_audit": {
"description": "NDPR/NDPA Annual Data Protection Audit",
"cron": "0 9 1 3 *", # March 1st — start of audit preparation
"deadline_month": 3, # Q1 submission to NITDA/NDPC
"steps": [
"inventory_all_data_processing",
"assess_lawful_basis_coverage",
"verify_consent_records",
"check_retention_compliance",
"review_cross_border_transfers",
"compile_dpia_records",
"generate_audit_report",
"engage_dpco", # Data Protection Compliance Organisation
],
},
"credit_bureau_monthly": {
"description": "Credit Bureau Returns (CRC, First Central, CreditRegistry)",
"cron": "0 6 20 * *", # 20th of each month
"deadline_day": 30,
"steps": [
"extract_loan_portfolio",
"map_to_bureau_format",
"validate_bvn_matching",
"reconcile_account_counts",
"generate_bureau_files",
"submit_to_bureaus",
],
},
"firs_vat_returns": {
"description": "FIRS VAT Monthly Returns",
"cron": "0 6 15 * *", # 15th of each month
"deadline_day": 21,
"steps": [
"extract_vat_transactions",
"calculate_input_output_vat",
"reconcile_with_gl",
"generate_vat_return",
"prepare_taxpro_max_upload",
],
},
"ncc_qos_quarterly": {
"description": "NCC Quality of Service Quarterly Report",
"cron": "0 6 15 1,4,7,10 *", # 15th of quarter-start months
"deadline_day": 30, # 30 days after quarter end
"steps": [
"aggregate_network_metrics",
"calculate_qos_kpis",
"generate_ncc_format_report",
"validate_against_thresholds",
"submit_to_ncc",
],
},
}
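The dictionary above is orchestrator-agnostic; what matters operationally is escalating before a deadline slips. A minimal monitoring sketch over the date-based entries, assuming it runs once a day and that notify() is wired to email or chat in a real deployment (the quarterly cadence of the NCC report is ignored for brevity):
import calendar
from datetime import date
from typing import Optional

def days_until_deadline(entry: dict, today: date) -> Optional[int]:
    """Days remaining until this month's submission deadline, for date-based returns."""
    deadline_day = entry.get("deadline_day")
    if deadline_day is None:
        return None  # event-based and same-day returns are monitored elsewhere
    last_day = calendar.monthrange(today.year, today.month)[1]
    return (date(today.year, today.month, min(deadline_day, last_day)) - today).days

def notify(recipients: list[str], message: str) -> None:
    """Stand-in for an email or chat notification hook."""
    print(f"NOTIFY {recipients}: {message}")

def check_deadlines(today: Optional[date] = None) -> None:
    today = today or date.today()
    for name, entry in REGULATORY_SCHEDULE.items():
        remaining = days_until_deadline(entry, today)
        if remaining is None:
            continue
        escalation = entry.get("escalation", {})
        recipients = escalation.get("notify", ["[email protected]"])
        if remaining <= escalation.get("critical_days_before_deadline", 1):
            notify(recipients, f"CRITICAL: {name} due in {remaining} day(s)")
        elif remaining <= escalation.get("warning_days_before_deadline", 3):
            notify(recipients, f"WARNING: {name} due in {remaining} day(s)")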
Data Residency and Sovereignty
Data residency is one of the most misunderstood aspects of Nigerian compliance. Many organisations assume that storing data "in the cloud" is sufficient, without considering where that cloud infrastructure is physically located or how data flows between regions.
Where Nigerian Data Must Live
The NDPA and NDPR establish clear principles on data residency:
- Primary copy in Nigeria: The NDPR Implementation Framework 2020 (Section 4.2) requires that a copy of all personal data collected in Nigeria be stored on a server or data centre physically located within Nigeria.
- Cross-border transfer conditions (NDPA Part VI, Sections 47-49):
  - Transfer to a country with an "adequate level of protection" as determined by the NDPC
  - Transfer with explicit consent of the data subject
  - Transfer necessary for performance of a contract
  - Transfer with appropriate safeguards (binding corporate rules, standard contractual clauses)
- CBN requirements: The CBN has issued multiple circulars requiring financial institutions to maintain core banking systems and customer data within Nigeria. The CBN's Cloud Computing Framework allows the use of cloud services but mandates that primary data centres for core banking operations be located in Nigeria.
- NCC requirements: SIM registration data and subscriber identity data must be maintained within Nigeria and be accessible to the NCC on demand.
Cloud Region Selection Strategy
┌──────────────────────────────────────────────────────────────────────────┐
│               CLOUD REGION STRATEGY FOR NIGERIAN COMPLIANCE              │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  PRIMARY REGION: Africa (Lagos or Cape Town)                             │
│  ───────────────────────────────────────────                             │
│  AWS:   af-south-1 (Cape Town) — Closest AWS region                      │
│  Azure: South Africa North — Johannesburg                                │
│  GCP:   africa-south1 (Johannesburg) — Closest GCP region                │
│  Local: Rack Centre (Lagos), MainOne MDXi (Lagos)                        │
│                                                                          │
│  DATA CLASSIFICATION                                                     │
│  ───────────────────                                                     │
│                                                                          │
│  TIER 1: Nigeria-Only                                                    │
│    - PII of Nigerian residents (NDPR)                                    │
│    - Core banking data (CBN)                                             │
│    - SIM registration data (NCC)                                         │
│    - Tax records (FIRS)                                                  │
│    Storage: Nigerian data centre (Rack Centre / MainOne)                 │
│                                                                          │
│  TIER 2: Africa Region OK                                                │
│    - Aggregated analytics (no PII)                                       │
│    - Application logs (PII stripped)                                     │
│    - Infrastructure metrics                                              │
│    Storage: AWS af-south-1 / Azure South Africa North                    │
│                                                                          │
│  TIER 3: Global (with safeguards)                                        │
│    - CDN cached content (non-personal)                                   │
│    - Open-source model inference (no PII in prompts)                     │
│    - Public marketing analytics                                          │
│    Storage: Any region, no PII                                           │
│                                                                          │
│  IMPORTANT: Even Tier 2/3 data may require a Nigerian copy if it was     │
│  derived from personal data of Nigerian residents.                       │
└──────────────────────────────────────────────────────────────────────────┘
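Classification only helps if it is enforced before data lands anywhere. The sketch below is one way to express the tier rules as code and check them ahead of every write; the tier keys and region identifiers are illustrative (the Nigerian data centre IDs mirror the residency_locations used later in this guide) and should be replaced with the identifiers your own platform uses.
# residency_guard.py — minimal sketch of tier-based residency enforcement.
# Tier keys and region identifiers are illustrative; adapt to your providers.
ALLOWED_REGIONS = {
    "tier_1_nigeria_only": {"ng-lagos-1", "ng-lagos-2"},  # Nigerian data centres
    "tier_2_africa_ok": {"ng-lagos-1", "ng-lagos-2", "af-south-1", "southafricanorth"},
    "tier_3_global": None,  # None means any region is permitted (no PII)
}

def assert_residency(data_tier: str, target_region: str) -> None:
    """Raise before a pipeline writes data to a region its tier does not permit."""
    allowed = ALLOWED_REGIONS[data_tier]
    if allowed is not None and target_region not in allowed:
        raise PermissionError(
            f"Residency violation: {data_tier} data may not be written to {target_region}"
        )

assert_residency("tier_2_africa_ok", "af-south-1")        # passes
# assert_residency("tier_1_nigeria_only", "af-south-1")   # raises PermissionError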
Encryption Requirements
Nigerian regulations mandate encryption at multiple levels:
# encryption_policy.yaml — Encryption Standards for Nigerian Compliance
encryption_requirements:
at_rest:
standard: "AES-256"
key_management: "AWS KMS / Azure Key Vault with customer-managed keys"
scope:
- "All databases containing personal data"
- "All file storage containing personal data"
- "All backup volumes"
- "All archive storage"
regulatory_basis:
- "NDPR Implementation Framework, Sec 2.5"
- "CBN Risk-Based Cybersecurity Framework, Sec 4.3"
in_transit:
standard: "TLS 1.2 minimum, TLS 1.3 preferred"
scope:
- "All API communications"
- "All database connections"
- "All inter-service communication"
- "All data pipeline transfers"
certificate_management:
provider: "Let's Encrypt / AWS ACM"
rotation: "90 days automatic"
regulatory_basis:
- "NDPR Implementation Framework, Sec 2.5"
- "CBN Guidelines on Electronic Banking"
field_level:
description: "Additional encryption for highly sensitive fields"
algorithm: "AES-256-GCM with per-field keys"
fields:
- "bvn" # Bank Verification Number
- "nin" # National Identification Number
- "account_number"
- "card_number" # Must also comply with PCI DSS
- "biometric_data"
- "health_records"
key_rotation: "Quarterly"
regulatory_basis:
- "NDPA Part III, Sec 29 — Security of Processing"
- "CBN AML/CFT Regulations — Customer Data Protection"
tokenization:
description: "Replace sensitive values with non-reversible tokens for analytics"
use_cases:
- "BVN in analytics pipelines (tokenize, do not encrypt)"
- "Phone numbers in aggregated reporting"
- "Account numbers in cross-system reconciliation"
algorithm: "HMAC-SHA256 with rotating salt"
regulatory_benefit: "Tokenized data may not be considered personal data under NDPA"
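The tokenization entry above translates directly into a few lines of standard-library Python. The sketch below shows deterministic HMAC-SHA256 tokenization of a BVN before it enters an analytics pipeline; the key would normally come from a KMS or secret manager rather than being hard-coded, and the salt value shown is illustrative.
# tokenize_fields.py — minimal sketch of HMAC-SHA256 tokenization for analytics.
import hashlib
import hmac

def tokenize(value: str, secret_key: bytes, salt: str) -> str:
    """Return a deterministic, non-reversible token for a sensitive field.

    Deterministic within a salt period, so joins and reconciliation still work;
    rotating the salt (e.g. quarterly, per the policy above) breaks linkage
    across periods.
    """
    message = f"{salt}:{value}".encode("utf-8")
    return hmac.new(secret_key, message, hashlib.sha256).hexdigest()

# Example: tokenize a dummy BVN before it is written to the analytics layer.
# In practice the key is fetched from KMS / Key Vault, never stored in code.
token = tokenize("22212345678", secret_key=b"kms-managed-key-placeholder", salt="2024-Q2")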
Cross-Border Transfer Mechanism
"""
cross_border_transfer.py — NDPA-Compliant Cross-Border Data Transfer
Implements policy-as-code for cross-border data transfers,
enforcing NDPA Part VI requirements automatically.
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class TransferDecision(Enum):
ALLOWED = "allowed"
ALLOWED_WITH_SAFEGUARDS = "allowed_with_safeguards"
REQUIRES_CONSENT = "requires_consent"
REQUIRES_DPIA = "requires_dpia"
BLOCKED = "blocked"
class AdequacyStatus(Enum):
ADEQUATE = "adequate"
PARTIALLY_ADEQUATE = "partially_adequate"
NOT_ADEQUATE = "not_adequate"
NOT_ASSESSED = "not_assessed"
# Jurisdiction adequacy assessments (based on NDPC guidance)
JURISDICTION_ADEQUACY = {
"NG": AdequacyStatus.ADEQUATE, # Nigeria — domestic
"GH": AdequacyStatus.ADEQUATE, # Ghana (Data Protection Act 2012)
"ZA": AdequacyStatus.ADEQUATE, # South Africa (POPIA)
"KE": AdequacyStatus.PARTIALLY_ADEQUATE, # Kenya (Data Protection Act 2019)
"RW": AdequacyStatus.PARTIALLY_ADEQUATE, # Rwanda
"EU": AdequacyStatus.ADEQUATE, # EU (GDPR)
"GB": AdequacyStatus.ADEQUATE, # UK (UK GDPR)
"CA": AdequacyStatus.ADEQUATE, # Canada (PIPEDA)
"US": AdequacyStatus.NOT_ADEQUATE, # USA — no federal data protection law
"CN": AdequacyStatus.PARTIALLY_ADEQUATE, # China (PIPL)
"IN": AdequacyStatus.PARTIALLY_ADEQUATE, # India (DPDP Act 2023)
"AE": AdequacyStatus.PARTIALLY_ADEQUATE, # UAE (PDPL)
}
@dataclass
class TransferRequest:
"""A request to transfer personal data across borders."""
request_id: str
data_category: str # 'personal', 'sensitive', 'financial', 'health'
source_jurisdiction: str # ISO 3166-1 alpha-2
dest_jurisdiction: str
data_subject_count: int
purpose: str
recipient_name: str
recipient_type: str # 'processor', 'controller', 'joint_controller'
has_explicit_consent: bool
has_binding_corporate_rules: bool
has_standard_contractual_clauses: bool
has_dpia: bool
requested_by: str
requested_at: datetime
class CrossBorderTransferGate:
"""
Evaluates and enforces cross-border data transfer compliance.
Implements NDPA Part VI, Sections 47-49 requirements:
- Checks destination jurisdiction adequacy
- Evaluates available safeguards
- Makes allow/block decisions with reasoning
- Logs all decisions for audit
"""
def __init__(self, db_connection):
self.conn = db_connection
def evaluate_transfer(self, request: TransferRequest) -> tuple[TransferDecision, str]:
"""
Evaluate a cross-border transfer request and return a decision.
Returns (decision, reasoning) tuple.
"""
dest_adequacy = JURISDICTION_ADEQUACY.get(
request.dest_jurisdiction, AdequacyStatus.NOT_ASSESSED
)
# Decision logic per NDPA Part VI
if request.source_jurisdiction == request.dest_jurisdiction:
decision = TransferDecision.ALLOWED
reason = "Domestic transfer — no cross-border rules apply"
elif dest_adequacy == AdequacyStatus.ADEQUATE:
decision = TransferDecision.ALLOWED
reason = (
f"Destination {request.dest_jurisdiction} has adequate "
f"data protection per NDPC assessment"
)
elif request.has_explicit_consent:
decision = TransferDecision.ALLOWED_WITH_SAFEGUARDS
reason = (
"Transfer allowed based on explicit consent of data subjects "
"(NDPA Sec 48(1)(a)). Consent records must be maintained."
)
elif request.has_binding_corporate_rules:
decision = TransferDecision.ALLOWED_WITH_SAFEGUARDS
reason = (
"Transfer allowed under Binding Corporate Rules "
"(NDPA Sec 48(1)(c)). BCR documentation must be on file."
)
elif request.has_standard_contractual_clauses:
if request.has_dpia:
decision = TransferDecision.ALLOWED_WITH_SAFEGUARDS
reason = (
"Transfer allowed under Standard Contractual Clauses with DPIA "
"(NDPA Sec 48(1)(b)). SCC and DPIA must be on file."
)
else:
decision = TransferDecision.REQUIRES_DPIA
reason = (
"SCCs present but DPIA required for transfer to "
f"{request.dest_jurisdiction} (adequacy: {dest_adequacy.value})"
)
elif dest_adequacy == AdequacyStatus.PARTIALLY_ADEQUATE:
decision = TransferDecision.REQUIRES_CONSENT
reason = (
f"Destination {request.dest_jurisdiction} has partial adequacy. "
"Explicit consent or additional safeguards (BCR/SCC) required."
)
else:
decision = TransferDecision.BLOCKED
reason = (
f"Transfer to {request.dest_jurisdiction} blocked. "
f"Adequacy status: {dest_adequacy.value}. "
"No qualifying safeguards (consent, BCR, or SCC) provided."
)
# Log the decision
self._log_decision(request, decision, reason)
logger.info(
"Cross-border transfer decision: %s -> %s = %s (reason: %s)",
request.source_jurisdiction, request.dest_jurisdiction,
decision.value, reason[:100],
)
return decision, reason
def _log_decision(self, request: TransferRequest, decision: TransferDecision, reason: str):
"""Log transfer decision to audit table."""
with self.conn.cursor() as cur:
cur.execute("""
INSERT INTO cross_border_transfer_audit
(request_id, data_category, source_jurisdiction, dest_jurisdiction,
data_subject_count, purpose, recipient_name, decision, reason,
requested_by, decided_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW());
""", (
request.request_id, request.data_category,
request.source_jurisdiction, request.dest_jurisdiction,
request.data_subject_count, request.purpose,
request.recipient_name, decision.value, reason,
request.requested_by,
))
self.conn.commit()
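To show how the gate behaves, here is an illustrative request for a transfer to the US backed by SCCs but no DPIA. The subclass exists only so the sketch runs without a database; in production the audit insert must remain in place.
# usage sketch — assumes the module above is saved as cross_border_transfer.py
from datetime import datetime
from cross_border_transfer import CrossBorderTransferGate, TransferRequest

class NoOpAuditGate(CrossBorderTransferGate):
    """Skips the audit INSERT so the example runs without a database."""
    def _log_decision(self, request, decision, reason):
        pass

gate = NoOpAuditGate(db_connection=None)
request = TransferRequest(
    request_id="xbt-2024-0001",                  # illustrative values throughout
    data_category="personal",
    source_jurisdiction="NG",
    dest_jurisdiction="US",
    data_subject_count=1200,
    purpose="Customer support tooling",
    recipient_name="Example SaaS Inc.",
    recipient_type="processor",
    has_explicit_consent=False,
    has_binding_corporate_rules=False,
    has_standard_contractual_clauses=True,
    has_dpia=False,
    requested_by="[email protected]",
    requested_at=datetime.now(),
)
decision, reason = gate.evaluate_transfer(request)
print(decision.value)  # "requires_dpia": SCCs alone are not enough for a non-adequate destination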
Comparison with Global Frameworks
Nigerian organisations increasingly operate across borders — fintech companies serving the diaspora, multinationals with Nigerian subsidiaries, and Nigerian firms expanding into other African markets. Understanding how Nigerian regulations compare with global frameworks is essential for building unified compliance architectures.
NDPR/NDPA vs GDPR vs CCPA
| Dimension | NDPR/NDPA (Nigeria) | GDPR (EU) | CCPA/CPRA (California) |
|---|---|---|---|
| Scope | Any entity processing personal data of Nigerian residents | Any entity processing personal data of EU residents | Businesses meeting revenue/data volume thresholds in California |
| Lawful basis | 6 bases (consent, contract, legal obligation, vital interest, public interest, legitimate interest) | 6 bases (same as NDPA) | No lawful basis concept; focuses on consumer rights |
| Consent standard | Freely given, specific, informed, unambiguous | Freely given, specific, informed, unambiguous | Opt-out model (not opt-in) for sale/sharing |
| Data subject rights | Access, rectification, erasure, portability, objection | Access, rectification, erasure, portability, objection, restriction, automated decision | Know, delete, opt-out of sale, non-discrimination, correct, limit |
| Breach notification | 72 hours to NDPC | 72 hours to supervisory authority | "Without unreasonable delay" to consumers |
| DPO required | Yes, above prescribed thresholds | Yes, for certain processing types | No (but requires privacy policy) |
| Cross-border transfers | Adequacy decision, consent, BCR, SCC | Adequacy decision, SCCs, BCR, derogations | No specific cross-border transfer restrictions |
| Penalties (max) | 2% annual gross revenue or NGN 10M (whichever is greater) | 4% annual global turnover or EUR 20M (whichever is greater) | $7,500 per intentional violation |
| Children's data | Special protections required | Consent required from holder of parental responsibility (under 16, member states may lower to 13) | COPPA applies; CCPA additional protections under 16 |
| Data residency | Copy must be stored in Nigeria | No explicit residency requirement (but Schrems II implications) | No residency requirement |
| Enforcement body | NDPC (Nigeria Data Protection Commission) | National DPAs (e.g., CNIL, ICO) | California AG, California Privacy Protection Agency |
Building a Unified Compliance Architecture
For organisations operating across Nigeria, the EU, and the US, the strategy is to build to the highest common denominator and then layer jurisdiction-specific rules on top.
┌──────────────────────────────────────────────────────────────────────────┐
│ UNIFIED MULTI-JURISDICTION COMPLIANCE ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ COMMON COMPLIANCE LAYER │ │
│ │ ────────────────────────────────────── │ │
│ │ - Consent management (GDPR-grade = NDPA-compliant) │ │
│ │ - Data minimisation by default │ │
│ │ - Purpose limitation enforcement │ │
│ │ - Encryption at rest and in transit │ │
│ │ - Audit trail for all data processing │ │
│ │ - Data lineage tracking │ │
│ │ - Breach detection and notification pipeline │ │
│ │ - Right to erasure / deletion capability │ │
│ │ - Data Protection Impact Assessment workflow │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────────────┐ │
│ │ NDPA LAYER │ │ GDPR LAYER │ │ CCPA/CPRA LAYER │ │
│ │ ────────── │ │ ────────── │ │ ───────────── │ │
│ │ Nigeria- │ │ EU-specific │ │ California-specific │ │
│ │ specific: │ │ additions: │ │ additions: │ │
│ │ │ │ │ │ │ │
│ │ - Data │ │ - Art 30 │ │ - "Do Not Sell" │ │
│ │ residency │ │ processing │ │ signal handling │ │
│ │ (NG copy) │ │ register │ │ - Opt-out │ │
│ │ - NITDA/NDPC │ │ - DPA │ │ preference │ │
│ │ reporting │ │ notification │ │ centre │ │
│ │ - DPCO annual │ │ (per member │ │ - Financial │ │
│ │ audit │ │ state) │ │ incentive │ │
│ │ - CBN/NCC │ │ - DPIA for │ │ disclosures │ │
│ │ sector reqs │ │ high risk │ │ - Shine the Light │ │
│ │ - BVN/NIN │ │ - Lead DPA │ │ Act compliance │ │
│ │ handling │ │ designation │ │ - Annual privacy │ │
│ │ │ │ - Transfer │ │ metric reporting │ │
│ │ │ │ Impact │ │ │ │
│ │ │ │ Assessment │ │ │ │
│ └────────────────┘ └────────────────┘ └────────────────────────┘ │
│ │
│ POLICY ENGINE: Jurisdiction detected at ingestion time based on │
│ data subject's location/nationality. Rules applied automatically. │
└──────────────────────────────────────────────────────────────────────────┘
Policy-as-Code for Multi-Jurisdiction Compliance
"""
jurisdiction_policy.py — Policy-as-Code for Multi-Jurisdiction Compliance
Automatically applies the correct compliance rules based on the
jurisdiction of the data subject. Detected at ingestion time and
enforced throughout the pipeline.
"""
from dataclasses import dataclass, field
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class JurisdictionPolicy:
"""Compliance policy for a specific jurisdiction."""
jurisdiction: str
data_residency_required: bool
residency_locations: list[str]
retention_default_days: int
breach_notification_hours: int
dpo_required: bool
dpia_required_for_high_risk: bool
cross_border_restrictions: bool
consent_model: str # 'opt_in', 'opt_out'
right_to_erasure: bool
right_to_portability: bool
special_category_rules: bool
max_penalty_description: str
regulator: str
additional_requirements: list[str] = field(default_factory=list)
# Define policies per jurisdiction
POLICIES = {
"NG": JurisdictionPolicy(
jurisdiction="NG",
data_residency_required=True,
residency_locations=["ng-lagos-1", "ng-lagos-2"], # Nigerian data centres
retention_default_days=365,
breach_notification_hours=72,
dpo_required=True,
dpia_required_for_high_risk=True,
cross_border_restrictions=True,
consent_model="opt_in",
right_to_erasure=True,
right_to_portability=True,
special_category_rules=True,
max_penalty_description="2% annual gross revenue or NGN 10M",
regulator="NDPC (Nigeria Data Protection Commission)",
additional_requirements=[
"Annual DPCO audit submission to NDPC",
"Data protection officer appointment",
"Copy of personal data must reside in Nigeria",
"CBN sector-specific requirements for financial data",
"NCC sector-specific requirements for telecom data",
],
),
"EU": JurisdictionPolicy(
jurisdiction="EU",
data_residency_required=False,
residency_locations=[],
retention_default_days=365,
breach_notification_hours=72,
dpo_required=True,
dpia_required_for_high_risk=True,
cross_border_restrictions=True,
consent_model="opt_in",
right_to_erasure=True,
right_to_portability=True,
special_category_rules=True,
max_penalty_description="4% annual global turnover or EUR 20M",
regulator="National DPA (varies by member state)",
additional_requirements=[
"Article 30 Records of Processing Activities",
"Transfer Impact Assessment for non-adequate countries",
"Lead supervisory authority designation",
"72-hour breach notification to DPA",
],
),
"US-CA": JurisdictionPolicy(
jurisdiction="US-CA",
data_residency_required=False,
residency_locations=[],
retention_default_days=365,
breach_notification_hours=0, # "Without unreasonable delay"
dpo_required=False,
dpia_required_for_high_risk=False,
cross_border_restrictions=False,
consent_model="opt_out",
right_to_erasure=True,
right_to_portability=False,
special_category_rules=True, # Sensitive personal information under CPRA
max_penalty_description="$7,500 per intentional violation",
regulator="California Privacy Protection Agency (CPPA)",
additional_requirements=[
"Do Not Sell or Share signal support (GPC)",
"Financial incentive disclosures",
"Privacy policy with specific disclosures",
"Annual privacy metrics reporting (for large processors)",
],
),
}
class JurisdictionDetector:
"""
Determines the applicable jurisdiction for a data record.
Uses multiple signals:
1. Explicit jurisdiction tag (most reliable)
2. Phone number country code
3. IP geolocation
4. Address country
5. BVN/NIN presence (implies Nigerian)
"""
def detect(self, record: dict) -> str:
"""Return the jurisdiction code for a data record."""
# Priority 1: Explicit tag
if "jurisdiction" in record:
return record["jurisdiction"]
# Priority 2: Nigerian identifiers
if record.get("bvn") or record.get("nin"):
return "NG"
# Priority 3: Phone number prefix
phone = record.get("phone_number", "")
if phone.startswith("+234") or phone.startswith("234"):
return "NG"
# Priority 4: Address country
country = record.get("country", "").upper()
jurisdiction_map = {
"NIGERIA": "NG", "NG": "NG",
"GHANA": "GH", "GH": "GH",
"SOUTH AFRICA": "ZA", "ZA": "ZA",
"KENYA": "KE", "KE": "KE",
}
# EU member states
eu_countries = {
"DE", "FR", "IT", "ES", "NL", "BE", "AT", "SE", "PL",
"DK", "FI", "IE", "PT", "CZ", "RO", "HU", "BG", "HR",
"SK", "LT", "SI", "LV", "EE", "CY", "LU", "MT",
}
if country in jurisdiction_map:
return jurisdiction_map[country]
elif country in eu_countries:
return "EU"
elif country == "US":
state = record.get("state", "").upper()
if state in ("CA", "CALIFORNIA"):
return "US-CA"
return "US"
# Default: treat as Nigerian if we cannot determine
logger.warning("Could not determine jurisdiction for record, defaulting to NG")
return "NG"
def apply_jurisdiction_policy(record: dict, detector: JurisdictionDetector) -> dict:
"""
Annotate a data record with its applicable compliance policy.
This annotation travels with the record through all pipeline stages,
ensuring jurisdiction-specific rules are enforced at every step.
"""
jurisdiction = detector.detect(record)
policy = POLICIES.get(jurisdiction)
if policy is None:
        # Unknown jurisdiction: fall back to the most restrictive default (NG policy)
logger.warning("No policy for jurisdiction %s, applying NG policy", jurisdiction)
policy = POLICIES["NG"]
record["_compliance"] = {
"jurisdiction": jurisdiction,
"data_residency_required": policy.data_residency_required,
"residency_locations": policy.residency_locations,
"retention_days": policy.retention_default_days,
"consent_model": policy.consent_model,
"right_to_erasure": policy.right_to_erasure,
"cross_border_restricted": policy.cross_border_restrictions,
"regulator": policy.regulator,
"detected_at": "ingestion",
}
return record
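Usage is a single call at ingestion time. An illustrative record with a +234 phone number resolves to the NG policy (assuming the module above is saved as jurisdiction_policy.py):
# usage sketch — assumes jurisdiction_policy.py above is importable
from jurisdiction_policy import JurisdictionDetector, apply_jurisdiction_policy

detector = JurisdictionDetector()
record = {
    "customer_id": "CUST-00123",        # illustrative record
    "phone_number": "+2348012345678",
    "country": "Nigeria",
}
annotated = apply_jurisdiction_policy(record, detector)
print(annotated["_compliance"]["jurisdiction"])    # NG
print(annotated["_compliance"]["retention_days"])  # 365
print(annotated["_compliance"]["regulator"])       # NDPC (Nigeria Data Protection Commission)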
Monitoring and Alerting for Compliance
Compliance is not a one-time build — it requires continuous monitoring. Implement dashboards and alerts that track compliance health in real time.
Key Compliance Metrics
-- Compliance health dashboard queries
-- 1. Consent coverage: What percentage of active users have valid consent?
SELECT
consent_type,
COUNT(*) FILTER (WHERE is_active = TRUE) AS consented,
COUNT(*) AS total_subjects,
ROUND(
COUNT(*) FILTER (WHERE is_active = TRUE)::NUMERIC / COUNT(*) * 100, 2
) AS consent_coverage_pct
FROM current_consent_state
GROUP BY consent_type;
-- 2. Retention compliance: Are we holding data beyond policy limits?
SELECT
policy_name,
table_name,
retention_days,
last_execution_date,
records_still_overdue,
CASE
WHEN records_still_overdue > 0 THEN 'NON_COMPLIANT'
WHEN last_execution_date < CURRENT_DATE - INTERVAL '1 day' THEN 'STALE'
ELSE 'COMPLIANT'
END AS status
FROM retention_policy_status;
-- 3. Erasure SLA: Are we meeting the 30-day erasure deadline?
SELECT
request_id,
data_subject_id,
requested_at,
completed_at,
status,
EXTRACT(DAY FROM (COALESCE(completed_at, NOW()) - requested_at)) AS days_elapsed,
  CASE
    WHEN status = 'completed' AND (completed_at - requested_at) <= INTERVAL '30 days'
      THEN 'WITHIN_SLA'
    WHEN status = 'completed'
      THEN 'SLA_BREACHED'  -- completed, but outside the 30-day window
    -- the 30-day breach check must come before the 25-day warning,
    -- or breached requests would be mislabelled as AT_RISK
    WHEN status != 'completed' AND (NOW() - requested_at) > INTERVAL '30 days'
      THEN 'SLA_BREACHED'
    WHEN status != 'completed' AND (NOW() - requested_at) > INTERVAL '25 days'
      THEN 'AT_RISK'
    ELSE 'ON_TRACK'
  END AS sla_status
FROM erasure_requests
WHERE requested_at >= CURRENT_DATE - INTERVAL '90 days'
ORDER BY requested_at DESC;
-- 4. Cross-border transfer decisions: Are we blocking non-compliant transfers?
SELECT
dest_jurisdiction,
decision,
COUNT(*) AS transfer_count,
SUM(data_subject_count) AS total_subjects
FROM cross_border_transfer_audit
WHERE decided_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY dest_jurisdiction, decision
ORDER BY dest_jurisdiction, decision;
-- 5. CBN returns submission status
SELECT
return_type,
reporting_period,
generated_at,
validation_passed,
reconciliation_status,
submitted_at,
CASE
WHEN submitted_at IS NOT NULL THEN 'SUBMITTED'
WHEN validation_passed = FALSE THEN 'VALIDATION_FAILED'
WHEN reconciliation_status = 'fail' THEN 'RECONCILIATION_FAILED'
WHEN deadline < NOW() AND submitted_at IS NULL THEN 'OVERDUE'
ELSE 'PENDING'
END AS status
FROM regulatory_returns_tracker
WHERE reporting_period >= TO_CHAR(CURRENT_DATE - INTERVAL '6 months', 'YYYY-MM')
ORDER BY reporting_period DESC, return_type;
-- 6. Data lineage completeness: Can we trace every regulatory report to source?
SELECT
destination_table,
COUNT(*) AS total_pipeline_runs,
COUNT(*) FILTER (WHERE source_checksum IS NOT NULL) AS with_source_checksum,
COUNT(*) FILTER (WHERE destination_checksum IS NOT NULL) AS with_dest_checksum,
COUNT(*) FILTER (WHERE git_commit_hash IS NOT NULL) AS with_code_version,
ROUND(
COUNT(*) FILTER (
WHERE source_checksum IS NOT NULL
AND destination_checksum IS NOT NULL
AND git_commit_hash IS NOT NULL
)::NUMERIC / NULLIF(COUNT(*), 0) * 100, 2
) AS full_lineage_pct
FROM data_lineage_events
WHERE destination_table LIKE 'regulatory_%'
AND created_at >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY destination_table;
Alerting Rules
# compliance_alerts.yaml — Compliance Monitoring Alert Configuration
alerts:
- name: "consent_coverage_low"
description: "Consent coverage has dropped below threshold"
query: |
SELECT consent_type, consent_coverage_pct
FROM consent_coverage_view
WHERE consent_coverage_pct < 80
severity: "warning"
threshold: 80
channel: "slack:#compliance-alerts"
escalation: "[email protected]"
- name: "erasure_sla_at_risk"
description: "Erasure request approaching 30-day NDPA deadline"
query: |
SELECT COUNT(*) FROM erasure_requests
WHERE status != 'completed'
AND (NOW() - requested_at) > INTERVAL '25 days'
severity: "critical"
threshold: 0
channel: "slack:#compliance-alerts, pagerduty"
escalation: "[email protected], [email protected]"
- name: "cbn_return_overdue"
description: "CBN regulatory return past submission deadline"
query: |
SELECT return_type, reporting_period
FROM regulatory_returns_tracker
WHERE deadline < NOW() AND submitted_at IS NULL
severity: "critical"
channel: "slack:#compliance-alerts, pagerduty, sms"
escalation: "[email protected], [email protected]"
- name: "reconciliation_failure"
description: "Reconciliation check failed for regulatory report"
query: |
SELECT report_name, reporting_period
FROM reconciliation_audit
WHERE status = 'fail'
AND generated_at >= CURRENT_DATE - INTERVAL '1 day'
severity: "high"
channel: "slack:#compliance-alerts"
escalation: "[email protected]"
- name: "cross_border_transfer_blocked"
description: "Cross-border transfer was blocked by policy engine"
query: |
SELECT dest_jurisdiction, reason
FROM cross_border_transfer_audit
WHERE decision = 'blocked'
AND decided_at >= CURRENT_DATE - INTERVAL '1 day'
severity: "info"
channel: "slack:#compliance-alerts"
- name: "data_lineage_gap"
description: "Regulatory pipeline run missing lineage metadata"
query: |
SELECT pipeline_name, pipeline_run_id
FROM data_lineage_events
WHERE destination_table LIKE 'regulatory_%'
AND (source_checksum IS NULL OR git_commit_hash IS NULL)
AND created_at >= CURRENT_DATE - INTERVAL '1 day'
severity: "warning"
channel: "slack:#data-engineering"
- name: "retention_policy_stale"
description: "Retention policy has not executed on schedule"
query: |
SELECT policy_name, last_execution_date
FROM retention_policy_status
WHERE last_execution_date < CURRENT_DATE - INTERVAL '2 days'
severity: "warning"
channel: "slack:#compliance-alerts"
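Any scheduler can evaluate these definitions. Below is a minimal sketch of a runner, assuming PostgreSQL via psycopg2 and a Slack incoming webhook (the URL and DSN are placeholders). Because each query already encodes its own threshold, any returned row is treated as a finding; routing to PagerDuty, SMS, or email escalation is left out for brevity.
# alert_runner.py — minimal sketch of evaluating compliance_alerts.yaml.
# Assumes PyYAML, psycopg2, and requests are installed; URL and DSN are placeholders.
import psycopg2
import requests
import yaml

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def run_alerts(config_path: str, dsn: str) -> None:
    """Execute each alert query and notify the channel when rows are returned."""
    with open(config_path) as fh:
        config = yaml.safe_load(fh)
    conn = psycopg2.connect(dsn)
    try:
        for alert in config["alerts"]:
            with conn.cursor() as cur:
                cur.execute(alert["query"])
                rows = cur.fetchall()
            if rows:  # the query itself encodes the threshold, so any row is a finding
                message = (
                    f"[{alert['severity'].upper()}] {alert['name']}: "
                    f"{len(rows)} finding(s). {alert['description']}"
                )
                requests.post(SLACK_WEBHOOK_URL, json={"text": message}, timeout=10)
    finally:
        conn.close()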
Implementation Roadmap
Building compliant data infrastructure is not an overnight project. Here is a phased approach that delivers value incrementally while managing risk.
Phase 1: Foundation (Weeks 1-4)
Goal: Establish the compliance data platform and consent management.
- Deploy compliance data warehouse (separate schema or dedicated cluster)
- Implement consent event store and materialised views
- Build pipeline consent gate (pre-ingestion check)
- Set up data lineage tracking for all regulatory pipelines
- Create retention policy definitions and automated enforcement
- Establish encryption-at-rest and in-transit baselines
Phase 2: Regulatory Automation (Weeks 5-8)
Goal: Automate the most time-sensitive regulatory returns.
- Build CBN monthly returns pipeline (Form M) with reconciliation
- Implement AML/CFT transaction monitoring and STR/CTR generation
- Build credit bureau reporting pipeline
- Deploy automated reconciliation framework
- Set up compliance monitoring dashboards
- Implement alerting for SLA breaches and overdue submissions
Phase 3: Advanced Compliance (Weeks 9-12)
Goal: Complete the compliance capability with cross-border, erasure, and multi-jurisdiction support.
- Implement right-to-erasure orchestration across all systems
- Build cross-border transfer policy engine
- Deploy DPIA automation workflow
- Implement multi-jurisdiction policy-as-code
- Build NCC QoS and subscriber reporting pipelines
- Conduct end-to-end compliance audit simulation
Phase 4: Continuous Improvement (Ongoing)
Goal: Maintain compliance posture and adapt to regulatory changes.
- Monthly compliance health reviews using dashboard metrics
- Quarterly reconciliation accuracy audits
- Annual DPCO audit preparation (automated report generation)
- Regulatory change monitoring (NDPC, CBN, NCC gazettes)
- Pipeline performance optimisation for growing data volumes
- Staff training on compliance tooling and procedures
Common Pitfalls and How to Avoid Them
Based on our experience working with Nigerian organisations, here are the most common compliance engineering mistakes:
1. Treating Compliance as a One-Time Project
Compliance is a continuous process, not a project with an end date. Regulatory requirements evolve — the NDPA replaced the NDPR, CBN circulars are issued regularly, and enforcement patterns shift. Build your infrastructure to accommodate change.
Solution: Use configuration-driven compliance rules (YAML policies, not hard-coded logic). When a regulation changes, update the policy file, not the pipeline code.
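As a small illustration of the principle (the policy snippet, table names, and retention figures below are placeholders, not regulatory guidance):
# policy_driven_retention.py — sketch: retention limits live in config, not code.
import yaml

POLICY_YAML = """
retention_policies:
  customer_transactions: {retention_days: 1825}
  marketing_events: {retention_days: 365}
"""

RETENTION_POLICIES = yaml.safe_load(POLICY_YAML)["retention_policies"]

def retention_days_for(table_name: str) -> int:
    """Pipelines look the limit up at run time; a regulatory change is a config edit."""
    return RETENTION_POLICIES[table_name]["retention_days"]

print(retention_days_for("marketing_events"))  # 365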
2. Ignoring Data Lineage Until an Audit
When a regulator asks for the provenance of a number in your return, "we think it came from the core banking system" is not an acceptable answer. Data lineage must be captured from day one.
Solution: Instrument every pipeline step with lineage events. Make lineage a first-class citizen in your data platform, not an afterthought.
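A minimal sketch of what per-step lineage instrumentation can look like, using field names that mirror the data_lineage_events queries shown earlier (adapt the schema and the checksum strategy to your own platform):
# lineage_event.py — sketch of building a lineage event for one pipeline step.
import hashlib
import json
import subprocess
from datetime import datetime, timezone

def batch_checksum(rows: list[dict]) -> str:
    """Deterministic checksum of a batch so outputs can be tied back to inputs."""
    payload = json.dumps(rows, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

def build_lineage_event(pipeline_name: str, run_id: str, source_rows: list[dict],
                        destination_table: str, destination_rows: list[dict]) -> dict:
    """Return a lineage record ready to insert into the audit store."""
    try:
        git_commit = subprocess.run(
            ["git", "rev-parse", "HEAD"], capture_output=True, text=True, check=False
        ).stdout.strip() or None
    except FileNotFoundError:  # git not available in the runtime image
        git_commit = None
    return {
        "pipeline_name": pipeline_name,
        "pipeline_run_id": run_id,
        "source_checksum": batch_checksum(source_rows),
        "destination_table": destination_table,
        "destination_checksum": batch_checksum(destination_rows),
        "git_commit_hash": git_commit,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }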
3. Manual Reconciliation
Manual reconciliation introduces human error, is not scalable, and does not leave an adequate audit trail. The first time your reconciliation reveals a material variance at 11 PM the night before a submission deadline, you will wish you had automated it.
Solution: Automated reconciliation with configurable tolerances, clear pass/fail criteria, and escalation workflows.
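A minimal sketch of such a check (the tolerance and the figures are illustrative):
# reconcile.py — sketch of a tolerance-based reconciliation check.
from decimal import Decimal

def reconcile(report_total: Decimal, source_total: Decimal,
              tolerance_pct: Decimal = Decimal("0.01")) -> dict:
    """Compare a regulatory report figure against its source-of-truth total."""
    variance = abs(report_total - source_total)
    allowed = abs(source_total) * tolerance_pct / Decimal("100")
    return {
        "status": "pass" if variance <= allowed else "fail",
        "variance": str(variance),
        "tolerance_pct": str(tolerance_pct),
    }

result = reconcile(Decimal("1050000000.00"), Decimal("1050000500.00"))
print(result["status"])  # pass: a 500 naira variance is within the 0.01% tolerance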
4. Confusing Encryption with Compliance
Encrypting your database does not make you NDPR-compliant. Compliance requires consent management, purpose limitation, retention policies, erasure capabilities, breach notification, and much more. Encryption is necessary but not sufficient.
Solution: Use the comprehensive compliance architecture described in this guide. Encryption is one layer of many.
5. Not Accounting for Data Residency
Many organisations unknowingly violate NDPR data residency requirements by using cloud services that store data outside Nigeria without maintaining a local copy. AWS's closest region is Cape Town, not Lagos.
Solution: Classify your data by tier (Nigeria-only, Africa-regional, Global) and enforce residency rules at the infrastructure level. Consider Nigerian data centre providers (Rack Centre, MainOne MDXi) for Tier 1 data.
Conclusion
Nigeria's regulatory landscape is complex but navigable. The key insight is that compliance is an engineering problem, not a legal problem. Lawyers define what you must do; engineers build the systems that do it reliably, at scale, and with evidence.
By engineering compliance into your data infrastructure — consent gates in every pipeline, automated retention enforcement, lineage tracking on every transformation, reconciliation checks before every submission — you transform regulatory requirements from a quarterly fire drill into an always-on, continuously verified capability.
The organisations that thrive in Nigeria's regulatory environment will not be those that hire the most compliance officers or produce the thickest policy documents. They will be the ones that build compliant systems — infrastructure that makes non-compliance technically difficult and compliance the default state.
At Gemut Analytics, we help Nigerian organisations build exactly this kind of infrastructure. Whether you are a bank preparing for the next CBN examination, a fintech navigating NDPR for the first time, or a telecom operator automating NCC returns, the principles in this guide apply. The specific implementation will vary by sector, scale, and existing infrastructure — but the architecture patterns are universal.
Start with consent management and data lineage. Those two capabilities alone will transform your compliance posture. Then layer on automated reporting, retention enforcement, and cross-border controls. Within a quarter, you will have infrastructure that makes your compliance team's job easier and your regulator's questions answerable in seconds instead of weeks.
This guide reflects the Nigerian regulatory landscape as of May 2024. Regulations evolve — particularly as the NDPC issues implementation guidance under the NDPA. Always consult with qualified legal counsel for definitive regulatory interpretation. The technical architectures described here are implementation patterns, not legal advice.
Need help building NDPR, CBN, or NCC-compliant data infrastructure? Contact Gemut Analytics for a compliance architecture assessment tailored to your organisation and sector.
Key Takeaways
- ✓ Nigeria's regulatory data landscape spans NDPR/NDPA, CBN, NCC, FIRS, and SEC — each with distinct data handling, retention, and reporting requirements
- ✓ NDPR-compliant data pipelines require consent management, purpose limitation enforcement, automated retention policies, and right-to-erasure capabilities baked into the architecture
- ✓ CBN data requirements for financial institutions can be met through automated transaction reporting pipelines with built-in AML/KYC data validation
- ✓ Data lineage and audit trails — implemented through metadata catalogs and pipeline instrumentation — provide the evidence trail regulators increasingly demand
- ✓ Cross-border data transfer compliance (NDPR vs GDPR vs CCPA) can be managed through policy-as-code frameworks that enforce jurisdiction-specific rules automatically