15 min read · Intermediate

Navigating Data Compliance in Nigeria: Building NDPR, CBN, and NCC-Ready Data Infrastructure

A practical guide to building compliant data infrastructure in Nigeria, covering NDPR data protection requirements, CBN financial data guidelines, NCC telecom regulations, and automated compliance pipelines with audit trails.

Compliance · NDPR · CBN · Data Governance · Regulatory
TL;DR

Nigeria's regulatory landscape for data is rapidly evolving. The Nigeria Data Protection Regulation (NDPR) and its successor NDPA, CBN guidelines for financial institutions, NCC requirements for telecom operators, and FIRS tax reporting mandates create a complex compliance environment. Most Nigerian businesses still rely on manual processes for regulatory reporting — spreadsheets, email chains, and quarterly fire drills before submission deadlines. This guide demonstrates how to engineer compliance directly into your data infrastructure, transforming regulatory requirements from a burden into an automated, auditable, and continuously monitored capability.

Prerequisites
  • Understanding of data pipeline and data warehouse concepts
  • Familiarity with data governance principles
  • Basic knowledge of SQL and Python

Introduction

Nigeria is Africa's largest economy, its most populous nation, and an increasingly digital-first market. With over 200 million citizens, a booming fintech ecosystem, and rapid telecom expansion, Nigerian organisations generate and process vast quantities of personal, financial, and operational data every day. Regulatory bodies have responded with a growing web of data protection and reporting requirements — but most organisations are still catching up.

The reality on the ground is sobering. At many Nigerian banks, quarterly CBN returns are assembled manually from spreadsheets exported across departments. Fintech startups discover NDPR consent requirements only after a NITDA audit notice arrives. Telecom operators scramble to reconcile NCC subscriber data reports against billing systems that were never designed with regulatory output in mind. The result is late submissions, inaccurate reports, regulatory penalties, and — worst of all — a false sense of compliance that crumbles under scrutiny.

This guide takes a different approach. Instead of treating compliance as a checkbox exercise bolted onto existing systems, we demonstrate how to engineer compliance into data infrastructure from the ground up. Every pipeline, every data model, every transformation becomes a compliance asset — automated, auditable, and continuously monitored.

┌─────────────────────────────────────────────────────────────────────────────┐
│                  NIGERIAN REGULATORY DATA LANDSCAPE                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   NITDA / NDPC                CBN               NCC          FIRS          │
│   ───────────────        ───────────       ──────────    ──────────        │
│   NDPR / NDPA            Banking Regs      Telecom Regs  Tax Reporting    │
│   Data Protection        AML/CFT           Subscriber    VAT / CIT        │
│   Consent Mgmt           KYC / CDD         Data          Transfer         │
│   Cross-border           Transaction       Quality of    Pricing          │
│   Transfers              Reporting         Service       Withholding      │
│   Breach Notify          Credit Bureau     Type Approval Tax              │
│                          Reporting                                         │
│                                                                             │
│                          SEC Nigeria                                        │
│                          ───────────                                        │
│                          Capital Markets                                    │
│                          Reporting                                          │
│                          Investor Data                                      │
│                          Protection                                         │
│                                                                             │
├─────────────────────────────────────────────────────────────────────────────┤
│  ALL REGULATORS: Increasing focus on audit trails, data lineage,           │
│  automated reporting, and demonstrable compliance (not just policies)       │
└─────────────────────────────────────────────────────────────────────────────┘

The stakes are significant. NDPR violations can result in fines of up to 2% of annual gross revenue or NGN 10 million (whichever is greater). CBN penalties for late or inaccurate returns can include monetary fines, operational restrictions, and even licence revocation in extreme cases. NCC enforcement actions can include fines of up to NGN 5 million per violation. This is not a theoretical risk — NITDA has actively investigated and sanctioned organisations since NDPR's introduction in 2019.


Nigeria's Regulatory Data Landscape

Before building compliant infrastructure, you must understand exactly what each regulator requires. Nigerian data regulations are not monolithic — they come from multiple bodies, apply to different sectors, and impose distinct obligations. Let us examine each in detail.

NDPR and NDPA: The Data Protection Foundation

The Nigeria Data Protection Regulation (NDPR), issued by NITDA in January 2019, was Nigeria's first comprehensive data protection framework. In June 2023, President Bola Tinubu signed the Nigeria Data Protection Act (NDPA) into law, establishing the Nigeria Data Protection Commission (NDPC) as an independent regulatory body. The NDPA supersedes the NDPR but largely builds upon its principles.

Key requirements for data engineers:

| Requirement | NDPR/NDPA Reference | Technical Implication |
|---|---|---|
| Lawful basis for processing | NDPA Part III, Sec 25-30 | Consent management system with audit trail |
| Purpose limitation | NDPA Part III, Sec 26 | Data flow tagging and enforcement at pipeline level |
| Data minimisation | NDPA Part III, Sec 27 | Column-level access control, selective ingestion |
| Storage limitation | NDPA Part III, Sec 28 | Automated retention and deletion policies |
| Right to erasure | NDPA Part IV, Sec 37 | Soft-delete propagation across all downstream systems |
| Right to data portability | NDPA Part IV, Sec 38 | Standardised export formats, API endpoints |
| Data breach notification | NDPA Part V, Sec 44 | 72-hour detection and notification pipeline |
| Cross-border transfer | NDPA Part VI, Sec 47-49 | Data residency enforcement, transfer impact assessments |
| Data Protection Impact Assessment | NDPA Part III, Sec 31 | Automated DPIA workflows for new data processing |
| Record of processing activities | NDPA Part III, Sec 32 | Metadata catalog with processing purpose annotations |

NDPR Article 2.1 mandates that any organisation that processes personal data of Nigerian residents must have a Data Protection Compliance Organisation (DPCO) conduct an annual audit of its data processing activities. The NDPA reinforces this with the requirement for a Data Protection Officer (DPO) at organisations processing data above prescribed thresholds.
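The record-of-processing-activities requirement maps naturally onto a structured catalog entry that can be validated automatically rather than maintained in a spreadsheet. A minimal sketch — the field names and validation checks are illustrative, not prescribed by the Act:

```python
from dataclasses import dataclass, field

@dataclass
class ProcessingActivityRecord:
    """One entry in a Sec 32-style record of processing activities."""
    activity_name: str
    purpose: str
    lawful_basis: str            # e.g. 'consent', 'contract', 'legal_obligation'
    data_categories: list[str]   # e.g. ['name', 'bvn', 'phone_number']
    recipients: list[str] = field(default_factory=list)
    cross_border: bool = False
    retention_days: int = 0

    def validate(self) -> list[str]:
        """Return a list of gaps that would be flagged in an audit."""
        issues = []
        if not self.purpose:
            issues.append("purpose missing (purpose limitation, Sec 26)")
        if self.lawful_basis not in {
            "consent", "contract", "legal_obligation",
            "vital_interest", "public_interest", "legitimate_interest",
        }:
            issues.append(f"unrecognised lawful basis: {self.lawful_basis!r}")
        if self.retention_days <= 0:
            issues.append("no retention period defined (storage limitation, Sec 28)")
        return issues

record = ProcessingActivityRecord(
    activity_name="customer_onboarding",
    purpose="KYC verification for account opening",
    lawful_basis="legal_obligation",
    data_categories=["name", "bvn", "address"],
    retention_days=2555,
)
print(record.validate())  # []
```

Running `validate()` across the whole catalog in CI turns the Sec 32 record from a static document into a continuously checked artifact.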

CBN Guidelines for Financial Institutions

The Central Bank of Nigeria (CBN) imposes extensive data and reporting requirements on banks, microfinance banks, payment service providers, and other financial institutions. Key frameworks include:

  • CBN AML/CFT Regulations 2022: Mandate customer due diligence (CDD), enhanced due diligence (EDD), suspicious transaction reporting (STR), and currency transaction reporting (CTR) with strict data retention requirements
  • CBN Risk-Based Supervision Framework: Requires granular transaction data for supervisory review, including real-time access to certain categories of transactions
  • CBN Guidelines on Electronic Banking 2003 (revised): Data security requirements for electronic channels, including encryption, access control, and audit logging
  • CBN Credit Bureau Reporting: Licensed credit bureaus (CRC, First Central, CreditRegistry) require regular data submissions from financial institutions
  • CBN Regulation on Open Banking 2023: API-based data sharing with consent management, rate limiting, and comprehensive logging

Reporting obligations:

| Report | Frequency | Submission Window | Key Data Points |
|---|---|---|---|
| Bank Returns (Form M) | Monthly | 15th of following month | Balance sheet, P&L, asset quality |
| Suspicious Transaction Report | Per event | 24-72 hours | Transaction details, parties, suspicion basis |
| Currency Transaction Report | Daily | Next business day | Transactions above NGN 5M (individual) / NGN 10M (corporate) |
| Credit Bureau Returns | Monthly | 30 days after month-end | Loan performance, repayment history |
| Financial Stability Returns | Quarterly | 30 days after quarter-end | Capital adequacy, liquidity ratios |
| Foreign Exchange Returns | Daily | Same day | FX transactions, positions, rates |
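Thresholds like the CTR limits above are best encoded once, in code, rather than re-derived inside each report query. A minimal sketch, reading "above NGN 5M / NGN 10M" as a strict boundary — confirm inclusive versus exclusive against the current CBN circular:

```python
from decimal import Decimal

# CTR thresholds in NGN; 'above' is treated as strictly greater-than here.
CTR_THRESHOLDS = {
    "individual": Decimal("5000000"),
    "corporate": Decimal("10000000"),
}

def requires_ctr(amount_ngn: Decimal, customer_type: str) -> bool:
    """True if the transaction must appear in the daily Currency Transaction Report."""
    return amount_ngn > CTR_THRESHOLDS[customer_type]

print(requires_ctr(Decimal("5000001"), "individual"))   # True
print(requires_ctr(Decimal("10000000"), "corporate"))   # False
```

Using `Decimal` rather than floats avoids rounding surprises exactly at the boundary, which is where a regulator will look first.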

NCC Requirements for Telecom Operators

The Nigerian Communications Commission (NCC) regulates telecommunications operators and imposes requirements around:

  • SIM Registration Regulations: NIN-SIM linkage data, subscriber identity verification data must be maintained and accessible to the NCC
  • Quality of Service (QoS) Reporting: Monthly QoS metrics including call drop rates, network availability, and data throughput
  • Type Approval Data: Device and equipment certification data
  • Consumer Complaint Data: Quarterly reporting on consumer complaints, resolution rates, and turnaround times
  • Interconnect Data: Call detail records for interconnect billing and dispute resolution

NCC's Consumer Code of Practice also requires telecom operators to maintain data privacy standards for subscriber information, which must align with NDPR/NDPA requirements.
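The QoS figures themselves are simple ratios; what matters is computing them identically every reporting period. A minimal sketch — the function names and rounding conventions are illustrative, and the commonly cited NCC benchmarks should be checked against the current KPI determinations:

```python
def call_drop_rate(calls_established: int, calls_dropped: int) -> float:
    """Dropped calls as a percentage of established calls, rounded to 2 dp."""
    if calls_established == 0:
        return 0.0
    return round(100.0 * calls_dropped / calls_established, 2)

def network_availability(uptime_minutes: int, period_minutes: int) -> float:
    """Network availability as a percentage of the reporting period, 3 dp."""
    return round(100.0 * uptime_minutes / period_minutes, 3)

print(call_drop_rate(250_000, 1_750))        # 0.7
print(network_availability(43_100, 43_200))  # 99.769
```

Centralising these definitions means the figure in the NCC submission and the figure on the internal dashboard can never drift apart.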

FIRS Tax Reporting

The Federal Inland Revenue Service (FIRS) requires:

  • VAT Returns: Monthly, due 21 days after the month-end
  • Company Income Tax (CIT): Annual returns with supporting schedules
  • Transfer Pricing Documentation: For companies with related-party transactions above NGN 300 million
  • Withholding Tax Remittance: Monthly, due 21 days after deduction
  • FIRS TaxPro Max Integration: Electronic filing with structured data requirements
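Deadline logic deserves the same treatment: encode it once and drive pipeline scheduling from it. A minimal sketch for the VAT deadline (due on the 21st day of the month following the return period):

```python
from datetime import date

def vat_due_date(period_year: int, period_month: int) -> date:
    """FIRS VAT return due date: the 21st of the month after the return period."""
    if period_month == 12:
        return date(period_year + 1, 1, 21)
    return date(period_year, period_month + 1, 21)

print(vat_due_date(2024, 1))   # 2024-02-21
print(vat_due_date(2024, 12))  # 2025-01-21
```

The same pattern extends to withholding tax remittance and the CBN windows listed earlier, giving a single calendar that alerting can run against.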

SEC Nigeria

The Securities and Exchange Commission Nigeria regulates capital market operators with requirements including:

  • Quarterly and annual financial returns for broker-dealers, fund managers, and registrars
  • Investor data protection aligned with NDPR/NDPA
  • Trade reporting for equities, fixed income, and derivatives
  • Beneficial ownership reporting under the Companies and Allied Matters Act (CAMA) 2020

Building NDPR-Compliant Data Pipelines

NDPR/NDPA compliance is not a policy document — it is a set of engineering constraints that must be embedded into every data pipeline. Let us examine the key technical components.

Consent Management

Consent is the most common lawful basis for processing personal data under NDPR/NDPA. Every record in your data warehouse that originates from personal data must be traceable to specific, informed, and freely given consent.
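One detail worth engineering early: demonstrable consent means being able to show the exact wording a subject agreed to, not just that a box was ticked. Storing a SHA-256 fingerprint of the displayed consent text alongside each event makes that provable. A minimal sketch:

```python
import hashlib

def consent_text_fingerprint(consent_text: str) -> str:
    """SHA-256 hex digest of the exact consent wording shown to the user.

    Storing the digest with each consent event lets you prove at audit
    time precisely which text the data subject saw, even after the copy
    on the website or app has changed.
    """
    return hashlib.sha256(consent_text.encode("utf-8")).hexdigest()

digest = consent_text_fingerprint("I agree to receive marketing emails.")
print(len(digest))  # 64
```

Pair the digest with a versioned archive of the consent texts themselves, so any stored hash can be resolved back to human-readable wording.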

┌──────────────────────────────────────────────────────────────────────────┐
│                    CONSENT MANAGEMENT ARCHITECTURE                       │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────┐    ┌────────────────┐    ┌──────────────────┐             │
│  │  User     │───>│  Consent       │───>│  Consent         │             │
│  │  Interface│    │  Collection    │    │  Event Store     │             │
│  │  (Web/App)│    │  Service       │    │  (Immutable Log) │             │
│  └──────────┘    └────────────────┘    └────────┬─────────┘             │
│                                                  │                       │
│                          ┌───────────────────────┤                       │
│                          │                       │                       │
│                          v                       v                       │
│                  ┌───────────────┐    ┌──────────────────┐              │
│                  │  Consent      │    │  Consent State   │              │
│                  │  Audit Log    │    │  Materialized    │              │
│                  │  (Append-only)│    │  View            │              │
│                  └───────────────┘    └────────┬─────────┘              │
│                                                │                        │
│                          ┌─────────────────────┤                        │
│                          │                     │                        │
│                          v                     v                        │
│                  ┌───────────────┐    ┌──────────────────┐              │
│                  │  Pipeline     │    │  Data Access     │              │
│                  │  Gate         │    │  Policy Engine   │              │
│                  │  (Pre-ingest) │    │  (Query-time)    │              │
│                  └───────────────┘    └──────────────────┘              │
│                                                                          │
│  Flow: User grants consent -> Event stored immutably -> Materialised    │
│  view updated -> Pipelines check consent before processing ->           │
│  Query engine enforces purpose limitation at read time                   │
└──────────────────────────────────────────────────────────────────────────┘

Data model for consent management:

-- Consent Events: Immutable log of all consent actions
CREATE TABLE consent_events (
    event_id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    data_subject_id     UUID NOT NULL,
    consent_type        VARCHAR(50) NOT NULL,  -- 'marketing', 'analytics', 'third_party_sharing', etc.
    action              VARCHAR(20) NOT NULL,  -- 'grant', 'revoke', 'modify'
    purpose             TEXT NOT NULL,
    lawful_basis        VARCHAR(30) NOT NULL,  -- 'consent', 'contract', 'legal_obligation', 'legitimate_interest'
    collected_via       VARCHAR(50) NOT NULL,  -- 'web_form', 'mobile_app', 'api', 'in_person'
    consent_text_hash   VARCHAR(64) NOT NULL,  -- SHA-256 of the exact consent text shown
    ip_address          INET,
    user_agent          TEXT,
    geo_jurisdiction     VARCHAR(10) NOT NULL, -- 'NG', 'EU', 'US', etc.
    event_timestamp     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at          TIMESTAMPTZ,
    metadata            JSONB,

    -- Immutability: no UPDATE or DELETE allowed via application layer
    CONSTRAINT valid_action CHECK (action IN ('grant', 'revoke', 'modify')),
    CONSTRAINT valid_basis CHECK (lawful_basis IN (
        'consent', 'contract', 'legal_obligation',
        'vital_interest', 'public_interest', 'legitimate_interest'
    ))
);

-- Materialised view: Current consent state per subject
CREATE MATERIALIZED VIEW current_consent_state AS
SELECT DISTINCT ON (data_subject_id, consent_type)
    data_subject_id,
    consent_type,
    action AS current_status,
    purpose,
    lawful_basis,
    event_timestamp AS last_updated,
    expires_at,
    CASE
        WHEN action = 'grant' AND (expires_at IS NULL OR expires_at > NOW())
        THEN TRUE
        ELSE FALSE
    END AS is_active
FROM consent_events
ORDER BY data_subject_id, consent_type, event_timestamp DESC;

-- Index for pipeline gate lookups
CREATE INDEX idx_consent_state_lookup
ON consent_events (data_subject_id, consent_type, event_timestamp DESC);

Every pipeline that processes personal data must check the consent state before proceeding. This is implemented as a pipeline gate — a mandatory step that runs before any transformation or loading.

"""
consent_gate.py — NDPR Consent Enforcement for Data Pipelines

This module provides a pipeline gate that verifies consent status
before allowing personal data to flow through processing stages.
"""

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import logging

import psycopg2
from psycopg2.extras import RealDictCursor

logger = logging.getLogger(__name__)


class ConsentStatus(Enum):
    GRANTED = "grant"
    REVOKED = "revoke"
    EXPIRED = "expired"
    NOT_FOUND = "not_found"


@dataclass
class ConsentCheckResult:
    subject_id: str
    consent_type: str
    status: ConsentStatus
    checked_at: datetime
    purpose: Optional[str] = None
    expires_at: Optional[datetime] = None


class NDPRConsentGate:
    """
    Pipeline gate enforcing NDPR consent requirements.

    Checks consent status for each data subject before allowing
    their data to proceed through the pipeline. Records all checks
    in an audit log for regulatory evidence.
    """

    def __init__(self, db_connection_string: str):
        self.conn = psycopg2.connect(db_connection_string)
        self._ensure_audit_table()

    def _ensure_audit_table(self):
        """Create the consent check audit table if it does not exist."""
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS consent_check_audit (
                    check_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    pipeline_name   VARCHAR(100) NOT NULL,
                    pipeline_run_id VARCHAR(100) NOT NULL,
                    data_subject_id UUID NOT NULL,
                    consent_type    VARCHAR(50) NOT NULL,
                    check_result    VARCHAR(20) NOT NULL,
                    action_taken    VARCHAR(30) NOT NULL,
                    checked_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
                );
            """)
            self.conn.commit()

    def check_consent(
        self,
        subject_id: str,
        consent_type: str,
        pipeline_name: str,
        pipeline_run_id: str,
    ) -> ConsentCheckResult:
        """
        Check whether a data subject has active consent for the specified type.

        Returns ConsentCheckResult and logs the check to the audit table.
        NDPR Article 2.3 requires demonstrable consent — this audit trail
        provides that evidence.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT action, purpose, expires_at, event_timestamp
                FROM consent_events
                WHERE data_subject_id = %s
                  AND consent_type = %s
                ORDER BY event_timestamp DESC
                LIMIT 1;
            """, (subject_id, consent_type))

            row = cur.fetchone()
            # Use an aware timestamp: TIMESTAMPTZ values come back tz-aware
            # from psycopg2, and comparing them with a naive datetime raises
            # a TypeError.
            now = datetime.now().astimezone()

            if row is None:
                status = ConsentStatus.NOT_FOUND
            elif row["action"] == "revoke":
                status = ConsentStatus.REVOKED
            elif row["expires_at"] and row["expires_at"] < now:
                status = ConsentStatus.EXPIRED
            else:
                status = ConsentStatus.GRANTED

            result = ConsentCheckResult(
                subject_id=subject_id,
                consent_type=consent_type,
                status=status,
                checked_at=now,
                purpose=row["purpose"] if row else None,
                expires_at=row["expires_at"] if row else None,
            )

            # Determine action based on consent status
            action = "allow" if status == ConsentStatus.GRANTED else "block"

            # Record the check in audit log
            cur.execute("""
                INSERT INTO consent_check_audit
                (pipeline_name, pipeline_run_id, data_subject_id,
                 consent_type, check_result, action_taken)
                VALUES (%s, %s, %s, %s, %s, %s);
            """, (
                pipeline_name, pipeline_run_id, subject_id,
                consent_type, status.value, action,
            ))
            self.conn.commit()

            if status != ConsentStatus.GRANTED:
                logger.warning(
                    "Consent check FAILED for subject=%s type=%s status=%s pipeline=%s",
                    subject_id, consent_type, status.value, pipeline_name,
                )

            return result

    def filter_batch(
        self,
        subject_ids: list[str],
        consent_type: str,
        pipeline_name: str,
        pipeline_run_id: str,
    ) -> list[str]:
        """
        Filter a batch of subject IDs, returning only those with active consent.

        Optimised for pipeline batch processing — performs a single query
        rather than N individual checks.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT DISTINCT ON (data_subject_id)
                    data_subject_id,
                    action,
                    expires_at
                FROM consent_events
                WHERE data_subject_id = ANY(%s)
                  AND consent_type = %s
                ORDER BY data_subject_id, event_timestamp DESC;
            """, (subject_ids, consent_type))

            rows = {str(r["data_subject_id"]): r for r in cur.fetchall()}
            # Aware timestamp, so comparison with TIMESTAMPTZ values works.
            now = datetime.now().astimezone()

            allowed = []
            blocked = []

            for sid in subject_ids:
                row = rows.get(sid)
                if (
                    row
                    and row["action"] == "grant"
                    and (not row["expires_at"] or row["expires_at"] > now)
                ):
                    allowed.append(sid)
                else:
                    blocked.append(sid)

            # Bulk audit log insert
            audit_records = [
                (pipeline_name, pipeline_run_id, sid, consent_type, "grant", "allow")
                for sid in allowed
            ] + [
                (pipeline_name, pipeline_run_id, sid, consent_type, "blocked", "block")
                for sid in blocked
            ]

            from psycopg2.extras import execute_values
            execute_values(cur, """
                INSERT INTO consent_check_audit
                (pipeline_name, pipeline_run_id, data_subject_id,
                 consent_type, check_result, action_taken)
                VALUES %s;
            """, audit_records)
            self.conn.commit()

            logger.info(
                "Batch consent check: %d allowed, %d blocked for type=%s pipeline=%s",
                len(allowed), len(blocked), consent_type, pipeline_name,
            )
            return allowed

Automated Data Retention and Deletion

NDPR/NDPA requires that personal data not be retained longer than necessary for its stated purpose. Manual deletion processes are error-prone and unauditable. Instead, implement automated retention policies that execute on schedule.

"""
retention_manager.py — Automated NDPR-Compliant Data Retention

Implements scheduled data retention enforcement with full audit trail.
Supports both hard delete and anonymisation strategies depending on
whether aggregate data must be preserved.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class RetentionAction(Enum):
    HARD_DELETE = "hard_delete"
    ANONYMIZE = "anonymize"
    ARCHIVE = "archive"


@dataclass
class RetentionPolicy:
    """Defines a retention policy for a specific data category."""
    policy_name: str
    table_name: str
    data_category: str           # 'personal', 'financial', 'health', 'telecom'
    retention_days: int
    action: RetentionAction
    timestamp_column: str        # Column used to determine data age
    subject_id_column: str       # Column identifying the data subject
    regulatory_basis: str        # e.g., 'NDPR Art 2.1(c)', 'CBN AML Reg 12.3'
    anonymize_columns: Optional[list[str]] = None  # Columns to anonymise
    exempt_conditions: Optional[str] = None  # SQL WHERE clause for exemptions


# Define retention policies per regulatory requirement
RETENTION_POLICIES = [
    RetentionPolicy(
        policy_name="ndpr_marketing_consent",
        table_name="marketing_contacts",
        data_category="personal",
        retention_days=365,
        action=RetentionAction.HARD_DELETE,
        timestamp_column="last_consent_date",
        subject_id_column="contact_id",
        regulatory_basis="NDPA Part III, Sec 28 — Storage Limitation",
    ),
    RetentionPolicy(
        policy_name="cbn_transaction_records",
        table_name="transaction_history",
        data_category="financial",
        retention_days=2555,  # 7 years per CBN AML/CFT Regulations
        action=RetentionAction.ARCHIVE,
        timestamp_column="transaction_date",
        subject_id_column="customer_id",
        regulatory_basis="CBN AML/CFT Reg 2022, Sec 12.3",
    ),
    RetentionPolicy(
        policy_name="ncc_cdr_records",
        table_name="call_detail_records",
        data_category="telecom",
        retention_days=1825,  # 5 years per NCC requirements
        action=RetentionAction.ANONYMIZE,
        timestamp_column="call_timestamp",
        subject_id_column="subscriber_msisdn",
        regulatory_basis="NCC Consumer Code of Practice, Sec 8",
        anonymize_columns=["subscriber_msisdn", "caller_name", "location_data"],
    ),
    RetentionPolicy(
        policy_name="ndpr_analytics_data",
        table_name="user_behavior_events",
        data_category="personal",
        retention_days=180,
        action=RetentionAction.ANONYMIZE,
        timestamp_column="event_timestamp",
        subject_id_column="user_id",
        regulatory_basis="NDPA Part III, Sec 28 — Storage Limitation",
        anonymize_columns=["user_id", "ip_address", "device_id", "email"],
    ),
]


class RetentionManager:
    """Execute and audit data retention policies."""

    def __init__(self, db_connection):
        self.conn = db_connection

    def execute_policy(self, policy: RetentionPolicy, dry_run: bool = False) -> dict:
        """
        Execute a single retention policy.

        Returns a summary of actions taken (or that would be taken in dry_run mode).
        All actions are recorded in the retention_audit_log table.
        """
        cutoff_date = datetime.utcnow() - timedelta(days=policy.retention_days)

        with self.conn.cursor() as cur:
            # Count affected records
            count_sql = f"""
                SELECT COUNT(*) FROM {policy.table_name}
                WHERE {policy.timestamp_column} < %s
                {f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''}
            """
            cur.execute(count_sql, (cutoff_date,))
            affected_count = cur.fetchone()[0]

            if dry_run:
                logger.info(
                    "[DRY RUN] Policy '%s': %d records would be affected (action: %s)",
                    policy.policy_name, affected_count, policy.action.value,
                )
                return {"policy": policy.policy_name, "affected": affected_count, "executed": False}

            if affected_count == 0:
                logger.info("Policy '%s': No records to process.", policy.policy_name)
                return {"policy": policy.policy_name, "affected": 0, "executed": True}

            # Execute the retention action
            if policy.action == RetentionAction.HARD_DELETE:
                action_sql = f"""
                    DELETE FROM {policy.table_name}
                    WHERE {policy.timestamp_column} < %s
                    {f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''}
                """
                cur.execute(action_sql, (cutoff_date,))

            elif policy.action == RetentionAction.ANONYMIZE:
                set_clauses = ", ".join(
                    f"{col} = 'ANONYMIZED'" for col in (policy.anonymize_columns or [])
                )
                if set_clauses:
                    action_sql = f"""
                        UPDATE {policy.table_name}
                        SET {set_clauses}
                        WHERE {policy.timestamp_column} < %s
                        {f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''}
                    """
                    cur.execute(action_sql, (cutoff_date,))

            elif policy.action == RetentionAction.ARCHIVE:
                # Move to archive table, then delete from source
                archive_table = f"{policy.table_name}_archive"
                action_sql = f"""
                    INSERT INTO {archive_table}
                    SELECT * FROM {policy.table_name}
                    WHERE {policy.timestamp_column} < %s
                    {f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''};

                    DELETE FROM {policy.table_name}
                    WHERE {policy.timestamp_column} < %s
                    {f'AND NOT ({policy.exempt_conditions})' if policy.exempt_conditions else ''};
                """
                cur.execute(action_sql, (cutoff_date, cutoff_date))

            # Record in audit log
            cur.execute("""
                INSERT INTO retention_audit_log
                (policy_name, table_name, action, records_affected,
                 cutoff_date, regulatory_basis, executed_at)
                VALUES (%s, %s, %s, %s, %s, %s, NOW());
            """, (
                policy.policy_name, policy.table_name,
                policy.action.value, affected_count,
                cutoff_date, policy.regulatory_basis,
            ))
            self.conn.commit()

            logger.info(
                "Policy '%s': %d records processed (action: %s)",
                policy.policy_name, affected_count, policy.action.value,
            )
            return {"policy": policy.policy_name, "affected": affected_count, "executed": True}
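One design note on the ANONYMIZE action above: overwriting columns with a constant destroys joinability, so any aggregate analytics that group by subject break. Where those aggregates must survive, a keyed, deterministic pseudonym is a common alternative — erasure then reduces to destroying the key. A sketch, assuming the key is loaded from a separate secrets store:

```python
import hashlib
import hmac

def pseudonymize(value: str, key: bytes) -> str:
    """Keyed, deterministic pseudonym via HMAC-SHA256.

    The same input and key always yield the same token, so joins across
    tables keep working; without the key the original value cannot be
    recovered, and destroying the key later is effectively erasure.
    """
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()[:16]

key = b"load-me-from-a-secrets-store"  # illustrative only — never hard-code
t1 = pseudonymize("+2348012345678", key)
t2 = pseudonymize("+2348012345678", key)
print(t1 == t2)  # True — deterministic, so joins are preserved
```

Whether keyed pseudonymisation counts as anonymisation or merely pseudonymisation under NDPA depends on who holds the key, so treat this as a risk-reduction measure to review with your DPO, not an automatic exemption.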

Right to Erasure Implementation

NDPA Section 37 grants data subjects the right to request erasure of their personal data. In a modern data stack with multiple downstream systems, this requires cascading erasure across all systems that hold the subject's data.

┌──────────────────────────────────────────────────────────────────────────┐
│                    RIGHT TO ERASURE FLOW                                 │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  1. REQUEST          2. DISCOVERY         3. EXECUTION    4. VERIFY     │
│  ──────────          ────────────         ────────────    ──────────    │
│                                                                          │
│  ┌─────────┐    ┌──────────────┐    ┌──────────────┐  ┌────────────┐   │
│  │ Subject │───>│ Data Map     │───>│ Erasure      │─>│ Completion │   │
│  │ Request │    │ Discovery    │    │ Orchestrator │  │ Verifier   │   │
│  └─────────┘    └──────┬───────┘    └──────┬───────┘  └────────────┘   │
│                        │                   │                            │
│                        v                   v                            │
│                 ┌──────────────┐    ┌──────────────┐                    │
│                 │ Systems:     │    │ Actions:     │                    │
│                 │ - PostgreSQL │    │ - DELETE     │                    │
│                 │ - Data Lake  │    │ - Anonymise  │                    │
│                 │ - Redis      │    │ - Overwrite  │                    │
│                 │ - Elastic    │    │ - Purge keys │                    │
│                 │ - Backups    │    │ - Redact     │                    │
│                 │ - 3rd Party  │    │ - Notify 3P  │                    │
│                 └──────────────┘    └──────────────┘                    │
│                                                                          │
│  SLA: 30 days per NDPA (72 hours recommended for automated systems)     │
└──────────────────────────────────────────────────────────────────────────┘
"""
erasure_orchestrator.py — NDPA Right to Erasure Orchestration

Coordinates erasure across all systems that hold personal data for a
given data subject. Maintains a complete audit trail of all actions taken.
"""

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Callable
import logging
import uuid

logger = logging.getLogger(__name__)


class ErasureStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    PARTIALLY_COMPLETED = "partially_completed"
    FAILED = "failed"


@dataclass
class ErasureTarget:
    """A single system or table from which data must be erased."""
    system_name: str
    description: str
    erasure_function: Callable
    is_critical: bool = True  # If True, failure blocks overall completion
    status: ErasureStatus = ErasureStatus.PENDING
    records_affected: int = 0
    error_message: str = ""
    completed_at: datetime | None = None


@dataclass
class ErasureRequest:
    """An erasure request from a data subject."""
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    data_subject_id: str = ""
    requested_at: datetime = field(default_factory=datetime.utcnow)
    reason: str = ""
    targets: list[ErasureTarget] = field(default_factory=list)
    status: ErasureStatus = ErasureStatus.PENDING
    completed_at: datetime | None = None


class ErasureOrchestrator:
    """
    Orchestrates right-to-erasure requests across all registered systems.

    Implements the NDPA Section 37 requirement with:
    - Comprehensive data discovery across all registered systems
    - Parallel erasure execution where possible
    - Verification of erasure completion
    - Full audit trail for regulatory evidence
    """

    def __init__(self, db_connection):
        self.conn = db_connection
        self.targets_registry: list[ErasureTarget] = []

    def register_target(self, target: ErasureTarget):
        """Register a system as containing personal data subject to erasure."""
        self.targets_registry.append(target)
        logger.info("Registered erasure target: %s", target.system_name)

    def execute_erasure(self, request: ErasureRequest) -> ErasureRequest:
        """
        Execute an erasure request across all registered targets.

        Processes each target, records results, and determines overall status.
        Per NDPA, we must complete within 30 days — but automated systems
        should target 72 hours or less.
        """
        request.status = ErasureStatus.IN_PROGRESS
        self._log_request_event(request, "erasure_started")

        all_succeeded = True
        any_succeeded = False

        for target in request.targets:
            try:
                target.status = ErasureStatus.IN_PROGRESS
                result = target.erasure_function(request.data_subject_id)
                target.records_affected = result.get("records_affected", 0)
                target.status = ErasureStatus.COMPLETED
                target.completed_at = datetime.utcnow()
                any_succeeded = True

                logger.info(
                    "Erasure completed: system=%s subject=%s records=%d",
                    target.system_name, request.data_subject_id, target.records_affected,
                )

            except Exception as e:
                target.status = ErasureStatus.FAILED
                target.error_message = str(e)
                if target.is_critical:
                    all_succeeded = False

                logger.error(
                    "Erasure FAILED: system=%s subject=%s error=%s",
                    target.system_name, request.data_subject_id, str(e),
                )

            self._log_target_result(request, target)

        # Determine overall status
        if all_succeeded:
            request.status = ErasureStatus.COMPLETED
        elif any_succeeded:
            request.status = ErasureStatus.PARTIALLY_COMPLETED
        else:
            request.status = ErasureStatus.FAILED

        request.completed_at = datetime.utcnow()
        self._log_request_event(request, "erasure_completed")

        return request

    def _log_request_event(self, request: ErasureRequest, event_type: str):
        """Log erasure request events to audit table."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO erasure_audit_log
                (request_id, data_subject_id, event_type, status, event_timestamp)
                VALUES (%s, %s, %s, %s, NOW());
            """, (request.request_id, request.data_subject_id, event_type, request.status.value))
            self.conn.commit()

    def _log_target_result(self, request: ErasureRequest, target: ErasureTarget):
        """Log individual target erasure results."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO erasure_target_audit
                (request_id, system_name, status, records_affected,
                 error_message, completed_at)
                VALUES (%s, %s, %s, %s, %s, %s);
            """, (
                request.request_id, target.system_name, target.status.value,
                target.records_affected, target.error_message, target.completed_at,
            ))
            self.conn.commit()

Data Protection Impact Assessment Automation

NDPA Section 31 requires a Data Protection Impact Assessment (DPIA) for processing activities that are "likely to result in a high risk to the rights and freedoms of data subjects." Rather than treating this as a one-off paper exercise, implement automated DPIA workflows that trigger when new data processing activities are registered.

# dpia_policy.yaml — Automated DPIA Trigger Configuration
#
# Defines conditions under which a DPIA is automatically required.
# Based on NDPA Part III, Sec 31 and NDPC Implementation Framework.

dpia_triggers:
  automatic_required:
    - condition: "data_category IN ('health', 'biometric', 'genetic')"
      reason: "Processing of special category data"
      ndpa_reference: "Part III, Sec 30(1)(a)"

    - condition: "processing_scale == 'large' AND data_category == 'personal'"
      reason: "Large-scale processing of personal data"
      ndpa_reference: "Part III, Sec 30(1)(b)"

    - condition: "involves_profiling == true"
      reason: "Systematic profiling with legal or significant effects"
      ndpa_reference: "Part III, Sec 30(1)(c)"

    - condition: "cross_border_transfer == true AND destination NOT IN approved_jurisdictions"
      reason: "Transfer to jurisdiction without adequate protection"
      ndpa_reference: "Part VI, Sec 47"

    - condition: "new_technology == true AND data_category == 'personal'"
      reason: "Use of new technology for personal data processing"
      ndpa_reference: "Part III, Sec 30(1)(d)"

  risk_scoring:
    high_risk_threshold: 12
    factors:
      - name: "data_sensitivity"
        weight: 3
        scores:
          public: 1
          personal: 2
          sensitive: 4
          special_category: 5

      - name: "processing_scale"
        weight: 2
        scores:
          individual: 1
          small: 2
          medium: 3
          large: 4
          national: 5

      - name: "data_subject_vulnerability"
        weight: 2
        scores:
          general_public: 1
          employees: 2
          customers: 3
          children: 5
          patients: 5

  approved_jurisdictions:
    - "NG"  # Nigeria
    - "GH"  # Ghana (adequate protection)
    - "ZA"  # South Africa (POPIA)
    - "EU"  # European Union (GDPR — mutual adequacy)
    - "GB"  # United Kingdom (UK GDPR)
    - "CA"  # Canada (PIPEDA)
    # Note: USA does not have federal adequacy — requires additional safeguards
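
The `risk_scoring` block above can be evaluated by summing `weight * score` for each factor and comparing the total against `high_risk_threshold`. A sketch under that interpretation (that the threshold applies to the weighted sum is an assumption; the exact formula should be agreed with your DPO):

```python
# Mirrors the factors and weights in dpia_policy.yaml above.
RISK_FACTORS = {
    "data_sensitivity": {
        "weight": 3,
        "scores": {"public": 1, "personal": 2, "sensitive": 4, "special_category": 5},
    },
    "processing_scale": {
        "weight": 2,
        "scores": {"individual": 1, "small": 2, "medium": 3, "large": 4, "national": 5},
    },
    "data_subject_vulnerability": {
        "weight": 2,
        "scores": {"general_public": 1, "employees": 2, "customers": 3, "children": 5, "patients": 5},
    },
}
HIGH_RISK_THRESHOLD = 12

def dpia_risk_score(activity: dict) -> tuple[int, bool]:
    """Weighted risk score for a processing activity; the second element
    is True when the score meets the threshold and a DPIA is required."""
    total = sum(
        factor["weight"] * factor["scores"][activity[name]]
        for name, factor in RISK_FACTORS.items()
    )
    return total, total >= HIGH_RISK_THRESHOLD

# A lending product profiling customer transaction history:
score, dpia_required = dpia_risk_score({
    "data_sensitivity": "sensitive",            # 3 * 4 = 12
    "processing_scale": "large",                # 2 * 4 = 8
    "data_subject_vulnerability": "customers",  # 2 * 3 = 6
})
# score == 26, dpia_required == True
```

Wiring this into the data-processing-activity registration workflow means a DPIA can never be silently skipped: registration is blocked until the assessment record exists.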

CBN Data Requirements for Financial Institutions

Financial institutions in Nigeria face the most demanding regulatory data requirements. The CBN expects not only periodic returns but increasingly real-time or near-real-time access to transaction data. Let us build the infrastructure to meet these requirements.

Transaction Reporting Pipeline

┌──────────────────────────────────────────────────────────────────────────┐
│              CBN TRANSACTION REPORTING PIPELINE                          │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────┐   ┌─────────────┐   ┌──────────────┐   ┌─────────────┐  │
│  │ Core     │──>│ CDC / Event │──>│ Stream       │──>│ Compliance  │  │
│  │ Banking  │   │ Stream      │   │ Processing   │   │ Data Store  │  │
│  │ System   │   │ (Debezium)  │   │ (Flink/Spark)│   │ (Warehouse) │  │
│  └──────────┘   └─────────────┘   └──────┬───────┘   └──────┬──────┘  │
│                                          │                   │          │
│                    ┌─────────────────────┘                   │          │
│                    │                                         │          │
│                    v                                         v          │
│            ┌──────────────┐                       ┌──────────────┐     │
│            │ Real-time    │                       │ Scheduled    │     │
│            │ Alerts       │                       │ Returns      │     │
│            │ ─────────    │                       │ ──────────   │     │
│            │ STR Triggers │                       │ Monthly Form │     │
│            │ CTR Triggers │                       │ Credit Bureau│     │
│            │ AML Patterns │                       │ FX Returns   │     │
│            │ Sanctions    │                       │ Fin Stability│     │
│            └──────────────┘                       └──────────────┘     │
│                    │                                         │          │
│                    v                                         v          │
│            ┌──────────────┐                       ┌──────────────┐     │
│            │ Compliance   │                       │ CBN Portal   │     │
│            │ Officer      │                       │ Submission   │     │
│            │ Dashboard    │                       │ (Automated)  │     │
│            └──────────────┘                       └──────────────┘     │
│                                                                          │
│  All flows instrumented with: data lineage, audit trail, checksums      │
└──────────────────────────────────────────────────────────────────────────┘

AML/KYC Data Flow Implementation

CBN's AML/CFT Regulations 2022 require financial institutions to maintain comprehensive KYC records and flag suspicious transactions. Here is a data model and detection pipeline:

-- KYC/CDD data model aligned with CBN AML/CFT Regulations 2022
CREATE TABLE kyc_records (
    customer_id         UUID PRIMARY KEY,
    kyc_tier            VARCHAR(10) NOT NULL, -- 'tier1', 'tier2', 'tier3' per CBN tiered KYC
    bvn                 VARCHAR(11),          -- Bank Verification Number
    nin                 VARCHAR(11),          -- National Identification Number
    full_name           VARCHAR(200) NOT NULL,
    date_of_birth       DATE,
    nationality         VARCHAR(3),
    address             TEXT,
    phone_number        VARCHAR(20),
    email               VARCHAR(100),
    id_document_type    VARCHAR(30),          -- 'national_id', 'passport', 'drivers_licence', 'voters_card'
    id_document_number  VARCHAR(50),
    id_verified         BOOLEAN DEFAULT FALSE,
    id_verified_at      TIMESTAMPTZ,
    risk_rating         VARCHAR(10) NOT NULL, -- 'low', 'medium', 'high'
    pep_status          BOOLEAN DEFAULT FALSE, -- Politically Exposed Person
    sanctions_checked   BOOLEAN DEFAULT FALSE,
    sanctions_checked_at TIMESTAMPTZ,
    cdd_completed_at    TIMESTAMPTZ,
    edd_required        BOOLEAN DEFAULT FALSE, -- Enhanced Due Diligence
    edd_completed_at    TIMESTAMPTZ,
    next_review_date    DATE,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Transaction monitoring for STR/CTR
CREATE TABLE transaction_monitoring (
    txn_id              UUID PRIMARY KEY,
    customer_id         UUID NOT NULL REFERENCES kyc_records(customer_id),
    txn_date            TIMESTAMPTZ NOT NULL,
    txn_type            VARCHAR(30) NOT NULL,  -- 'transfer', 'deposit', 'withdrawal', 'fx_purchase'
    txn_amount          NUMERIC(18,2) NOT NULL,
    txn_currency        VARCHAR(3) NOT NULL DEFAULT 'NGN',
    counterparty_name   VARCHAR(200),
    counterparty_acct   VARCHAR(30),
    counterparty_bank   VARCHAR(100),
    channel             VARCHAR(20),           -- 'branch', 'atm', 'pos', 'mobile', 'internet'
    narrative           TEXT,
    risk_score          NUMERIC(5,2),          -- Calculated by ML model
    alert_generated     BOOLEAN DEFAULT FALSE,
    alert_type          VARCHAR(30),           -- 'ctr', 'str', 'pattern', 'sanctions'
    reviewed_by         VARCHAR(100),
    review_outcome      VARCHAR(30),           -- 'cleared', 'escalated', 'filed_str'
    reviewed_at         TIMESTAMPTZ,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Currency Transaction Report (CTR) — auto-generated for transactions above threshold
CREATE VIEW ctr_candidates AS
SELECT
    tm.txn_id,
    tm.customer_id,
    kr.full_name,
    kr.bvn,
    tm.txn_date,
    tm.txn_type,
    tm.txn_amount,
    tm.txn_currency,
    tm.channel,
    tm.narrative,
    CASE
        WHEN kr.kyc_tier = 'tier1' AND tm.txn_amount > 300000 THEN 'OVER_TIER1_LIMIT'
        WHEN kr.kyc_tier = 'tier2' AND tm.txn_amount > 500000 THEN 'OVER_TIER2_LIMIT'
        -- The higher threshold must be tested first: CASE stops at the first
        -- match, so testing 5m before 10m would make this branch unreachable
        WHEN tm.txn_amount > 10000000 THEN 'CORPORATE_CTR_THRESHOLD'
        WHEN tm.txn_amount > 5000000 THEN 'INDIVIDUAL_CTR_THRESHOLD'
    END AS ctr_reason
FROM transaction_monitoring tm
JOIN kyc_records kr ON tm.customer_id = kr.customer_id
WHERE tm.txn_amount > 300000  -- Lowest relevant threshold
  AND tm.txn_date >= CURRENT_DATE - INTERVAL '1 day';

-- Suspicious pattern detection: structuring (smurfing)
CREATE VIEW structuring_alerts AS
SELECT
    customer_id,
    txn_date::DATE AS txn_day,
    COUNT(*) AS txn_count,
    SUM(txn_amount) AS total_amount,
    AVG(txn_amount) AS avg_amount,
    MAX(txn_amount) AS max_amount,
    MIN(txn_amount) AS min_amount,
    array_agg(DISTINCT channel) AS channels_used
FROM transaction_monitoring
WHERE txn_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY customer_id, txn_date::DATE
HAVING
    -- Multiple transactions just below CTR threshold
    (COUNT(*) >= 3 AND MAX(txn_amount) < 5000000 AND SUM(txn_amount) > 5000000)
    OR
    -- Rapid-fire transactions in short window
    (COUNT(*) >= 5 AND SUM(txn_amount) > 2000000)
ORDER BY total_amount DESC;

Automated CBN Returns Generation

"""
cbn_returns.py — Automated CBN Regulatory Returns Generation

Generates, validates, and prepares CBN returns for submission.
Includes reconciliation checks to ensure data accuracy before filing.
"""

from dataclasses import dataclass
from datetime import datetime, date
from typing import Optional
import logging
import hashlib

logger = logging.getLogger(__name__)


@dataclass
class CBNReturn:
    """Represents a single CBN regulatory return."""
    return_type: str           # 'monthly_form_m', 'credit_bureau', 'fx_returns', etc.
    reporting_period: str      # '2024-04', '2024-Q1', etc.
    institution_code: str      # CBN institution code
    generated_at: Optional[datetime] = None
    record_count: int = 0
    checksum: str = ""
    validation_passed: bool = False
    validation_errors: Optional[list[str]] = None
    file_path: str = ""


class CBNReturnsGenerator:
    """
    Generates CBN regulatory returns from the compliance data warehouse.

    Each return type has:
    - A SQL query to extract the required data
    - Validation rules matching CBN format specifications
    - Reconciliation checks against source systems
    - Checksum generation for integrity verification
    """

    def __init__(self, warehouse_connection, source_connection):
        self.warehouse = warehouse_connection
        self.source = source_connection

    def generate_monthly_returns(self, reporting_month: str, institution_code: str) -> CBNReturn:
        """
        Generate CBN Monthly Returns (Form M).

        Extracts balance sheet, P&L, and asset quality data from the warehouse,
        validates against CBN format specifications, and reconciles with source.
        """
        cbr = CBNReturn(
            return_type="monthly_form_m",
            reporting_period=reporting_month,
            institution_code=institution_code,
            generated_at=datetime.utcnow(),
        )

        with self.warehouse.cursor() as cur:
            # Extract balance sheet data
            cur.execute("""
                SELECT
                    account_category,
                    account_subcategory,
                    cbn_line_item_code,
                    SUM(balance_ngn) AS balance,
                    COUNT(DISTINCT account_id) AS account_count
                FROM regulatory_balance_sheet
                WHERE reporting_month = %s
                  AND institution_code = %s
                GROUP BY account_category, account_subcategory, cbn_line_item_code
                ORDER BY cbn_line_item_code;
            """, (reporting_month, institution_code))

            rows = cur.fetchall()
            cbr.record_count = len(rows)

        # Validate
        cbr.validation_errors = self._validate_monthly_returns(rows, reporting_month)
        cbr.validation_passed = len(cbr.validation_errors) == 0

        # Reconciliation check
        reconciliation = self._reconcile_with_source(reporting_month, institution_code)
        if not reconciliation["balanced"]:
            cbr.validation_errors.append(
                f"Reconciliation variance: {reconciliation['variance_ngn']:,.2f} NGN"
            )
            cbr.validation_passed = False

        # Generate checksum
        content = str(rows).encode("utf-8")
        cbr.checksum = hashlib.sha256(content).hexdigest()

        if cbr.validation_passed:
            logger.info(
                "Monthly returns generated successfully: period=%s records=%d checksum=%s",
                reporting_month, cbr.record_count, cbr.checksum[:16],
            )
        else:
            logger.warning(
                "Monthly returns generated with errors: period=%s errors=%d",
                reporting_month, len(cbr.validation_errors),
            )

        return cbr

    def _validate_monthly_returns(self, rows: list, reporting_month: str) -> list[str]:
        """Validate monthly returns against CBN format specifications."""
        errors = []

        # Check required line items are present
        required_codes = [
            "BS001", "BS002", "BS003",  # Assets
            "BS101", "BS102", "BS103",  # Liabilities
            "PL001", "PL002", "PL003",  # Income
            "PL101", "PL102",           # Expenses
        ]
        present_codes = {row[2] for row in rows}  # cbn_line_item_code
        missing = set(required_codes) - present_codes
        if missing:
            errors.append(f"Missing required CBN line items: {missing}")

        # Check total assets = total liabilities + equity
        # (simplified — real implementation would sum specific line items)

        return errors

    def _reconcile_with_source(self, reporting_month: str, institution_code: str) -> dict:
        """
        Reconcile warehouse data against source core banking system.

        This is critical: CBN auditors will check whether your returns
        match your actual books. Any unexplained variance is a red flag.
        """
        with self.warehouse.cursor() as cur:
            cur.execute("""
                SELECT SUM(balance_ngn) AS warehouse_total
                FROM regulatory_balance_sheet
                WHERE reporting_month = %s AND institution_code = %s;
            """, (reporting_month, institution_code))
            warehouse_total = cur.fetchone()[0] or 0

        with self.source.cursor() as cur:
            cur.execute("""
                SELECT SUM(balance_lcy) AS source_total
                FROM gl_balances
                WHERE period = %s;
            """, (reporting_month,))
            source_total = cur.fetchone()[0] or 0

        variance = abs(warehouse_total - source_total)
        # Tolerance: allow NGN 1.00 for rounding
        balanced = variance <= 1.00

        return {
            "warehouse_total": warehouse_total,
            "source_total": source_total,
            "variance_ngn": variance,
            "balanced": balanced,
        }
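
The balance-sheet identity left as a comment in `_validate_monthly_returns` can be sketched as a standalone check. It assumes, following the required-codes list above, that asset line items are coded `BS0xx` and liability/equity items `BS1xx`; the actual groupings must be confirmed against the CBN format specification:

```python
def check_balance_sheet_identity(rows: list[tuple], tolerance_ngn: float = 1.00) -> list[str]:
    """Total assets must equal total liabilities + equity within a
    rounding tolerance. Rows follow the extraction query's shape:
    (category, subcategory, cbn_line_item_code, balance, account_count).
    """
    assets = sum(r[3] for r in rows if r[2].startswith("BS0"))
    liabilities_equity = sum(r[3] for r in rows if r[2].startswith("BS1"))
    variance = abs(assets - liabilities_equity)
    if variance > tolerance_ngn:
        return [f"Balance sheet identity violated: variance {variance:,.2f} NGN"]
    return []

# Illustrative figures only:
balanced = [
    ("assets", "cash", "BS001", 750_000_000.00, 1200),
    ("liabilities", "deposits", "BS101", 600_000_000.00, 900),
    ("equity", "share_capital", "BS102", 150_000_000.00, 1),
]
check_balance_sheet_identity(balanced)  # → []
```

The returned messages can be appended directly to `cbr.validation_errors` alongside the line-item completeness check.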

Automating Regulatory Reporting with Data Engineering

Manual regulatory reporting is the single biggest compliance risk for Nigerian organisations. Not because people are careless, but because manual processes do not scale, do not self-verify, and do not leave adequate audit trails. Let us build the automation layer.

Compliance Pipeline Architecture

┌──────────────────────────────────────────────────────────────────────────────┐
│                 COMPLIANCE PIPELINE ARCHITECTURE                             │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  SOURCE SYSTEMS          INGESTION           COMPLIANCE WAREHOUSE            │
│  ──────────────          ─────────           ────────────────────            │
│                                                                              │
│  ┌──────────┐     ┌──────────────┐     ┌────────────────────────┐           │
│  │Core Bank │────>│              │     │  RAW ZONE              │           │
│  └──────────┘     │              │     │  - Immutable landing   │           │
│  ┌──────────┐     │  CDC /       │     │  - Full audit trail    │           │
│  │CRM       │────>│  Batch       │────>│  - Source checksums    │           │
│  └──────────┘     │  Ingestion   │     └────────┬───────────────┘           │
│  ┌──────────┐     │              │              │                            │
│  │Mobile App│────>│              │              v                            │
│  └──────────┘     └──────────────┘     ┌────────────────────────┐           │
│  ┌──────────┐                          │  CONFORMED ZONE         │           │
│  │3rd Party │──────────────────────────│  - Standardised schemas│           │
│  │APIs      │                          │  - PII tagged/masked   │           │
│  └──────────┘                          │  - Consent-gated       │           │
│                                        └────────┬───────────────┘           │
│                                                  │                           │
│                               ┌──────────────────┼───────────────┐          │
│                               │                  │               │          │
│                               v                  v               v          │
│                     ┌──────────────┐  ┌──────────────┐ ┌──────────────┐    │
│                     │ NDPR         │  │ CBN          │ │ NCC          │    │
│                     │ Compliance   │  │ Returns      │ │ Reports      │    │
│                     │ ──────────   │  │ ──────────   │ │ ──────────   │    │
│                     │ Consent logs │  │ Form M       │ │ QoS metrics  │    │
│                     │ DPIA records │  │ STR/CTR      │ │ Subscriber   │    │
│                     │ Erasure logs │  │ Credit Bureau│ │ Complaints   │    │
│                     │ Breach data  │  │ FX Returns   │ │ SIM data     │    │
│                     └──────────────┘  └──────────────┘ └──────────────┘    │
│                               │                  │               │          │
│                               └──────────────────┼───────────────┘          │
│                                                  │                           │
│                                                  v                           │
│                                        ┌──────────────────┐                 │
│                                        │ AUDIT & LINEAGE  │                 │
│                                        │ ────────────────  │                 │
│                                        │ - Data lineage   │                 │
│                                        │ - Transformation │                 │
│                                        │   history        │                 │
│                                        │ - Access logs    │                 │
│                                        │ - Quality scores │                 │
│                                        └──────────────────┘                 │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

Data Lineage for Regulatory Evidence

When a regulator asks "where did this number come from?", you need to answer in seconds, not days. Data lineage tracking captures the complete transformation history of every data point in your compliance warehouse.

-- Data lineage tracking schema
CREATE TABLE data_lineage_events (
    lineage_id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_name       VARCHAR(100) NOT NULL,
    pipeline_run_id     VARCHAR(100) NOT NULL,
    step_name           VARCHAR(100) NOT NULL,
    step_order          INT NOT NULL,

    -- Source information
    source_system       VARCHAR(100) NOT NULL,
    source_table        VARCHAR(200),
    source_query_hash   VARCHAR(64),         -- SHA-256 of the extraction query
    source_row_count    BIGINT,
    source_checksum     VARCHAR(64),         -- Checksum of source data

    -- Destination information
    destination_table   VARCHAR(200) NOT NULL,
    destination_row_count BIGINT,
    destination_checksum  VARCHAR(64),

    -- Transformation information
    transformation_type VARCHAR(50),          -- 'extract', 'transform', 'load', 'validate'
    transformation_desc TEXT,
    transformation_code_hash VARCHAR(64),     -- Hash of the transformation code version

    -- Row-level tracking
    rows_inserted       BIGINT DEFAULT 0,
    rows_updated        BIGINT DEFAULT 0,
    rows_deleted        BIGINT DEFAULT 0,
    rows_rejected       BIGINT DEFAULT 0,

    -- Quality metrics
    null_count          BIGINT DEFAULT 0,
    duplicate_count     BIGINT DEFAULT 0,
    validation_errors   JSONB,

    -- Timing
    started_at          TIMESTAMPTZ NOT NULL,
    completed_at        TIMESTAMPTZ,
    duration_ms         BIGINT,

    -- Context
    triggered_by        VARCHAR(100),         -- 'schedule', 'manual', 'event'
    git_commit_hash     VARCHAR(40),          -- Code version that produced this
    environment         VARCHAR(20),          -- 'production', 'staging'

    created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for tracing a specific record's lineage
CREATE INDEX idx_lineage_pipeline_run
ON data_lineage_events (pipeline_run_id, step_order);

CREATE INDEX idx_lineage_destination
ON data_lineage_events (destination_table, created_at DESC);

-- View: Trace lineage from a regulatory report back to source
CREATE VIEW regulatory_report_lineage AS
SELECT
    dle.pipeline_name,
    dle.pipeline_run_id,
    dle.step_name,
    dle.step_order,
    dle.source_system,
    dle.source_table,
    dle.destination_table,
    dle.transformation_desc,
    dle.source_row_count,
    dle.destination_row_count,
    dle.rows_rejected,
    dle.source_checksum,
    dle.destination_checksum,
    dle.started_at,
    dle.completed_at,
    dle.git_commit_hash
FROM data_lineage_events dle
WHERE dle.destination_table LIKE 'regulatory_%'
ORDER BY dle.pipeline_run_id, dle.step_order;
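
Below is a sketch of how a pipeline step might assemble one row for this table before inserting it. The checksum scheme (SHA-256 over a canonical JSON serialisation of the extracted rows) is an assumption, not a regulatory requirement; any deterministic serialisation works, provided the same one is applied at re-verification time:

```python
import hashlib
import json
import uuid
from datetime import datetime, timezone

def build_lineage_event(
    pipeline_name: str, run_id: str, step_name: str, step_order: int,
    source_system: str, source_table: str, extraction_sql: str,
    destination_table: str, rows: list,
) -> dict:
    """Assemble one data_lineage_events row for a pipeline step.

    Hashing both the extraction query and a canonical serialisation of
    the rows lets an auditor re-run the step later and confirm that
    neither the logic nor the data changed after the report was filed.
    """
    canonical = json.dumps(rows, sort_keys=True, default=str).encode("utf-8")
    return {
        "lineage_id": str(uuid.uuid4()),
        "pipeline_name": pipeline_name,
        "pipeline_run_id": run_id,
        "step_name": step_name,
        "step_order": step_order,
        "source_system": source_system,
        "source_table": source_table,
        "source_query_hash": hashlib.sha256(extraction_sql.encode("utf-8")).hexdigest(),
        "source_row_count": len(rows),
        "source_checksum": hashlib.sha256(canonical).hexdigest(),
        "destination_table": destination_table,
        "started_at": datetime.now(timezone.utc),
    }

# Hypothetical step from a monthly returns pipeline:
event = build_lineage_event(
    "cbn_monthly_returns", "run-2024-04-001", "extract_balance_sheet", 1,
    "core_banking", "gl_balances", "SELECT * FROM gl_balances WHERE period = %s",
    "regulatory_balance_sheet", rows=[("BS001", 750.0), ("BS101", 600.0)],
)
# event["source_row_count"] == 2; both hashes are 64-char hex digests
```

The returned dict maps directly onto the table's column list; destination counts, checksums, and timings are filled in when the step completes.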

Reconciliation Framework

Every regulatory report must be reconciled against source systems before submission. Discrepancies must be investigated, documented, and either resolved or explained.

"""
reconciliation.py — Automated Reconciliation for Regulatory Reports

Implements multi-level reconciliation checks:
1. Row count reconciliation (source vs destination)
2. Aggregate reconciliation (sum/count/avg checks)
3. Hash-based reconciliation (row-level integrity)
4. Cross-system reconciliation (warehouse vs source)
"""

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import logging
import hashlib

logger = logging.getLogger(__name__)


class ReconciliationStatus(Enum):
    PASS = "pass"
    FAIL = "fail"
    WARNING = "warning"  # Outside tolerance, but within 2x the percentage tolerance


@dataclass
class ReconciliationCheck:
    check_name: str
    check_type: str       # 'row_count', 'aggregate', 'hash', 'cross_system'
    source_value: float
    target_value: float
    tolerance: float      # Absolute tolerance (e.g., 1.0 for rounding)
    tolerance_pct: float  # Percentage tolerance (e.g., 0.001 for 0.1%)
    status: ReconciliationStatus = ReconciliationStatus.PASS
    variance: float = 0.0
    variance_pct: float = 0.0
    notes: str = ""


@dataclass
class ReconciliationReport:
    report_name: str
    reporting_period: str
    checks: list[ReconciliationCheck]
    overall_status: ReconciliationStatus = ReconciliationStatus.PASS
    generated_at: Optional[datetime] = None
    signed_off_by: Optional[str] = None
    signed_off_at: Optional[datetime] = None


class ReconciliationEngine:
    """
    Automated reconciliation engine for regulatory reports.

    Runs configurable checks and produces an auditable reconciliation
    report that can be attached to regulatory submissions as evidence
    of data accuracy.
    """

    def __init__(self, db_connection):
        self.conn = db_connection

    def run_reconciliation(
        self,
        report_name: str,
        reporting_period: str,
        checks: list[dict],
    ) -> ReconciliationReport:
        """
        Run a set of reconciliation checks and produce a report.

        Each check dict contains:
        - name: Human-readable check name
        - type: 'row_count', 'aggregate', 'hash', 'cross_system'
        - source_query: SQL to get source value
        - target_query: SQL to get target value
        - tolerance: Absolute tolerance
        - tolerance_pct: Percentage tolerance
        """
        results = []

        for check_config in checks:
            with self.conn.cursor() as cur:
                cur.execute(check_config["source_query"], (reporting_period,))
                source_val = cur.fetchone()[0] or 0

                cur.execute(check_config["target_query"], (reporting_period,))
                target_val = cur.fetchone()[0] or 0

            variance = abs(source_val - target_val)
            variance_pct = (variance / abs(source_val) * 100) if source_val != 0 else 0

            tolerance = check_config.get("tolerance", 0)
            tolerance_pct = check_config.get("tolerance_pct", 0)

            if variance <= tolerance or variance_pct <= tolerance_pct:
                status = ReconciliationStatus.PASS
            elif variance_pct <= tolerance_pct * 2:
                status = ReconciliationStatus.WARNING
            else:
                status = ReconciliationStatus.FAIL

            results.append(ReconciliationCheck(
                check_name=check_config["name"],
                check_type=check_config["type"],
                source_value=source_val,
                target_value=target_val,
                tolerance=tolerance,
                tolerance_pct=tolerance_pct,
                status=status,
                variance=variance,
                variance_pct=variance_pct,
            ))

        # Determine overall status
        if any(c.status == ReconciliationStatus.FAIL for c in results):
            overall = ReconciliationStatus.FAIL
        elif any(c.status == ReconciliationStatus.WARNING for c in results):
            overall = ReconciliationStatus.WARNING
        else:
            overall = ReconciliationStatus.PASS

        report = ReconciliationReport(
            report_name=report_name,
            reporting_period=reporting_period,
            checks=results,
            overall_status=overall,
            generated_at=datetime.utcnow(),
        )

        # Persist reconciliation report for audit
        self._persist_report(report)

        if overall == ReconciliationStatus.FAIL:
            logger.error(
                "Reconciliation FAILED for %s period=%s — DO NOT SUBMIT",
                report_name, reporting_period,
            )
        else:
            logger.info(
                "Reconciliation %s for %s period=%s (%d checks passed)",
                overall.value, report_name, reporting_period,
                sum(1 for c in results if c.status == ReconciliationStatus.PASS),
            )

        return report

    def _persist_report(self, report: ReconciliationReport):
        """Save reconciliation report for audit trail."""
        with self.conn.cursor() as cur:
            for check in report.checks:
                cur.execute("""
                    INSERT INTO reconciliation_audit
                    (report_name, reporting_period, check_name, check_type,
                     source_value, target_value, variance, variance_pct,
                     status, generated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """, (
                    report.report_name, report.reporting_period,
                    check.check_name, check.check_type,
                    check.source_value, check.target_value,
                    check.variance, check.variance_pct,
                    check.status.value, report.generated_at,
                ))
            self.conn.commit()
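As a usage sketch, a checks list for a hypothetical CBN monthly return might look like the following. The table and column names are placeholders, not a real core-banking schema:

```python
# Hypothetical reconciliation checks for a CBN monthly return.
# Table and column names are illustrative placeholders.
CBN_MONTHLY_CHECKS = [
    {
        "name": "Deposit balances match core banking",
        "type": "aggregate",
        "source_query": (
            "SELECT COALESCE(SUM(balance), 0) FROM core_deposits "
            "WHERE reporting_period = %s"
        ),
        "target_query": (
            "SELECT COALESCE(SUM(deposit_balance), 0) FROM cbn_return_staging "
            "WHERE reporting_period = %s"
        ),
        "tolerance": 0,
        "tolerance_pct": 0.01,   # allow 0.01% rounding drift
    },
    {
        "name": "Loan account row counts match",
        "type": "row_count",
        "source_query": "SELECT COUNT(*) FROM loans WHERE reporting_period = %s",
        "target_query": "SELECT COUNT(*) FROM cbn_loan_lines WHERE reporting_period = %s",
        "tolerance": 0,          # row counts must match exactly
    },
]

# reconciler = RegulatoryReconciler(db_connection)
# report = reconciler.run_reconciliation("cbn_monthly", "2024-06", CBN_MONTHLY_CHECKS)
```

Aggregate checks tolerate small rounding drift; structural checks like row counts should use zero tolerance so any dropped record blocks submission.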

Scheduling and Orchestration

Regulatory returns have strict deadlines. An orchestration layer ensures returns are generated, validated, reconciled, and ready for submission well before due dates.

"""
compliance_scheduler.py — Regulatory Return Scheduling

Defines the schedule for all Nigerian regulatory returns with
appropriate lead times, escalation paths, and retry logic.
Compatible with Apache Airflow, Dagster, or Prefect.
"""

# Airflow DAG definition (conceptual — adapt to your orchestrator)
from datetime import timedelta

REGULATORY_SCHEDULE = {
    "cbn_monthly_returns": {
        "description": "CBN Monthly Returns (Form M)",
        "cron": "0 6 10 * *",           # 10th of each month at 6 AM WAT
        "deadline_day": 15,               # Must be submitted by 15th
        "lead_time_days": 5,              # Start generation 5 days before deadline
        "steps": [
            "extract_from_core_banking",
            "transform_to_cbn_format",
            "reconcile_with_source",
            "validate_cbn_format",
            "generate_submission_file",
            "notify_compliance_team",
        ],
        "escalation": {
            "warning_days_before_deadline": 3,
            "critical_days_before_deadline": 1,
            "notify": ["[email protected]", "[email protected]"],
        },
        "retry_policy": {
            "max_retries": 3,
            "retry_delay_minutes": 30,
        },
    },
    "cbn_daily_fx_returns": {
        "description": "CBN Daily Foreign Exchange Returns",
        "cron": "0 18 * * 1-5",          # 6 PM WAT on business days
        "deadline": "same_day",
        "steps": [
            "extract_fx_transactions",
            "validate_fx_rates",
            "reconcile_fx_positions",
            "generate_fx_return",
            "submit_to_cbn_portal",
        ],
        "escalation": {
            "notify_on_failure": ["[email protected]", "[email protected]"],
        },
        "retry_policy": {
            "max_retries": 5,
            "retry_delay_minutes": 15,
        },
    },
    "cbn_str_filing": {
        "description": "CBN Suspicious Transaction Report",
        "trigger": "event_based",         # Triggered by AML detection pipeline
        "deadline_hours": 72,             # Must file within 72 hours of detection
        "steps": [
            "enrich_transaction_context",
            "generate_str_narrative",
            "attach_supporting_docs",
            "route_to_compliance_officer",
            "file_with_nfiu",            # Nigerian Financial Intelligence Unit
        ],
        "escalation": {
            "warning_hours": 24,
            "critical_hours": 48,
            "notify": ["[email protected]", "[email protected]", "[email protected]"],
        },
    },
    "ndpr_annual_audit": {
        "description": "NDPR/NDPA Annual Data Protection Audit",
        "cron": "0 9 1 3 *",             # March 1st — start of audit preparation
        "deadline_month": 3,              # Q1 submission to NITDA/NDPC
        "steps": [
            "inventory_all_data_processing",
            "assess_lawful_basis_coverage",
            "verify_consent_records",
            "check_retention_compliance",
            "review_cross_border_transfers",
            "compile_dpia_records",
            "generate_audit_report",
            "engage_dpco",               # Data Protection Compliance Organisation
        ],
    },
    "credit_bureau_monthly": {
        "description": "Credit Bureau Returns (CRC, First Central, CreditRegistry)",
        "cron": "0 6 20 * *",            # 20th of each month
        "deadline_day": 30,
        "steps": [
            "extract_loan_portfolio",
            "map_to_bureau_format",
            "validate_bvn_matching",
            "reconcile_account_counts",
            "generate_bureau_files",
            "submit_to_bureaus",
        ],
    },
    "firs_vat_returns": {
        "description": "FIRS VAT Monthly Returns",
        "cron": "0 6 15 * *",            # 15th of each month
        "deadline_day": 21,
        "steps": [
            "extract_vat_transactions",
            "calculate_input_output_vat",
            "reconcile_with_gl",
            "generate_vat_return",
            "prepare_taxpro_max_upload",
        ],
    },
    "ncc_qos_quarterly": {
        "description": "NCC Quality of Service Quarterly Report",
        "cron": "0 6 15 1,4,7,10 *",     # 15th of quarter-start months
        "deadline_day": 30,               # 30 days after quarter end
        "steps": [
            "aggregate_network_metrics",
            "calculate_qos_kpis",
            "generate_ncc_format_report",
            "validate_against_thresholds",
            "submit_to_ncc",
        ],
    },
}
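The `deadline_day` and `escalation` fields above can drive a small standalone deadline checker, sketched here outside any orchestrator. The function name and fallback thresholds are my own, not part of any Airflow/Dagster/Prefect API:

```python
import calendar
from datetime import date


def escalation_level(today: date, deadline_day: int, schedule: dict) -> str:
    """Return 'ok', 'warning', or 'critical' based on days left to the deadline.

    Reads the escalation thresholds from a REGULATORY_SCHEDULE entry; clamps
    the deadline day to the last day of short months (e.g. day 30 in February).
    """
    last_day = calendar.monthrange(today.year, today.month)[1]
    deadline = date(today.year, today.month, min(deadline_day, last_day))
    days_left = (deadline - today).days

    esc = schedule.get("escalation", {})
    if days_left <= esc.get("critical_days_before_deadline", 1):
        return "critical"
    if days_left <= esc.get("warning_days_before_deadline", 3):
        return "warning"
    return "ok"


entry = {"escalation": {"warning_days_before_deadline": 3,
                        "critical_days_before_deadline": 1}}
print(escalation_level(date(2024, 6, 10), 15, entry))  # 5 days left -> "ok"
print(escalation_level(date(2024, 6, 13), 15, entry))  # 2 days left -> "warning"
print(escalation_level(date(2024, 6, 14), 15, entry))  # 1 day left -> "critical"
```

Run on every scheduler tick, this is enough to page the compliance team at the warning and critical thresholds defined per return.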

Data Residency and Sovereignty

Data residency is one of the most misunderstood aspects of Nigerian compliance. Many organisations assume that storing data "in the cloud" is sufficient, without considering where that cloud infrastructure is physically located or how data flows between regions.

Where Nigerian Data Must Live

The NDPA and NDPR establish clear principles on data residency:

  1. Primary copy in Nigeria: NDPR Implementation Framework 2020 (Section 4.2) requires that a copy of all personal data collected in Nigeria must be stored on a server or data centre physically located within Nigeria.

  2. Cross-border transfer conditions (NDPA Part VI, Sections 47-49):

    • Transfer to a country with "adequate level of protection" as determined by the NDPC
    • Transfer with explicit consent of the data subject
    • Transfer necessary for performance of a contract
    • Transfer with appropriate safeguards (binding corporate rules, standard contractual clauses)

  3. CBN requirements: The CBN has issued multiple circulars requiring financial institutions to maintain core banking systems and customer data within Nigeria. The CBN's Cloud Computing Framework allows use of cloud services but mandates that primary data centres for core banking operations be in Nigeria.

  4. NCC requirements: SIM registration data and subscriber identity data must be maintained within Nigeria and accessible to the NCC on demand.

Cloud Region Selection Strategy

┌──────────────────────────────────────────────────────────────────────────┐
│              CLOUD REGION STRATEGY FOR NIGERIAN COMPLIANCE               │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  PRIMARY REGION: Africa (Lagos or Cape Town)                            │
│  ─────────────────────────────────────────────                          │
│  AWS: af-south-1 (Cape Town)     — Closest AWS region                  │
│  Azure: South Africa North       — Johannesburg                        │
│  GCP: africa-south1 (Johannesburg) — Nearest GCP region               │
│  Local: Rack Centre (Lagos), MainOne MDXi (Lagos)                      │
│                                                                          │
│  ┌─────────────────────┐                                                │
│  │  DATA CLASSIFICATION │                                               │
│  ├─────────────────────┤                                                │
│  │                     │                                                │
│  │  TIER 1: Nigeria-Only                                                │
│  │  ──────────────────                                                  │
│  │  - PII of Nigerian residents (NDPR)                                  │
│  │  - Core banking data (CBN)                                           │
│  │  - SIM registration data (NCC)                                       │
│  │  - Tax records (FIRS)                                                │
│  │  Storage: Nigerian data centre (Rack Centre / MainOne)               │
│  │                                                                      │
│  │  TIER 2: Africa Region OK                                            │
│  │  ────────────────────                                                │
│  │  - Aggregated analytics (no PII)                                     │
│  │  - Application logs (PII stripped)                                   │
│  │  - Infrastructure metrics                                            │
│  │  Storage: AWS af-south-1 / Azure South Africa North                  │
│  │                                                                      │
│  │  TIER 3: Global (with safeguards)                                    │
│  │  ────────────────────────────                                        │
│  │  - CDN cached content (non-personal)                                 │
│  │  - Open-source model inference (no PII in prompts)                   │
│  │  - Public marketing analytics                                        │
│  │  Storage: Any region, no PII                                         │
│  │                                                                      │
│  └─────────────────────┘                                                │
│                                                                          │
│  IMPORTANT: Even Tier 2/3 data may require Nigerian copy if it was      │
│  derived from personal data of Nigerian residents.                       │
└──────────────────────────────────────────────────────────────────────────┘
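The tier table can be enforced as a storage guard. A minimal sketch, with illustrative tier assignments and made-up region identifiers (map these to your actual data centres and cloud regions):

```python
# Illustrative tier -> allowed-region mapping; region names are placeholders.
TIER_REGIONS = {
    1: {"ng-lagos-rackcentre", "ng-lagos-mainone"},          # Nigeria-only
    2: {"ng-lagos-rackcentre", "ng-lagos-mainone",
        "aws-af-south-1", "azure-southafrica-north"},         # Africa region OK
    3: None,                                                  # any region (no PII)
}


def allowed_region(tier: int, region: str) -> bool:
    """Return True if data of the given tier may be stored in `region`."""
    if tier not in TIER_REGIONS:
        raise ValueError(f"Unknown data tier: {tier}")
    regions = TIER_REGIONS[tier]
    if regions is None:
        return True            # Tier 3: global, with safeguards
    return region in regions


assert allowed_region(1, "ng-lagos-rackcentre")
assert not allowed_region(1, "aws-af-south-1")   # core banking data stays in Nigeria
assert allowed_region(2, "aws-af-south-1")
assert allowed_region(3, "aws-us-east-1")
```

Wiring a check like this into provisioning (Terraform validation, admission controllers, or pipeline sinks) turns the residency policy from a document into a hard constraint.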

Encryption Requirements

Nigerian regulations mandate encryption at multiple levels:

# encryption_policy.yaml — Encryption Standards for Nigerian Compliance

encryption_requirements:
  at_rest:
    standard: "AES-256"
    key_management: "AWS KMS / Azure Key Vault with customer-managed keys"
    scope:
      - "All databases containing personal data"
      - "All file storage containing personal data"
      - "All backup volumes"
      - "All archive storage"
    regulatory_basis:
      - "NDPR Implementation Framework, Sec 2.5"
      - "CBN Risk-Based Cybersecurity Framework, Sec 4.3"

  in_transit:
    standard: "TLS 1.2 minimum, TLS 1.3 preferred"
    scope:
      - "All API communications"
      - "All database connections"
      - "All inter-service communication"
      - "All data pipeline transfers"
    certificate_management:
      provider: "Let's Encrypt / AWS ACM"
      rotation: "90 days automatic"
    regulatory_basis:
      - "NDPR Implementation Framework, Sec 2.5"
      - "CBN Guidelines on Electronic Banking"

  field_level:
    description: "Additional encryption for highly sensitive fields"
    algorithm: "AES-256-GCM with per-field keys"
    fields:
      - "bvn"                    # Bank Verification Number
      - "nin"                    # National Identification Number
      - "account_number"
      - "card_number"            # Must also comply with PCI DSS
      - "biometric_data"
      - "health_records"
    key_rotation: "Quarterly"
    regulatory_basis:
      - "NDPA Part III, Sec 29 — Security of Processing"
      - "CBN AML/CFT Regulations — Customer Data Protection"

  tokenization:
    description: "Replace sensitive values with non-reversible tokens for analytics"
    use_cases:
      - "BVN in analytics pipelines (tokenize, do not encrypt)"
      - "Phone numbers in aggregated reporting"
      - "Account numbers in cross-system reconciliation"
    algorithm: "HMAC-SHA256 with rotating salt"
    regulatory_benefit: "Tokenized data may not be considered personal data under NDPA"
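The tokenization entry above maps directly onto the standard library. A sketch with the salt handling simplified — a real deployment would fetch and rotate salts from a KMS, per the `key_rotation` policy:

```python
import hmac
import hashlib


def tokenize(value: str, salt: bytes) -> str:
    """Return a deterministic, non-reversible token for a sensitive value.

    HMAC-SHA256 keyed with the current salt: the same BVN always maps to the
    same token (so joins and reconciliation still work), but the original
    value cannot be recovered from the token alone.
    """
    return hmac.new(salt, value.encode("utf-8"), hashlib.sha256).hexdigest()


salt = b"example-salt-from-kms"        # placeholder; fetch from a key manager
t1 = tokenize("22212345678", salt)     # fake BVN-like value
t2 = tokenize("22212345678", salt)
assert t1 == t2                        # deterministic: usable as a join key
assert t1 != tokenize("22287654321", salt)
```

Note that rotating the salt breaks joins across the rotation boundary, so analytics pipelines typically re-tokenize historical data when the salt changes.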

Cross-Border Transfer Mechanism

"""
cross_border_transfer.py — NDPA-Compliant Cross-Border Data Transfer

Implements policy-as-code for cross-border data transfers,
enforcing NDPA Part VI requirements automatically.
"""

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class TransferDecision(Enum):
    ALLOWED = "allowed"
    ALLOWED_WITH_SAFEGUARDS = "allowed_with_safeguards"
    REQUIRES_CONSENT = "requires_consent"
    REQUIRES_DPIA = "requires_dpia"
    BLOCKED = "blocked"


class AdequacyStatus(Enum):
    ADEQUATE = "adequate"
    PARTIALLY_ADEQUATE = "partially_adequate"
    NOT_ADEQUATE = "not_adequate"
    NOT_ASSESSED = "not_assessed"


# Jurisdiction adequacy assessments (based on NDPC guidance)
JURISDICTION_ADEQUACY = {
    "NG": AdequacyStatus.ADEQUATE,           # Nigeria — domestic
    "GH": AdequacyStatus.ADEQUATE,           # Ghana (Data Protection Act 2012)
    "ZA": AdequacyStatus.ADEQUATE,           # South Africa (POPIA)
    "KE": AdequacyStatus.PARTIALLY_ADEQUATE, # Kenya (Data Protection Act 2019)
    "RW": AdequacyStatus.PARTIALLY_ADEQUATE, # Rwanda
    "EU": AdequacyStatus.ADEQUATE,           # EU (GDPR)
    "GB": AdequacyStatus.ADEQUATE,           # UK (UK GDPR)
    "CA": AdequacyStatus.ADEQUATE,           # Canada (PIPEDA)
    "US": AdequacyStatus.NOT_ADEQUATE,       # USA — no federal data protection law
    "CN": AdequacyStatus.PARTIALLY_ADEQUATE, # China (PIPL)
    "IN": AdequacyStatus.PARTIALLY_ADEQUATE, # India (DPDP Act 2023)
    "AE": AdequacyStatus.PARTIALLY_ADEQUATE, # UAE (PDPL)
}


@dataclass
class TransferRequest:
    """A request to transfer personal data across borders."""
    request_id: str
    data_category: str          # 'personal', 'sensitive', 'financial', 'health'
    source_jurisdiction: str    # ISO 3166-1 alpha-2
    dest_jurisdiction: str
    data_subject_count: int
    purpose: str
    recipient_name: str
    recipient_type: str         # 'processor', 'controller', 'joint_controller'
    has_explicit_consent: bool
    has_binding_corporate_rules: bool
    has_standard_contractual_clauses: bool
    has_dpia: bool
    requested_by: str
    requested_at: datetime


class CrossBorderTransferGate:
    """
    Evaluates and enforces cross-border data transfer compliance.

    Implements NDPA Part VI, Sections 47-49 requirements:
    - Checks destination jurisdiction adequacy
    - Evaluates available safeguards
    - Makes allow/block decisions with reasoning
    - Logs all decisions for audit
    """

    def __init__(self, db_connection):
        self.conn = db_connection

    def evaluate_transfer(self, request: TransferRequest) -> tuple[TransferDecision, str]:
        """
        Evaluate a cross-border transfer request and return a decision.

        Returns (decision, reasoning) tuple.
        """
        dest_adequacy = JURISDICTION_ADEQUACY.get(
            request.dest_jurisdiction, AdequacyStatus.NOT_ASSESSED
        )

        # Decision logic per NDPA Part VI
        if request.source_jurisdiction == request.dest_jurisdiction:
            decision = TransferDecision.ALLOWED
            reason = "Domestic transfer — no cross-border rules apply"

        elif dest_adequacy == AdequacyStatus.ADEQUATE:
            decision = TransferDecision.ALLOWED
            reason = (
                f"Destination {request.dest_jurisdiction} has adequate "
                f"data protection per NDPC assessment"
            )

        elif request.has_explicit_consent:
            decision = TransferDecision.ALLOWED_WITH_SAFEGUARDS
            reason = (
                "Transfer allowed based on explicit consent of data subjects "
                "(NDPA Sec 48(1)(a)). Consent records must be maintained."
            )

        elif request.has_binding_corporate_rules:
            decision = TransferDecision.ALLOWED_WITH_SAFEGUARDS
            reason = (
                "Transfer allowed under Binding Corporate Rules "
                "(NDPA Sec 48(1)(c)). BCR documentation must be on file."
            )

        elif request.has_standard_contractual_clauses:
            if request.has_dpia:
                decision = TransferDecision.ALLOWED_WITH_SAFEGUARDS
                reason = (
                    "Transfer allowed under Standard Contractual Clauses with DPIA "
                    "(NDPA Sec 48(1)(b)). SCC and DPIA must be on file."
                )
            else:
                decision = TransferDecision.REQUIRES_DPIA
                reason = (
                    "SCCs present but DPIA required for transfer to "
                    f"{request.dest_jurisdiction} (adequacy: {dest_adequacy.value})"
                )

        elif dest_adequacy == AdequacyStatus.PARTIALLY_ADEQUATE:
            decision = TransferDecision.REQUIRES_CONSENT
            reason = (
                f"Destination {request.dest_jurisdiction} has partial adequacy. "
                "Explicit consent or additional safeguards (BCR/SCC) required."
            )

        else:
            decision = TransferDecision.BLOCKED
            reason = (
                f"Transfer to {request.dest_jurisdiction} blocked. "
                f"Adequacy status: {dest_adequacy.value}. "
                "No qualifying safeguards (consent, BCR, or SCC) provided."
            )

        # Log the decision
        self._log_decision(request, decision, reason)

        logger.info(
            "Cross-border transfer decision: %s -> %s = %s (reason: %s)",
            request.source_jurisdiction, request.dest_jurisdiction,
            decision.value, reason[:100],
        )

        return decision, reason

    def _log_decision(self, request: TransferRequest, decision: TransferDecision, reason: str):
        """Log transfer decision to audit table."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO cross_border_transfer_audit
                (request_id, data_category, source_jurisdiction, dest_jurisdiction,
                 data_subject_count, purpose, recipient_name, decision, reason,
                 requested_by, decided_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW());
            """, (
                request.request_id, request.data_category,
                request.source_jurisdiction, request.dest_jurisdiction,
                request.data_subject_count, request.purpose,
                request.recipient_name, decision.value, reason,
                request.requested_by,
            ))
            self.conn.commit()

Comparison with Global Frameworks

Nigerian organisations increasingly operate across borders — fintech companies serving the diaspora, multinationals with Nigerian subsidiaries, and Nigerian firms expanding into other African markets. Understanding how Nigerian regulations compare with global frameworks is essential for building unified compliance architectures.

NDPR/NDPA vs GDPR vs CCPA

| Dimension | NDPR/NDPA (Nigeria) | GDPR (EU) | CCPA/CPRA (California) |
|---|---|---|---|
| Scope | Any entity processing personal data of Nigerian residents | Any entity processing personal data of EU residents | Businesses meeting revenue/data volume thresholds in California |
| Lawful basis | 6 bases (consent, contract, legal obligation, vital interest, public interest, legitimate interest) | 6 bases (same as NDPA) | No lawful basis concept; focuses on consumer rights |
| Consent standard | Freely given, specific, informed, unambiguous | Freely given, specific, informed, unambiguous | Opt-out model (not opt-in) for sale/sharing |
| Data subject rights | Access, rectification, erasure, portability, objection | Access, rectification, erasure, portability, objection, restriction, automated decision-making | Know, delete, opt out of sale, non-discrimination, correct, limit |
| Breach notification | 72 hours to NDPC | 72 hours to supervisory authority | "Without unreasonable delay" to consumers |
| DPO required | Yes, above prescribed thresholds | Yes, for certain processing types | No (but a privacy policy is required) |
| Cross-border transfers | Adequacy decision, consent, BCR, SCC | Adequacy decision, SCCs, BCR, derogations | No specific cross-border transfer restrictions |
| Penalties (max) | 2% of annual gross revenue or NGN 10M (whichever is greater) | 4% of annual global turnover or EUR 20M (whichever is greater) | $7,500 per intentional violation |
| Children's data | Special protections required | Consent from holder of parental responsibility (under 16; member states may lower to 13) | COPPA applies; CCPA adds protections for under-16s |
| Data residency | Copy must be stored in Nigeria | No explicit residency requirement (but Schrems II implications) | No residency requirement |
| Enforcement body | NDPC (Nigeria Data Protection Commission) | National DPAs (e.g., CNIL, ICO) | California AG and California Privacy Protection Agency |
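One practical consequence of the breach-notification row is that incident-response tooling needs per-jurisdiction deadlines. A minimal sketch, where the codes follow the comparison above and `None` stands in for CCPA's "without unreasonable delay":

```python
from datetime import datetime, timedelta
from typing import Optional

# Hours allowed between breach detection and regulator notification.
BREACH_WINDOW_HOURS = {"NG": 72, "EU": 72, "US-CA": None}  # None: no fixed window


def notification_deadline(jurisdiction: str, detected_at: datetime) -> Optional[datetime]:
    """Return the latest time the regulator must be notified, or None if the
    jurisdiction only requires notice 'without unreasonable delay'."""
    hours = BREACH_WINDOW_HOURS.get(jurisdiction)
    return detected_at + timedelta(hours=hours) if hours is not None else None


detected = datetime(2024, 6, 1, 9, 0)
print(notification_deadline("NG", detected))     # 2024-06-04 09:00:00
print(notification_deadline("US-CA", detected))  # None
```

For multi-jurisdiction breaches, the safe operational rule is to take the minimum deadline across all affected jurisdictions.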

Building a Unified Compliance Architecture

For organisations operating across Nigeria, the EU, and the US, the strategy is to build to the highest common denominator and then layer jurisdiction-specific rules on top.

┌──────────────────────────────────────────────────────────────────────────┐
│           UNIFIED MULTI-JURISDICTION COMPLIANCE ARCHITECTURE             │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                    COMMON COMPLIANCE LAYER                       │   │
│  │  ──────────────────────────────────────                          │   │
│  │  - Consent management (GDPR-grade = NDPA-compliant)             │   │
│  │  - Data minimisation by default                                  │   │
│  │  - Purpose limitation enforcement                                │   │
│  │  - Encryption at rest and in transit                             │   │
│  │  - Audit trail for all data processing                          │   │
│  │  - Data lineage tracking                                        │   │
│  │  - Breach detection and notification pipeline                   │   │
│  │  - Right to erasure / deletion capability                       │   │
│  │  - Data Protection Impact Assessment workflow                   │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                          │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────────────┐    │
│  │ NDPA LAYER     │  │ GDPR LAYER     │  │ CCPA/CPRA LAYER       │    │
│  │ ──────────     │  │ ──────────     │  │ ─────────────         │    │
│  │ Nigeria-       │  │ EU-specific    │  │ California-specific   │    │
│  │ specific:      │  │ additions:     │  │ additions:            │    │
│  │                │  │                │  │                       │    │
│  │ - Data         │  │ - Art 30       │  │ - "Do Not Sell"       │    │
│  │   residency    │  │   processing   │  │   signal handling     │    │
│  │   (NG copy)    │  │   register     │  │ - Opt-out             │    │
│  │ - NITDA/NDPC   │  │ - DPA          │  │   preference          │    │
│  │   reporting    │  │   notification │  │   centre              │    │
│  │ - DPCO annual  │  │   (per member  │  │ - Financial           │    │
│  │   audit        │  │   state)       │  │   incentive           │    │
│  │ - CBN/NCC      │  │ - DPIA for     │  │   disclosures         │    │
│  │   sector reqs  │  │   high risk    │  │ - Shine the Light     │    │
│  │ - BVN/NIN      │  │ - Lead DPA     │  │   Act compliance      │    │
│  │   handling     │  │   designation  │  │ - Annual privacy      │    │
│  │                │  │ - Transfer     │  │   metric reporting    │    │
│  │                │  │   Impact       │  │                       │    │
│  │                │  │   Assessment   │  │                       │    │
│  └────────────────┘  └────────────────┘  └────────────────────────┘    │
│                                                                          │
│  POLICY ENGINE: Jurisdiction detected at ingestion time based on         │
│  data subject's location/nationality. Rules applied automatically.       │
└──────────────────────────────────────────────────────────────────────────┘

Policy-as-Code for Multi-Jurisdiction Compliance

"""
jurisdiction_policy.py — Policy-as-Code for Multi-Jurisdiction Compliance

Automatically applies the correct compliance rules based on the
jurisdiction of the data subject. Detected at ingestion time and
enforced throughout the pipeline.
"""

from dataclasses import dataclass, field
from typing import Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class JurisdictionPolicy:
    """Compliance policy for a specific jurisdiction."""
    jurisdiction: str
    data_residency_required: bool
    residency_locations: list[str]
    retention_default_days: int
    breach_notification_hours: int
    dpo_required: bool
    dpia_required_for_high_risk: bool
    cross_border_restrictions: bool
    consent_model: str                  # 'opt_in', 'opt_out'
    right_to_erasure: bool
    right_to_portability: bool
    special_category_rules: bool
    max_penalty_description: str
    regulator: str
    additional_requirements: list[str] = field(default_factory=list)


# Define policies per jurisdiction
POLICIES = {
    "NG": JurisdictionPolicy(
        jurisdiction="NG",
        data_residency_required=True,
        residency_locations=["ng-lagos-1", "ng-lagos-2"],  # Nigerian data centres
        retention_default_days=365,
        breach_notification_hours=72,
        dpo_required=True,
        dpia_required_for_high_risk=True,
        cross_border_restrictions=True,
        consent_model="opt_in",
        right_to_erasure=True,
        right_to_portability=True,
        special_category_rules=True,
        max_penalty_description="2% annual gross revenue or NGN 10M",
        regulator="NDPC (Nigeria Data Protection Commission)",
        additional_requirements=[
            "Annual DPCO audit submission to NDPC",
            "Data protection officer appointment",
            "Copy of personal data must reside in Nigeria",
            "CBN sector-specific requirements for financial data",
            "NCC sector-specific requirements for telecom data",
        ],
    ),
    "EU": JurisdictionPolicy(
        jurisdiction="EU",
        data_residency_required=False,
        residency_locations=[],
        retention_default_days=365,
        breach_notification_hours=72,
        dpo_required=True,
        dpia_required_for_high_risk=True,
        cross_border_restrictions=True,
        consent_model="opt_in",
        right_to_erasure=True,
        right_to_portability=True,
        special_category_rules=True,
        max_penalty_description="4% annual global turnover or EUR 20M",
        regulator="National DPA (varies by member state)",
        additional_requirements=[
            "Article 30 Records of Processing Activities",
            "Transfer Impact Assessment for non-adequate countries",
            "Lead supervisory authority designation",
            "72-hour breach notification to DPA",
        ],
    ),
    "US-CA": JurisdictionPolicy(
        jurisdiction="US-CA",
        data_residency_required=False,
        residency_locations=[],
        retention_default_days=365,
        breach_notification_hours=0,  # "Without unreasonable delay"
        dpo_required=False,
        dpia_required_for_high_risk=False,
        cross_border_restrictions=False,
        consent_model="opt_out",
        right_to_erasure=True,
        right_to_portability=False,
        special_category_rules=True,   # Sensitive personal information under CPRA
        max_penalty_description="$7,500 per intentional violation",
        regulator="California Privacy Protection Agency (CPPA)",
        additional_requirements=[
            "Do Not Sell or Share signal support (GPC)",
            "Financial incentive disclosures",
            "Privacy policy with specific disclosures",
            "Annual privacy metrics reporting (for large processors)",
        ],
    ),
}


class JurisdictionDetector:
    """
    Determines the applicable jurisdiction for a data record.

    Uses multiple signals:
    1. Explicit jurisdiction tag (most reliable)
    2. Phone number country code
    3. IP geolocation
    4. Address country
    5. BVN/NIN presence (implies Nigerian)
    """

    def detect(self, record: dict) -> str:
        """Return the jurisdiction code for a data record."""
        # Priority 1: Explicit tag
        if "jurisdiction" in record:
            return record["jurisdiction"]

        # Priority 2: Nigerian identifiers
        if record.get("bvn") or record.get("nin"):
            return "NG"

        # Priority 3: Phone number prefix
        phone = record.get("phone_number", "")
        if phone.startswith("+234") or phone.startswith("234"):
            return "NG"

        # Priority 4: Address country
        country = record.get("country", "").upper()
        jurisdiction_map = {
            "NIGERIA": "NG", "NG": "NG",
            "GHANA": "GH", "GH": "GH",
            "SOUTH AFRICA": "ZA", "ZA": "ZA",
            "KENYA": "KE", "KE": "KE",
        }

        # EU member states
        eu_countries = {
            "DE", "FR", "IT", "ES", "NL", "BE", "AT", "SE", "PL",
            "DK", "FI", "IE", "PT", "CZ", "RO", "HU", "BG", "HR",
            "SK", "LT", "SI", "LV", "EE", "CY", "LU", "MT",
        }

        if country in jurisdiction_map:
            return jurisdiction_map[country]
        elif country in eu_countries:
            return "EU"
        elif country == "US":
            state = record.get("state", "").upper()
            if state in ("CA", "CALIFORNIA"):
                return "US-CA"
            return "US"

        # Default: treat as Nigerian if we cannot determine
        logger.warning("Could not determine jurisdiction for record, defaulting to NG")
        return "NG"


def apply_jurisdiction_policy(record: dict, detector: JurisdictionDetector) -> dict:
    """
    Annotate a data record with its applicable compliance policy.

    This annotation travels with the record through all pipeline stages,
    ensuring jurisdiction-specific rules are enforced at every step.
    """
    jurisdiction = detector.detect(record)
    policy = POLICIES.get(jurisdiction)

    if policy is None:
        # Unknown jurisdiction: fall back to the NG policy as the safe default
        logger.warning("No policy for jurisdiction %s, applying NG policy", jurisdiction)
        policy = POLICIES["NG"]

    record["_compliance"] = {
        "jurisdiction": jurisdiction,
        "data_residency_required": policy.data_residency_required,
        "residency_locations": policy.residency_locations,
        "retention_days": policy.retention_default_days,
        "consent_model": policy.consent_model,
        "right_to_erasure": policy.right_to_erasure,
        "cross_border_restricted": policy.cross_border_restrictions,
        "regulator": policy.regulator,
        "detected_at": "ingestion",
    }

    return record
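Before wiring the detector into a pipeline, it is worth sanity-checking the priority order with a few synthetic records. The snippet below is a simplified, function-form mirror of `JurisdictionDetector.detect` (trimmed EU set, no logging), shown for illustration rather than as the production class:

```python
def detect(record: dict) -> str:
    """Simplified mirror of JurisdictionDetector.detect, for illustration only."""
    if "jurisdiction" in record:                    # Priority 1: explicit tag
        return record["jurisdiction"]
    if record.get("bvn") or record.get("nin"):      # Priority 2: Nigerian identifiers
        return "NG"
    phone = record.get("phone_number", "")
    if phone.startswith(("+234", "234")):           # Priority 3: phone prefix
        return "NG"
    country = (record.get("country") or "").upper()
    if country in {"NIGERIA", "NG"}:                # Priority 4: address country
        return "NG"
    if country in {"DE", "FR", "IT", "ES", "NL"}:   # trimmed EU set
        return "EU"
    return "NG"                                     # safe default, as in the class

# An explicit tag outranks a BVN; a BVN outranks the address country.
assert detect({"jurisdiction": "GH", "bvn": "12345678901"}) == "GH"
assert detect({"bvn": "12345678901", "country": "DE"}) == "NG"
assert detect({"phone_number": "+2348012345678"}) == "NG"
assert detect({"country": "FR"}) == "EU"
```

Encoding the precedence as tests like these keeps the detection rules honest as new identifier types are added.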

Monitoring and Alerting for Compliance

Compliance is not a one-time build — it requires continuous monitoring. Implement dashboards and alerts that track compliance health in real time.

Key Compliance Metrics

-- Compliance health dashboard queries

-- 1. Consent coverage: What percentage of active users have valid consent?
SELECT
    consent_type,
    COUNT(*) FILTER (WHERE is_active = TRUE) AS consented,
    COUNT(*) AS total_subjects,
    ROUND(
        COUNT(*) FILTER (WHERE is_active = TRUE)::NUMERIC / COUNT(*) * 100, 2
    ) AS consent_coverage_pct
FROM current_consent_state
GROUP BY consent_type;

-- 2. Retention compliance: Are we holding data beyond policy limits?
SELECT
    policy_name,
    table_name,
    retention_days,
    last_execution_date,
    records_still_overdue,
    CASE
        WHEN records_still_overdue > 0 THEN 'NON_COMPLIANT'
        WHEN last_execution_date < CURRENT_DATE - INTERVAL '1 day' THEN 'STALE'
        ELSE 'COMPLIANT'
    END AS status
FROM retention_policy_status;

-- 3. Erasure SLA: Are we meeting the 30-day erasure deadline?
SELECT
    request_id,
    data_subject_id,
    requested_at,
    completed_at,
    status,
    EXTRACT(DAY FROM (COALESCE(completed_at, NOW()) - requested_at)) AS days_elapsed,
    -- order matters: the 30-day breach check must precede the 25-day at-risk check
    CASE
        WHEN status = 'completed' AND (completed_at - requested_at) <= INTERVAL '30 days'
            THEN 'WITHIN_SLA'
        WHEN (COALESCE(completed_at, NOW()) - requested_at) > INTERVAL '30 days'
            THEN 'SLA_BREACHED'
        WHEN status != 'completed' AND (NOW() - requested_at) > INTERVAL '25 days'
            THEN 'AT_RISK'
        ELSE 'ON_TRACK'
    END AS sla_status
FROM erasure_requests
WHERE requested_at >= CURRENT_DATE - INTERVAL '90 days'
ORDER BY requested_at DESC;

-- 4. Cross-border transfer decisions: Are we blocking non-compliant transfers?
SELECT
    dest_jurisdiction,
    decision,
    COUNT(*) AS transfer_count,
    SUM(data_subject_count) AS total_subjects
FROM cross_border_transfer_audit
WHERE decided_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY dest_jurisdiction, decision
ORDER BY dest_jurisdiction, decision;

-- 5. CBN returns submission status
SELECT
    return_type,
    reporting_period,
    generated_at,
    validation_passed,
    reconciliation_status,
    submitted_at,
    CASE
        WHEN submitted_at IS NOT NULL THEN 'SUBMITTED'
        WHEN validation_passed = FALSE THEN 'VALIDATION_FAILED'
        WHEN reconciliation_status = 'fail' THEN 'RECONCILIATION_FAILED'
        WHEN deadline < NOW() AND submitted_at IS NULL THEN 'OVERDUE'
        ELSE 'PENDING'
    END AS status
FROM regulatory_returns_tracker
WHERE reporting_period >= TO_CHAR(CURRENT_DATE - INTERVAL '6 months', 'YYYY-MM')
ORDER BY reporting_period DESC, return_type;

-- 6. Data lineage completeness: Can we trace every regulatory report to source?
SELECT
    destination_table,
    COUNT(*) AS total_pipeline_runs,
    COUNT(*) FILTER (WHERE source_checksum IS NOT NULL) AS with_source_checksum,
    COUNT(*) FILTER (WHERE destination_checksum IS NOT NULL) AS with_dest_checksum,
    COUNT(*) FILTER (WHERE git_commit_hash IS NOT NULL) AS with_code_version,
    ROUND(
        COUNT(*) FILTER (
            WHERE source_checksum IS NOT NULL
              AND destination_checksum IS NOT NULL
              AND git_commit_hash IS NOT NULL
        )::NUMERIC / NULLIF(COUNT(*), 0) * 100, 2
    ) AS full_lineage_pct
FROM data_lineage_events
WHERE destination_table LIKE 'regulatory_%'
  AND created_at >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY destination_table;
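The dashboard queries above are written for PostgreSQL (`FILTER`, `::NUMERIC`). To see the consent-coverage metric in action without a warehouse, here is a deliberately miniature SQLite version; the table layout is simplified and the sample rows are invented, but the arithmetic is the same:

```python
import sqlite3

# Tiny in-memory stand-in for the current_consent_state view.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE current_consent_state (
    data_subject_id TEXT,
    consent_type TEXT,
    is_active INTEGER
);
INSERT INTO current_consent_state VALUES
    ('u1', 'marketing', 1),
    ('u2', 'marketing', 0),
    ('u3', 'marketing', 1),
    ('u1', 'analytics', 1);
""")

# SQLite-flavoured consent-coverage query (SUM over a 0/1 flag replaces FILTER).
rows = conn.execute("""
    SELECT consent_type,
           SUM(is_active)                              AS consented,
           COUNT(*)                                    AS total_subjects,
           ROUND(100.0 * SUM(is_active) / COUNT(*), 2) AS consent_coverage_pct
    FROM current_consent_state
    GROUP BY consent_type
    ORDER BY consent_type
""").fetchall()
for consent_type, consented, total, pct in rows:
    print(consent_type, consented, total, pct)
```

With the sample rows above, marketing coverage comes out at 66.67% and analytics at 100%, the kind of split that should trip the `consent_coverage_low` alert for one consent type but not the other.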

Alerting Rules

# compliance_alerts.yaml — Compliance Monitoring Alert Configuration

alerts:
  - name: "consent_coverage_low"
    description: "Consent coverage has dropped below threshold"
    query: |
      SELECT consent_type, consent_coverage_pct
      FROM consent_coverage_view
      WHERE consent_coverage_pct < 80
    severity: "warning"
    threshold: 80
    channel: "slack:#compliance-alerts"
    escalation: "[email protected]"

  - name: "erasure_sla_at_risk"
    description: "Erasure request approaching 30-day NDPA deadline"
    query: |
      SELECT COUNT(*) FROM erasure_requests
      WHERE status != 'completed'
        AND (NOW() - requested_at) > INTERVAL '25 days'
    severity: "critical"
    threshold: 0
    channel: "slack:#compliance-alerts, pagerduty"
    escalation: "[email protected], [email protected]"

  - name: "cbn_return_overdue"
    description: "CBN regulatory return past submission deadline"
    query: |
      SELECT return_type, reporting_period
      FROM regulatory_returns_tracker
      WHERE deadline < NOW() AND submitted_at IS NULL
    severity: "critical"
    channel: "slack:#compliance-alerts, pagerduty, sms"
    escalation: "[email protected], [email protected]"

  - name: "reconciliation_failure"
    description: "Reconciliation check failed for regulatory report"
    query: |
      SELECT report_name, reporting_period
      FROM reconciliation_audit
      WHERE status = 'fail'
        AND generated_at >= CURRENT_DATE - INTERVAL '1 day'
    severity: "high"
    channel: "slack:#compliance-alerts"
    escalation: "[email protected]"

  - name: "cross_border_transfer_blocked"
    description: "Cross-border transfer was blocked by policy engine"
    query: |
      SELECT dest_jurisdiction, reason
      FROM cross_border_transfer_audit
      WHERE decision = 'blocked'
        AND decided_at >= CURRENT_DATE - INTERVAL '1 day'
    severity: "info"
    channel: "slack:#compliance-alerts"

  - name: "data_lineage_gap"
    description: "Regulatory pipeline run missing lineage metadata"
    query: |
      SELECT pipeline_name, pipeline_run_id
      FROM data_lineage_events
      WHERE destination_table LIKE 'regulatory_%'
        AND (source_checksum IS NULL OR git_commit_hash IS NULL)
        AND created_at >= CURRENT_DATE - INTERVAL '1 day'
    severity: "warning"
    channel: "slack:#data-engineering"

  - name: "retention_policy_stale"
    description: "Retention policy has not executed on schedule"
    query: |
      SELECT policy_name, last_execution_date
      FROM retention_policy_status
      WHERE last_execution_date < CURRENT_DATE - INTERVAL '2 days'
    severity: "warning"
    channel: "slack:#compliance-alerts"
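A configuration like this needs a runner. The hypothetical evaluator below shows one shape such a runner could take: because each alert query is written to return only problem rows, any non-empty result fires the rule (thresholds, PagerDuty and Slack delivery are left out of the sketch, and the query strings are stubs):

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class AlertRule:
    name: str
    severity: str
    channel: str
    query: str

def evaluate(rules: List[AlertRule],
             run_query: Callable[[str], list],
             notify: Callable[[str, str], None]) -> List[str]:
    """Run each rule's query; any returned row means the rule fires."""
    fired = []
    for rule in rules:
        rows = run_query(rule.query)
        if rows:
            notify(rule.channel, f"[{rule.severity}] {rule.name}: {len(rows)} hit(s)")
            fired.append(rule.name)
    return fired

# Stubbed demonstration: one rule fires, one does not.
rules = [
    AlertRule("erasure_sla_at_risk", "critical", "slack:#compliance-alerts",
              "stub-query-erasure"),
    AlertRule("cbn_return_overdue", "critical", "slack:#compliance-alerts",
              "stub-query-cbn"),
]
fake_results = {"stub-query-erasure": [("REQ-1",)], "stub-query-cbn": []}
sent = []
fired = evaluate(rules, fake_results.__getitem__, lambda ch, msg: sent.append(msg))
print(fired)  # ['erasure_sla_at_risk']
```

In production the `run_query` callable would execute against the compliance warehouse on a schedule, and `notify` would fan out to the channels named in the YAML.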

Implementation Roadmap

Building compliant data infrastructure is not an overnight project. Here is a phased approach that delivers value incrementally while managing risk.

Phase 1: Foundation (Weeks 1-4)

Goal: Establish the compliance data platform and consent management.

  • Deploy compliance data warehouse (separate schema or dedicated cluster)
  • Implement consent event store and materialised views
  • Build pipeline consent gate (pre-ingestion check)
  • Set up data lineage tracking for all regulatory pipelines
  • Create retention policy definitions and automated enforcement
  • Establish encryption-at-rest and in-transit baselines
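For the retention-enforcement item, the core mechanic is a scheduled, audited delete driven by a policy value rather than a hard-coded constant. A minimal SQLite sketch follows; the table name and the 365-day figure are illustrative, not NDPA-mandated periods:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE marketing_events (id INTEGER, created_at TEXT)")
now = datetime.now(timezone.utc)
conn.executemany(
    "INSERT INTO marketing_events VALUES (?, ?)",
    [(1, (now - timedelta(days=400)).isoformat()),   # past the retention window
     (2, (now - timedelta(days=10)).isoformat())],   # still within the window
)

RETENTION_DAYS = 365  # would come from the policy definition, not pipeline code
cutoff = (now - timedelta(days=RETENTION_DAYS)).isoformat()
deleted = conn.execute(
    "DELETE FROM marketing_events WHERE created_at < ?", (cutoff,)
).rowcount
conn.commit()
print(deleted)  # 1: only the over-retention row is purged
```

In a real deployment the same job would also write an audit row (policy name, cutoff, rows removed) so the retention status queries have evidence to report.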

Phase 2: Regulatory Automation (Weeks 5-8)

Goal: Automate the most time-sensitive regulatory returns.

  • Build CBN monthly returns pipeline (Form M) with reconciliation
  • Implement AML/CFT transaction monitoring and STR/CTR generation
  • Build credit bureau reporting pipeline
  • Deploy automated reconciliation framework
  • Set up compliance monitoring dashboards
  • Implement alerting for SLA breaches and overdue submissions

Phase 3: Advanced Compliance (Weeks 9-12)

Goal: Complete the compliance capability with cross-border, erasure, and multi-jurisdiction support.

  • Implement right-to-erasure orchestration across all systems
  • Build cross-border transfer policy engine
  • Deploy DPIA automation workflow
  • Implement multi-jurisdiction policy-as-code
  • Build NCC QoS and subscriber reporting pipelines
  • Conduct end-to-end compliance audit simulation

Phase 4: Continuous Improvement (Ongoing)

Goal: Maintain compliance posture and adapt to regulatory changes.

  • Monthly compliance health reviews using dashboard metrics
  • Quarterly reconciliation accuracy audits
  • Annual DPCO audit preparation (automated report generation)
  • Regulatory change monitoring (NDPC, CBN, NCC gazettes)
  • Pipeline performance optimisation for growing data volumes
  • Staff training on compliance tooling and procedures

Common Pitfalls and How to Avoid Them

Based on our experience working with Nigerian organisations, here are the most common compliance engineering mistakes:

1. Treating Compliance as a One-Time Project

Compliance is a continuous process, not a project with an end date. Regulatory requirements evolve — the NDPA replaced the NDPR, CBN circulars are issued regularly, and enforcement patterns shift. Build your infrastructure to accommodate change.

Solution: Use configuration-driven compliance rules (YAML policies, not hard-coded logic). When a regulation changes, update the policy file, not the pipeline code.
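As a concrete shape, the snippet below resolves a retention period entirely from a parsed config mapping; in practice the mapping would be loaded from a versioned YAML file at startup. All numbers here are illustrative placeholders, not actual NDPA or CBN retention periods:

```python
# Parsed form of a hypothetical policies.yaml; a regulatory change means
# editing the config file, not the pipeline code.
POLICY_CONFIG = {
    "NG": {"retention_days": {"transaction": 2555, "marketing": 365}},
    "EU": {"retention_days": {"transaction": 3650, "marketing": 730}},
}

def retention_days(jurisdiction: str, data_class: str, default: int = 365) -> int:
    """Resolve a retention period from config, with a conservative default."""
    return (
        POLICY_CONFIG.get(jurisdiction, {})
        .get("retention_days", {})
        .get(data_class, default)
    )

print(retention_days("NG", "transaction"))  # 2555
print(retention_days("GH", "marketing"))    # 365 (falls back to the default)
```

Because every rule lives in one mapping, a config diff doubles as the change record for your next audit.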

2. Ignoring Data Lineage Until an Audit

When a regulator asks for the provenance of a number in your return, "we think it came from the core banking system" is not an acceptable answer. Data lineage must be captured from day one.

Solution: Instrument every pipeline step with lineage events. Make lineage a first-class citizen in your data platform, not an afterthought.
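A lightweight way to make lineage first-class is to emit one event per pipeline step, carrying content checksums and the code version. The sketch below builds such an event; field names follow the `data_lineage_events` columns used in the dashboard queries, and the git hash is passed in by the orchestrator rather than shelled out:

```python
import hashlib
from datetime import datetime, timezone

def file_checksum(path: str) -> str:
    """SHA-256 of a file's contents, streamed in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()

def lineage_event(pipeline_name: str, run_id: str, source_path: str,
                  destination_table: str, git_commit_hash: str) -> dict:
    """Build one lineage record, destined for the data_lineage_events table."""
    return {
        "pipeline_name": pipeline_name,
        "pipeline_run_id": run_id,
        "source_checksum": file_checksum(source_path),
        "destination_table": destination_table,
        "git_commit_hash": git_commit_hash,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
```

Emitted at every step, these records are what let you answer "where did this number come from?" with a checksum chain instead of a guess.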

3. Manual Reconciliation

Manual reconciliation introduces human error, is not scalable, and does not leave an adequate audit trail. The first time your reconciliation reveals a material variance at 11 PM the night before a submission deadline, you will wish you had automated it.

Solution: Automated reconciliation with configurable tolerances, clear pass/fail criteria, and escalation workflows.
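The core of such a framework fits in a few lines: compare the report aggregate against an independently computed source-of-truth total and apply a configured tolerance. A minimal sketch (the 0.01% default tolerance is illustrative; pick yours per report type):

```python
def reconcile(source_total: float, report_total: float,
              tolerance_pct: float = 0.01) -> tuple:
    """Return ('pass'|'fail', variance %) for a report vs its source total."""
    if source_total == 0:
        return ("pass" if report_total == 0 else "fail", float("inf"))
    variance_pct = abs(report_total - source_total) / abs(source_total) * 100
    return ("pass" if variance_pct <= tolerance_pct else "fail",
            round(variance_pct, 4))

print(reconcile(1_000_000.00, 1_000_000.00))  # ('pass', 0.0)
print(reconcile(1_000_000.00, 1_000_250.00))  # ('fail', 0.025)
```

The pass/fail outcome and variance then land in an audit table, and a 'fail' blocks submission and triggers the escalation workflow.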

4. Confusing Encryption with Compliance

Encrypting your database does not make you NDPR-compliant. Compliance requires consent management, purpose limitation, retention policies, erasure capabilities, breach notification, and much more. Encryption is necessary but not sufficient.

Solution: Use the comprehensive compliance architecture described in this guide. Encryption is one layer of many.

5. Not Accounting for Data Residency

Many organisations unknowingly violate NDPR data residency requirements by using cloud services that store data outside Nigeria without maintaining a local copy. AWS's closest region is Cape Town, not Lagos.

Solution: Classify your data by tier (Nigeria-only, Africa-regional, Global) and enforce residency rules at the infrastructure level. Consider Nigerian data centre providers (Rack Centre, MainOne MDXi) for Tier 1 data.
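Tiering only works if something enforces it, so a small guard consulted before any storage or replication call is a natural first step. Tier names and region identifiers below are hypothetical (with `af-south-1` standing in for AWS Cape Town):

```python
# Allowed storage regions per data tier; None means unrestricted.
TIER_ALLOWED_REGIONS = {
    "tier1_nigeria_only": {"ng-lagos-dc", "ng-abuja-dc"},
    "tier2_africa_regional": {"ng-lagos-dc", "af-south-1"},
    "tier3_global": None,
}

def residency_allows(tier: str, target_region: str) -> bool:
    """True if data of the given tier may be stored in target_region."""
    allowed = TIER_ALLOWED_REGIONS[tier]
    return allowed is None or target_region in allowed

print(residency_allows("tier1_nigeria_only", "af-south-1"))  # False
print(residency_allows("tier3_global", "eu-west-1"))         # True
```

Wired into your deployment tooling, a check like this turns a residency violation from a silent misconfiguration into a failed build.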


Conclusion

Nigeria's regulatory landscape is complex but navigable. The key insight is that compliance is an engineering problem, not a legal problem. Lawyers define what you must do; engineers build the systems that do it reliably, at scale, and with evidence.

By engineering compliance into your data infrastructure — consent gates in every pipeline, automated retention enforcement, lineage tracking on every transformation, reconciliation checks before every submission — you transform regulatory requirements from a quarterly fire drill into an always-on, continuously verified capability.

The organisations that thrive in Nigeria's regulatory environment will not be those that hire the most compliance officers or produce the thickest policy documents. They will be the ones that build compliant systems — infrastructure that makes non-compliance technically difficult and compliance the default state.

At Gemut Analytics, we help Nigerian organisations build exactly this kind of infrastructure. Whether you are a bank preparing for the next CBN examination, a fintech navigating NDPR for the first time, or a telecom operator automating NCC returns, the principles in this guide apply. The specific implementation will vary by sector, scale, and existing infrastructure — but the architecture patterns are universal.

Start with consent management and data lineage. Those two capabilities alone will transform your compliance posture. Then layer on automated reporting, retention enforcement, and cross-border controls. Within a quarter, you will have infrastructure that makes your compliance team's job easier and your regulator's questions answerable in seconds instead of weeks.


This guide reflects the Nigerian regulatory landscape as of May 2024. Regulations evolve — particularly as the NDPC issues implementation guidance under the NDPA. Always consult with qualified legal counsel for definitive regulatory interpretation. The technical architectures described here are implementation patterns, not legal advice.

Need help building NDPR, CBN, or NCC-compliant data infrastructure? Contact Gemut Analytics for a compliance architecture assessment tailored to your organisation and sector.

Key Takeaways

  • Nigeria's regulatory data landscape spans NDPR/NDPA, CBN, NCC, FIRS, and SEC — each with distinct data handling, retention, and reporting requirements
  • NDPR-compliant data pipelines require consent management, purpose limitation enforcement, automated retention policies, and right-to-erasure capabilities baked into the architecture
  • CBN data requirements for financial institutions can be met through automated transaction reporting pipelines with built-in AML/KYC data validation
  • Data lineage and audit trails — implemented through metadata catalogs and pipeline instrumentation — provide the evidence trail regulators increasingly demand
  • Cross-border data transfer compliance (NDPR vs GDPR vs CCPA) can be managed through policy-as-code frameworks that enforce jurisdiction-specific rules automatically
Gemut Analytics Team
Data Engineering Experts