"""Scenario 04 — Spoofed AIS Identity: data generator.

Composes shared generators from `generators/` to produce realtime, historical
and static data for the S4 demo.

Story (summary):
  F/V TÄHTI (MMSI 230199540, 12×4 m fishing) does her normal Hanko coastal loop
  on 2025-05-14 while an unrelated platform "X" begins broadcasting the same
  MMSI from central GoF (~78 NM east). RAD-PLN-01 holds the spoof hull at
  16-17 kn, length 28-34 m (track T-7741) — physically incompatible with the
  AIS-claimed 12 m / 6 kn fishing-vessel attributes. At MAC-HEL-PORT-04 the
  spoof's port footprint is 14 unknown MACs dominated by Huawei (00:E0:FC) and
  ZTE (34:DE:1A), zero overlap with TÄHTI's 11-visit Hanko baseline. F/V
  SILAKKA (MMSI 230888022) transits Porkkala -> Helsinki cleanly as a decoy
  and is correctly NOT alerted.

Two AIS streams broadcasting MMSI 230199540 simultaneously are produced as two
independent `AisTrack` instances with the same MMSI but completely different
waypoint sets (one slow loop near Hanko, one slow claimed track in the central
GoF). The "AIS-claimed vs radar-truth" divergence is implemented as a
separate fast `RadarTrack` whose waypoints share the spoof origin but cover a
much greater distance over the same time interval. We do NOT use
`AisTrack.spoof_position_offset`.
"""
from __future__ import annotations

import json
import math
import random
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

# Directory two levels above this file's folder. It is pushed to the front of
# sys.path (once) so the `generators.*` imports that follow resolve when this
# file is executed directly as a script.
REPO_ROOT = Path(__file__).resolve().parents[2]
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from generators.common import (  # noqa: E402
    ambient_mmsi,
    haversine_m,
    load_infrastructure,
    load_personas,
    load_sensors,
    maybe_decimate_mac_ndjson,
    maybe_decimate_ndjson,
    sensor_lookup,
    write_csv,
    write_geojson,
    write_ndjson,
)
from generators.ais_generator import AisTrack, ais_snapshot_geojson, emit_ais  # noqa: E402
from generators.mac_generator import (  # noqa: E402
    MAC_CSV_HEADER,
    MacObservation,
    MovingMacEmitter,
    simulate_moving_mac,
)
from generators.radar_generator import RadarTrack, emit_radar  # noqa: E402

# Every timestamp in this module is timezone-aware UTC.
UTC = timezone.utc
# Output folders live next to this file, under data/<layer>/.
SCENARIO_DIR = Path(__file__).resolve().parent
OUT_REALTIME = SCENARIO_DIR / "data" / "realtime"
OUT_STATIC = SCENARIO_DIR / "data" / "static"
OUT_HISTORICAL = SCENARIO_DIR / "data" / "historical"

# ---------------------------------------------------------------------------
# Anchor times
# ---------------------------------------------------------------------------
# Overall 6 h scenario window (used for ambient AIS and background MAC noise).
WINDOW_OPEN = datetime(2025, 5, 14, 3, 0, 0, tzinfo=UTC)
WINDOW_CLOSE = datetime(2025, 5, 14, 9, 0, 0, tzinfo=UTC)

# Real TÄHTI: first and last waypoint times of the Hanko loop.
TAHTI_DEPART = datetime(2025, 5, 14, 3, 40, 0, tzinfo=UTC)
TAHTI_RETURN = datetime(2025, 5, 14, 8, 30, 0, tzinfo=UTC)

# Spoof platform X: start/end of the AIS broadcast that reuses TÄHTI's MMSI.
SPOOF_AIS_T0 = datetime(2025, 5, 14, 5, 55, 0, tzinfo=UTC)
SPOOF_AIS_T1 = datetime(2025, 5, 14, 8, 5, 0, tzinfo=UTC)

# Spoof hull as held by the airborne radar RAD-PLN-01 (starts 15 min after SPOOF_AIS_T0).
SPOOF_RADAR_PLN_T0 = datetime(2025, 5, 14, 6, 10, 0, tzinfo=UTC)
SPOOF_RADAR_PLN_T1 = datetime(2025, 5, 14, 7, 45, 0, tzinfo=UTC)

# Spoof hull as held by the coastal radar RAD-COAST-HEL-01 (starts 5 min after the
# RAD-PLN-01 track ends).
SPOOF_RADAR_COAST_T0 = datetime(2025, 5, 14, 7, 50, 0, tzinfo=UTC)
SPOOF_RADAR_COAST_T1 = datetime(2025, 5, 14, 8, 30, 0, tzinfo=UTC)

# SILAKKA decoy: first and last waypoint times of the Porkkala -> Helsinki transit.
SILAKKA_DEPART = datetime(2025, 5, 14, 5, 30, 0, tzinfo=UTC)
SILAKKA_ARRIVE = datetime(2025, 5, 14, 7, 40, 0, tzinfo=UTC)

# Spoof's MAC fingerprint dwell at Katajanokka
SPOOF_MAC_T0 = datetime(2025, 5, 14, 7, 18, 0, tzinfo=UTC)
SPOOF_MAC_T1 = datetime(2025, 5, 14, 8, 0, 0, tzinfo=UTC)

# ---------------------------------------------------------------------------
# Tracks
# ---------------------------------------------------------------------------
# Every waypoint below is a (timestamp_utc, latitude_deg, longitude_deg) tuple.
#
# NOTE(review): the straight-line spacing of some waypoints implies leg speeds
# that differ from the nominal figures quoted in the comments (e.g. spoof AIS
# 06:50 -> 07:18 is ~9.3 NM in 28 min, i.e. ~20 kn rather than ~6 kn; most
# RAD-PLN-01 legs work out to ~20-21 kn rather than ~17 kn). Confirm how
# emit_ais / emit_radar derive SOG before relying on the quoted speeds.

# Real TÄHTI: slow ~5-7 kn coastal loop around Hanko (~22.97 E, 59.82 N)
TAHTI_WAYPOINTS = [
    (TAHTI_DEPART,                                   59.8230, 22.9720),   # Hanko fishing harbour quay
    (TAHTI_DEPART + timedelta(minutes=25),           59.8160, 22.9740),   # Outer mole
    (TAHTI_DEPART + timedelta(minutes=50),           59.7900, 22.9900),   # Fishing ground 1
    (TAHTI_DEPART + timedelta(minutes=110),          59.7700, 23.0200),   # Loop far point
    (TAHTI_DEPART + timedelta(minutes=170),          59.7800, 22.9800),   # Loop returning
    (TAHTI_DEPART + timedelta(minutes=230),          59.7950, 22.9650),   # Fishing ground 2
    (TAHTI_DEPART + timedelta(minutes=280),          59.8180, 22.9700),   # Approach harbour
    (TAHTI_RETURN,                                   59.8230, 22.9720),   # Back to quay
]

# Spoof X AIS-claimed track: slow ~6 kn from central GoF toward Helsinki approach
# (nominal figure — see the NOTE(review) at the top of this section).
SPOOF_AIS_WAYPOINTS = [
    (SPOOF_AIS_T0,                                   59.7800, 24.1000),
    (datetime(2025, 5, 14, 6, 20, 0, tzinfo=UTC),    59.7900, 24.1800),
    (datetime(2025, 5, 14, 6, 50, 0, tzinfo=UTC),    59.8150, 24.3300),
    (datetime(2025, 5, 14, 7, 18, 0, tzinfo=UTC),    59.8700, 24.6200),
    (datetime(2025, 5, 14, 7, 45, 0, tzinfo=UTC),    59.9000, 24.7600),
    (SPOOF_AIS_T1,                                   59.9300, 24.8800),
]

# Spoof X true-hull (radar) track on RAD-PLN-01: ~17 kn from same origin, longer reach
# (nominal figure — see the NOTE(review) at the top of this section).
SPOOF_RADAR_PLN_WAYPOINTS = [
    (SPOOF_RADAR_PLN_T0,                             59.7820, 24.0850),   # Same origin region, 15 min after AIS onset
    (datetime(2025, 5, 14, 6, 20, 0, tzinfo=UTC),    59.7880, 24.1150),
    (datetime(2025, 5, 14, 6, 35, 0, tzinfo=UTC),    59.8200, 24.2700),
    (datetime(2025, 5, 14, 6, 50, 0, tzinfo=UTC),    59.8530, 24.4300),
    (datetime(2025, 5, 14, 7, 5,  0, tzinfo=UTC),    59.8860, 24.5950),
    (datetime(2025, 5, 14, 7, 25, 0, tzinfo=UTC),    59.9280, 24.8150),
    (SPOOF_RADAR_PLN_T1,                             59.9650, 25.0200),
]

# Coastal radar hand-off (RAD-COAST-HEL-01) — picks up spoof hull moving into Helsinki approach
SPOOF_RADAR_COAST_WAYPOINTS = [
    (SPOOF_RADAR_COAST_T0,                           60.0500, 25.0500),
    (datetime(2025, 5, 14, 8, 5,  0, tzinfo=UTC),    60.0900, 25.0300),
    (datetime(2025, 5, 14, 8, 20, 0, tzinfo=UTC),    60.1200, 25.0100),
    (SPOOF_RADAR_COAST_T1,                           60.1400, 24.9900),
]

# SILAKKA decoy: Porkkala -> Helsinki South Harbour at ~8 kn -> 6 kn arrival
SILAKKA_WAYPOINTS = [
    (SILAKKA_DEPART,                                  59.9850, 24.4200),
    (datetime(2025, 5, 14, 6, 30, 0, tzinfo=UTC),     60.0300, 24.6200),
    (datetime(2025, 5, 14, 7, 12, 0, tzinfo=UTC),     60.1500, 24.9400),
    (SILAKKA_ARRIVE,                                  60.1660, 24.9520),
]


# ---------------------------------------------------------------------------
# Catalog subsets used in this scenario
# ---------------------------------------------------------------------------
# Sensor IDs this scenario touches: used to filter the sensor catalog for
# sensors_used.geojson and to restrict background MAC noise to these sensors.
SENSORS_USED_IDS = {
    # MAC
    "MAC-HKO-COAST-01",
    "MAC-HKO-PORT-01", "MAC-HKO-PORT-02", "MAC-HKO-PORT-03",
    "MAC-HEL-PORT-04", "MAC-HEL-PORT-05",
    "MAC-PRK-COAST-01",
    # Radar
    "RAD-PLN-01", "RAD-COAST-HEL-01",
}

# Infrastructure feature IDs copied into infrastructure_used.geojson.
INFRA_USED_IDS = {
    "port-hanko", "port-helsinki",
    "shipping-lane-eb", "shipping-lane-wb",
    "finnish-eez-gof",
}

# Loaded once at import time; generate_static() filters its "features" list.
INFRA_FC = load_infrastructure()

# ---------------------------------------------------------------------------
# MAC fingerprints
# ---------------------------------------------------------------------------
# TÄHTI persistent crew/vessel MACs (catalog + invented per spec).
# `P-TAH-OWNER` iPhone comes from personas.json; the other 3 are invented.
# "persistence" is the per-visit probability that the device is present: a
# device is skipped for a visit when a uniform draw exceeds this value.
TAHTI_PERSISTENT_MACS = [
    {"role": "owner_iphone",    "mac": "A4:83:E7:5C:9B:51", "vendor": "Apple",   "persistence": 1.00},  # P-TAH-OWNER (catalog)
    {"role": "guest_iphone",    "mac": "A4:83:E7:5C:9B:52", "vendor": "Apple",   "persistence": 0.90},
    {"role": "guest_samsung",   "mac": "38:F9:D3:11:22:53", "vendor": "Samsung", "persistence": 0.82},
    {"role": "onboard_router",  "mac": "A4:3C:5A:7A:00:01", "vendor": "u-blox",  "persistence": 1.00},
]

# Spoof window fingerprint at MAC-HEL-PORT-04 (14 unknown MACs, zero overlap with TÄHTI baseline)
# 5 Huawei (00:E0:FC) + 4 ZTE (34:DE:1A) + 5 entries with vendor None. The
# vendor-less addresses all have the locally-administered bit set in the first
# octet (second hex digit 2/6/A/E), so no OUI vendor applies to them.
SPOOF_MAC_FINGERPRINT: list[dict[str, Any]] = [
    {"mac": "00:E0:FC:A1:B2:C3", "vendor": "Huawei"},
    {"mac": "00:E0:FC:D4:E5:F6", "vendor": "Huawei"},
    {"mac": "00:E0:FC:11:22:33", "vendor": "Huawei"},
    {"mac": "00:E0:FC:44:55:66", "vendor": "Huawei"},
    {"mac": "00:E0:FC:77:88:99", "vendor": "Huawei"},
    {"mac": "34:DE:1A:01:02:03", "vendor": "ZTE"},
    {"mac": "34:DE:1A:AA:BB:CC", "vendor": "ZTE"},
    {"mac": "34:DE:1A:DD:EE:F0", "vendor": "ZTE"},
    {"mac": "34:DE:1A:55:66:77", "vendor": "ZTE"},
    {"mac": "5E:91:22:88:77:66", "vendor": None},
    {"mac": "7A:0B:1C:2D:3E:4F", "vendor": None},
    {"mac": "A2:33:44:55:66:77", "vendor": None},
    {"mac": "B6:88:99:AA:BB:CC", "vendor": None},
    {"mac": "02:42:9F:00:11:22", "vendor": None},
]

# SILAKKA fingerprint at MAC-HEL-PORT-05 (3 persistent MACs, distinct from TÄHTI and from spoof)
# No "persistence" key here: these entries are emitted unconditionally.
SILAKKA_PERSISTENT_MACS = [
    {"role": "skipper_iphone", "mac": "A4:83:E7:6B:11:22", "vendor": "Apple"},
    {"role": "crew_ble",       "mac": "B0:7D:64:6B:33:44", "vendor": "Apple-BLE"},
    {"role": "onboard_router", "mac": "A4:3C:5A:6B:55:66", "vendor": "u-blox"},
]


# ---------------------------------------------------------------------------
# Helper: emit static MAC observations at a single port sensor for a window
# (used to inject the spoof's port fingerprint at MAC-HEL-PORT-04 without
# routing through `simulate_moving_mac` — we want them to appear ONLY at the
# one sensor, with port-typical RSSIs).
# ---------------------------------------------------------------------------
def _emit_port_dwell_mac(
    *,
    sensor_id: str,
    mac: str,
    vendor: str | None,
    t0: datetime,
    t1: datetime,
    session_window_s: float = 120.0,
    rssi_band: tuple[float, float] = (-86.0, -68.0),
    seed: int = 0,
) -> list[MacObservation]:
    """Emit back-to-back MAC sessions for one device at one fixed sensor.

    The interval ``[t0, t1)`` is cut into consecutive windows of
    ``session_window_s`` seconds (the last window is truncated at ``t1``).
    Each window becomes one ``MacObservation`` unless it is randomly dropped
    (18 % chance) to mimic propagation gaps.

    Args:
        sensor_id: Sensor the observations are attributed to.
        mac: MAC address of the observed device.
        vendor: Manufacturer label, or ``None`` when unknown.
        t0: Start of the dwell (inclusive).
        t1: End of the dwell (exclusive). ``t1 <= t0`` yields an empty list.
        session_window_s: Window length in seconds; must be positive.
        rssi_band: ``(low, high)`` bounds for the uniformly drawn average RSSI.
        seed: Seed for the private RNG, so output is reproducible per device.

    Returns:
        The observations in chronological order.

    Raises:
        ValueError: If ``session_window_s`` is not positive. A zero or
            negative window would never advance the cursor, so the loop
            below could not terminate normally.
    """
    if session_window_s <= 0:
        raise ValueError(
            f"session_window_s must be > 0, got {session_window_s!r}")

    rng = random.Random(seed)
    window = timedelta(seconds=session_window_s)
    out: list[MacObservation] = []
    t = t0
    while t < t1:
        win_end = min(t1, t + window)
        # The RSSI is drawn *before* the drop decision so that the RNG stream
        # (and therefore the generated data) stays stable for a given seed.
        rssi = rng.uniform(*rssi_band)
        # Occasionally drop a window to mimic propagation gaps
        if rng.random() >= 0.18:
            out.append(MacObservation(
                sensor_id=sensor_id,
                mac=mac,
                session_start=t,
                session_end=win_end,
                message_count=max(1, int(rng.gauss(35, 9))),
                avg_rssi=rssi,
                manufacturer=vendor,
            ))
        t = win_end
    return out


# ---------------------------------------------------------------------------
# Consumer-only background MAC noise (excluding the spoof / TÄHTI / SILAKKA
# fingerprint MACs so the scenario stays clean).
# ---------------------------------------------------------------------------
# Upper-cased union of every scenario fingerprint MAC; background-noise MACs
# that collide with any of these are rejected in consumer_background_macs().
_FINGERPRINT_MACS = (
    {m["mac"].upper() for m in TAHTI_PERSISTENT_MACS}
    | {m["mac"].upper() for m in SPOOF_MAC_FINGERPRINT}
    | {m["mac"].upper() for m in SILAKKA_PERSISTENT_MACS}
)


def consumer_background_macs(
    sensors: dict[str, dict[str, Any]],
    start: datetime,
    end: datetime,
    *,
    mac_count: int = 35,
    cadence_s: float = 420.0,
    seed: int = 42,
) -> list[MacObservation]:
    """Generate consumer-device MAC noise at the scenario's MAC sensors.

    A pool of ``mac_count`` random MACs is built from the OUI prefixes listed
    under ``oui_vendors_real`` in the personas catalog, rejecting any address
    that matches a scenario fingerprint MAC. Then, every ``cadence_s`` seconds
    from ``start`` (inclusive) to ``end`` (exclusive), each sensor that is
    both listed in ``SENSORS_USED_IDS`` and of kind ``"mac"`` sees between 0
    and 3 MACs sampled from that pool.

    Args:
        sensors: Sensor catalog keyed by sensor ID.
        start: First tick of the noise window.
        end: End of the noise window (exclusive).
        mac_count: Size of the background MAC pool.
        cadence_s: Tick spacing, also used as each session's duration.
        seed: Seed for the private RNG.

    Returns:
        The observations in tick order, then sensor order.
    """
    rng = random.Random(seed)
    vendor_prefixes = load_personas()["oui_vendors_real"]
    oui_pool: list[tuple[str, str]] = [
        (vendor, prefix)
        for vendor, prefixes in vendor_prefixes.items()
        for prefix in prefixes
    ]

    # Background MAC pool as (mac, vendor) pairs, never a fingerprint MAC.
    pool: list[tuple[str, str]] = []
    while len(pool) < mac_count:
        vendor, prefix = rng.choice(oui_pool)
        tail = [f"{rng.randint(0, 255):02X}" for _ in range(3)]
        candidate = f"{prefix}:{':'.join(tail)}"
        if candidate.upper() not in _FINGERPRINT_MACS:
            pool.append((candidate, vendor))

    # The eligible sensors do not change between ticks, so pick them once.
    mac_sensor_ids = [
        sid for sid, sensor in sensors.items()
        if sid in SENSORS_USED_IDS and sensor.get("kind") == "mac"
    ]

    step = timedelta(seconds=cadence_s)
    observations: list[MacObservation] = []
    tick = start
    while tick < end:
        for sid in mac_sensor_ids:
            burst = rng.randint(0, 3)
            for mac, vendor in rng.sample(pool, k=min(burst, len(pool))):
                rssi = rng.uniform(-105, -62)
                observations.append(MacObservation(
                    sensor_id=sid,
                    mac=mac,
                    session_start=tick,
                    session_end=tick + step,
                    message_count=rng.randint(1, 50),
                    avg_rssi=rssi,
                    manufacturer=vendor,
                ))
        tick = tick + step
    return observations


# ---------------------------------------------------------------------------
# Ambient AIS background — Gulf of Finland transiting fleet for plausibility
# ---------------------------------------------------------------------------
def build_ambient_ais(n_ships: int, start: datetime, end: datetime,
                      seed: int) -> list[dict[str, Any]]:
    """Build AIS messages for a synthetic background fleet crossing the Gulf.

    Each candidate ship gets a two-waypoint track between longitudes 22.5 and
    27.0 (direction chosen at random), a random latitude in 59.55..60.30 with
    up to ±0.10° of drift, a random start time and a 45-140 minute duration.
    A candidate is dropped when its track would end after ``end`` or when its
    generated MMSI equals one of the two scenario vessels' MMSIs.

    NOTE(review): crossing 4.5° of longitude at ~60° N (roughly 135 NM) in
    45-140 minutes implies speeds far above normal merchant traffic — confirm
    that this is acceptable for the demo.

    Args:
        n_ships: Number of candidate ships to attempt.
        start: Earliest track start time.
        end: Latest permitted track end time.
        seed: Base seed; ship ``i`` uses ``seed + i`` for its track.

    Returns:
        The concatenated output of ``emit_ais`` for every ship that was kept.
    """
    rng = random.Random(seed)
    scenario_mmsis = (230199540, 230888022)
    # Latest start offset, leaving a 3 h margin before `end` (at least 1 s).
    start_span_s = max(1.0, (end - start).total_seconds() - 3 * 3600)

    messages: list[dict[str, Any]] = []
    for ship_idx in range(n_ships):
        heading_east = rng.random() < 0.5
        lat_from = rng.uniform(59.55, 60.30)
        lat_to = lat_from + rng.uniform(-0.10, 0.10)
        lon_from, lon_to = (22.5, 27.0) if heading_east else (27.0, 22.5)

        t_begin = start + timedelta(seconds=rng.uniform(0, start_span_s))
        t_finish = t_begin + timedelta(minutes=rng.uniform(45, 140))
        if t_finish > end:
            continue

        roll = rng.random()
        if roll < 0.7:
            flag = "FI"
        elif roll < 0.9:
            flag = "EE"
        else:
            flag = "OTHER"
        mmsi = ambient_mmsi(rng, flag)
        if mmsi in scenario_mmsis:
            continue

        messages.extend(emit_ais(AisTrack(
            mmsi=mmsi,
            waypoints=[(t_begin, lat_from, lon_from), (t_finish, lat_to, lon_to)],
            cadence_s=15.0,
            destination="FIHEL" if heading_east else "EETLL",
            seed=seed + ship_idx,
        )))
    return messages


# ---------------------------------------------------------------------------
# REALTIME
# ---------------------------------------------------------------------------
def _emit_tagged_ais(track: AisTrack, *, receiver: str,
                     origin_tag: str) -> list[dict[str, Any]]:
    """Emit AIS messages for *track* and stamp the scenario metadata on each."""
    msgs = emit_ais(track)
    for r in msgs:
        r["msg_class"] = "B"
        r["source_receiver"] = receiver
        # NOTE(review): `_origin_tag` is a ground-truth label; confirm that
        # downstream consumers are meant to see it in ais.ndjson.
        r["_origin_tag"] = origin_tag
    return msgs


def _bounded_length_estimate(rng: random.Random, mean_m: float, sigma_m: float,
                             band_m: tuple[float, float]) -> float:
    """Draw a Gaussian length estimate that is guaranteed to lie inside *band_m*.

    The first draw is kept whenever it is already in band, so in-band values
    are unchanged from an unbounded draw. Out-of-band draws are retried a few
    times and, as a last resort, clamped to the nearest bound.
    """
    lo, hi = band_m
    value = mean_m + rng.gauss(0.0, sigma_m)
    for _ in range(8):
        if lo <= value <= hi:
            return value
        value = mean_m + rng.gauss(0.0, sigma_m)
    return min(hi, max(lo, value))


def _annotate_spoof_radar(
    msgs: list[dict[str, Any]],
    *,
    sensor_id: str,
    source_type: str,
    platform_altitude_m: int,
    seed_salt: int,
    length_mean_m: float,
    length_sigma_m: float,
    length_band_m: tuple[float, float] = (28.0, 34.0),
    sensor_pos: tuple[float, float] | None = None,
) -> None:
    """Add the scenario-specific fields to radar plots of the spoof hull, in place.

    When *sensor_pos* (lat, lon) is given, a ``range_nm`` field with the
    distance from the sensor to the plot is added as well.
    """
    for r in msgs:
        # Per-plot RNG keyed on the timestamp keeps each estimate reproducible.
        rng = random.Random(int(r["ts_epoch_ms"]) ^ seed_salt)
        r["length_estimate_m"] = round(
            _bounded_length_estimate(rng, length_mean_m, length_sigma_m,
                                     length_band_m), 1)
        r["heading_deg"] = round(r["cog_deg"], 1)
        r["speed_kn"] = r["sog_kn"]
        r["source_sensorId"] = sensor_id
        r["source_type"] = source_type
        r["platform_altitude_m"] = platform_altitude_m
        r["altitude_m"] = 0
        r["mmsi_hint"] = None  # the radar track is not linked to any AIS identity
        if sensor_pos is not None:
            d_m = haversine_m(sensor_pos[0], sensor_pos[1], r["lat"], r["lon"])
            r["range_nm"] = round(d_m / 1852.0, 2)


def generate_realtime() -> dict[str, int]:
    """Generate the realtime layer (AIS, radar and MAC streams) for scenario S4.

    Files written under ``OUT_REALTIME``:

    * ``ais.ndjson`` / ``ais_snapshot.geojson`` — real TÄHTI, the spoof's
      claimed track (same MMSI), the SILAKKA decoy and the ambient fleet,
      sorted by ``ts_epoch_ms``.
    * ``plane_radar.ndjson`` / ``coastal_radar.ndjson`` — radar plots of the
      spoof hull (track T-7741) with a length estimate kept inside 28-34 m.
    * ``mac.ndjson`` / ``mac.csv`` — TÄHTI crew MACs at the Hanko sensors, the
      spoof fingerprint at MAC-HEL-PORT-04, SILAKKA MACs, plus background noise.
    * Decimated companion files, whenever the decimation helpers return a
      report.

    Returns:
        Mapping of output file name to the count returned by its writer (or
        ``rows + 1`` for decimated companions).

    Raises:
        AssertionError: If the spoof fingerprint shares a MAC with the TÄHTI
            baseline. This is checked before anything is written.
    """
    # ----- Safety: no spoof MACs leaked into the TÄHTI baseline path -----
    # Done first so that a bad fingerprint table never leaves partial output.
    spoof_mac_set = {m["mac"].upper() for m in SPOOF_MAC_FINGERPRINT}
    tahti_mac_set = {m["mac"].upper() for m in TAHTI_PERSISTENT_MACS}
    overlap = spoof_mac_set & tahti_mac_set
    if overlap:
        raise AssertionError(
            f"Spoof fingerprint overlaps TÄHTI baseline (forbidden): {overlap}")

    sensors = sensor_lookup()
    counts: dict[str, int] = {}

    # ---------- AIS — Real F/V TÄHTI ----------
    tahti_msgs = _emit_tagged_ais(
        AisTrack(
            mmsi=230199540,
            waypoints=TAHTI_WAYPOINTS,
            cadence_s=10.0,
            destination="FIHKO",
            nav_status=7,  # engaged in fishing
            speed_jitter_kn=0.5,
            course_jitter_deg=4.0,
            seed=4001,
        ),
        receiver="AIS-COAST-HKO",
        origin_tag="tahti_real",
    )

    # ---------- AIS — Spoof platform X (broadcasting MMSI 230199540) ----------
    spoof_ais_msgs = _emit_tagged_ais(
        AisTrack(
            mmsi=230199540,
            waypoints=SPOOF_AIS_WAYPOINTS,
            cadence_s=10.0,
            destination="FIHEL",
            nav_status=0,
            speed_jitter_kn=0.4,
            course_jitter_deg=2.5,
            seed=4002,
        ),
        receiver="AIS-COAST-HEL-1",
        origin_tag="spoof_x_claimed",
    )

    # ---------- AIS — F/V SILAKKA decoy ----------
    silakka_msgs = _emit_tagged_ais(
        AisTrack(
            mmsi=230888022,
            waypoints=SILAKKA_WAYPOINTS,
            cadence_s=10.0,
            destination="FIHEL",
            nav_status=0,
            speed_jitter_kn=0.3,
            course_jitter_deg=2.0,
            seed=4003,
        ),
        receiver="AIS-COAST-PRK",
        origin_tag="silakka_decoy",
    )

    # ---------- AIS — ambient background fleet ----------
    ambient_msgs = build_ambient_ais(n_ships=1200, start=WINDOW_OPEN,
                                     end=WINDOW_CLOSE, seed=4100)

    # Interleave by timestamp
    ais_all = tahti_msgs + spoof_ais_msgs + silakka_msgs + ambient_msgs
    ais_all.sort(key=lambda r: r["ts_epoch_ms"])
    counts["ais.ndjson"] = write_ndjson(
        OUT_REALTIME / "ais.ndjson", ais_all,
        "s4-spoofed-ais-identity/ais")

    snapshot_features = ais_snapshot_geojson(ais_all)
    counts["ais_snapshot.geojson"] = write_geojson(
        OUT_REALTIME / "ais_snapshot.geojson", snapshot_features,
        "s4-spoofed-ais-identity/ais_snapshot")

    # ---------- Plane radar (RAD-PLN-01) — spoof hull truth, track T-7741 ----------
    plane_radar_msgs = emit_radar(RadarTrack(
        track_id="T-7741",
        sensor_id="RAD-PLN-01",
        waypoints=SPOOF_RADAR_PLN_WAYPOINTS,
        cadence_s=4.0,
        classification="surface_medium",
        rcs_m2=350.0,
        confidence=0.88,
        seed=4201,
    ))
    # Length-from-track-spread estimate, jittered around 31 m within 28-34 m band
    _annotate_spoof_radar(
        plane_radar_msgs,
        sensor_id="RAD-PLN-01",
        source_type="airborne_radar",
        platform_altitude_m=3000,
        seed_salt=0xA17,
        length_mean_m=31.0,
        length_sigma_m=1.4,
    )
    counts["plane_radar.ndjson"] = write_ndjson(
        OUT_REALTIME / "plane_radar.ndjson", plane_radar_msgs,
        "s4-spoofed-ais-identity/plane_radar")

    # ---------- Coastal radar (RAD-COAST-HEL-01) — picks up spoof on approach ----------
    coastal_radar_msgs = emit_radar(RadarTrack(
        track_id="T-7741",
        sensor_id="RAD-COAST-HEL-01",
        waypoints=SPOOF_RADAR_COAST_WAYPOINTS,
        cadence_s=2.0,
        classification="surface_medium",
        rcs_m2=380.0,
        confidence=0.84,
        seed=4202,
    ))
    coast_sensor = sensors["RAD-COAST-HEL-01"]
    _annotate_spoof_radar(
        coastal_radar_msgs,
        sensor_id="RAD-COAST-HEL-01",
        source_type="coastal_radar",
        platform_altitude_m=0,
        seed_salt=0xB17,
        length_mean_m=30.0,
        length_sigma_m=1.6,
        sensor_pos=(coast_sensor["lat"], coast_sensor["lon"]),
    )
    counts["coastal_radar.ndjson"] = write_ndjson(
        OUT_REALTIME / "coastal_radar.ndjson", coastal_radar_msgs,
        "s4-spoofed-ais-identity/coastal_radar")

    # ---------- MAC sensor stream ----------
    mac_obs: list[MacObservation] = []
    rng = random.Random(4300)

    # ----- Real TÄHTI persistent crew MACs riding the coastal loop -----
    tahti_active = [(TAHTI_DEPART - timedelta(minutes=15),
                     TAHTI_RETURN + timedelta(minutes=10))]
    # Restrict to Hanko cluster only (other sensors are too far anyway —
    # but be explicit so the long-range path-loss never accidentally
    # makes a TÄHTI MAC appear at Helsinki).
    hanko_ids = {"MAC-HKO-COAST-01", "MAC-HKO-PORT-01",
                 "MAC-HKO-PORT-02", "MAC-HKO-PORT-03"}
    hanko_sensors = {sid: s for sid, s in sensors.items() if sid in hanko_ids}
    for i, p in enumerate(TAHTI_PERSISTENT_MACS):
        # Owner phone and onboard router every visit; guests with their persistence
        if rng.random() > p["persistence"]:
            continue  # this guest is "off" today (rare)
        em = MovingMacEmitter(
            mac=p["mac"],
            manufacturer=p["vendor"],
            waypoints=TAHTI_WAYPOINTS,
            active_windows=tahti_active,
            seed=4310 + i,
        )
        mac_obs.extend(simulate_moving_mac(em, hanko_sensors,
                                           session_window_s=180.0))

    # ----- Spoof X port fingerprint at MAC-HEL-PORT-04 ONLY -----
    # 14 unknown MACs during the 07:18..08:00 Z dwell. Port-typical RSSI band.
    for i, m in enumerate(SPOOF_MAC_FINGERPRINT):
        mac_obs.extend(_emit_port_dwell_mac(
            sensor_id="MAC-HEL-PORT-04",
            mac=m["mac"],
            vendor=m["vendor"],
            t0=SPOOF_MAC_T0,
            t1=SPOOF_MAC_T1,
            session_window_s=120.0,
            rssi_band=(-84.0, -68.0),
            seed=4400 + i,
        ))

    # ----- SILAKKA persistent crew MACs at MAC-HEL-PORT-05 -----
    silakka_dwell_t0 = datetime(2025, 5, 14, 7, 10, 0, tzinfo=UTC)
    silakka_dwell_t1 = datetime(2025, 5, 14, 7, 45, 0, tzinfo=UTC)
    for i, m in enumerate(SILAKKA_PERSISTENT_MACS):
        mac_obs.extend(_emit_port_dwell_mac(
            sensor_id="MAC-HEL-PORT-05",
            mac=m["mac"],
            vendor=m["vendor"],
            t0=silakka_dwell_t0,
            t1=silakka_dwell_t1,
            session_window_s=180.0,
            rssi_band=(-80.0, -64.0),
            seed=4500 + i,
        ))
    # SILAKKA crew also briefly seen at MAC-PRK-COAST-01 on departure
    for i, m in enumerate(SILAKKA_PERSISTENT_MACS):
        mac_obs.extend(_emit_port_dwell_mac(
            sensor_id="MAC-PRK-COAST-01",
            mac=m["mac"],
            vendor=m["vendor"],
            t0=SILAKKA_DEPART - timedelta(minutes=5),
            t1=SILAKKA_DEPART + timedelta(minutes=20),
            session_window_s=180.0,
            rssi_band=(-92.0, -76.0),
            seed=4600 + i,
        ))

    # ----- Background MAC noise across the 6 h window at our 7 catalog sensors -----
    mac_obs.extend(consumer_background_macs(
        sensors, WINDOW_OPEN, WINDOW_CLOSE,
        mac_count=40, cadence_s=420.0, seed=4700,
    ))

    counts["mac.ndjson"] = write_ndjson(
        OUT_REALTIME / "mac.ndjson", [m.to_ndjson() for m in mac_obs],
        "s4-spoofed-ais-identity/mac")
    counts["mac.csv"] = write_csv(
        OUT_REALTIME / "mac.csv", MAC_CSV_HEADER,
        [m.to_csv_row() for m in mac_obs],
        "s4-spoofed-ais-identity/mac_sessions")

    # ---------- Decimated companion files ----------
    AIS_DECIM_FIELDS = ["timestamp", "lat", "lon", "sog_kn", "cog_deg", "nav_status"]
    # NOTE(review): this list names "alt_m" / "speed_mps" / "kind", whereas the
    # radar records above gain "altitude_m" / "speed_kn", and
    # "length_estimate_m" is not listed at all — confirm against
    # maybe_decimate_ndjson's handling of project_fields.
    RADAR_DECIM_FIELDS = ["timestamp", "lat", "lon", "sog_kn", "cog_deg", "alt_m",
                          "speed_mps", "heading_deg", "rcs_m2", "classification",
                          "mmsi_hint", "kind"]
    decim_jobs = [
        (OUT_REALTIME / "ais.ndjson", "mmsi", AIS_DECIM_FIELDS),
        (OUT_REALTIME / "plane_radar.ndjson", "track_id", RADAR_DECIM_FIELDS),
        (OUT_REALTIME / "coastal_radar.ndjson", "track_id", RADAR_DECIM_FIELDS),
    ]
    decim_reports = []
    for path, key_field, fields in decim_jobs:
        rep = maybe_decimate_ndjson(path, key_field=key_field,
                                    ts_field="ts_epoch_ms",
                                    project_fields=fields)
        if rep:
            decim_reports.append(rep)
            # NOTE(review): the "+ 1" presumably accounts for a header/meta
            # line in the companion file — confirm.
            counts[Path(rep["decimated"]).name] = rep["rows"] + 1
    mac_rep = maybe_decimate_mac_ndjson(OUT_REALTIME / "mac.ndjson")
    if mac_rep:
        decim_reports.append(mac_rep)
        counts[Path(mac_rep["decimated"]).name] = mac_rep["rows"] + 1
    if decim_reports:
        print("[S4] decimated companion files:")
        for r in decim_reports:
            print(f"  {Path(r['decimated']).name}  "
                  f"{r['source_bytes']/1024/1024:.1f}MB -> {r['decimated_bytes']/1024/1024:.1f}MB"
                  f"  ({r['rows']} rows)")

    return counts


# ---------------------------------------------------------------------------
# STATIC GeoJSON
# ---------------------------------------------------------------------------
def generate_static() -> dict[str, int]:
    """Write the static GeoJSON layer for scenario S4 under ``OUT_STATIC``.

    Produces the area-of-interest polygon, the subsets of the sensor and
    infrastructure catalogs used by the scenario, and one LineString per
    scripted route (TÄHTI loop, SILAKKA transit, spoof claimed track, spoof
    radar-observed track).

    Returns:
        Mapping of output file name to the count returned by ``write_geojson``.
    """
    def _route_feature(waypoints, feature_id, title, extra=None):
        # Waypoints are (timestamp, lat, lon); GeoJSON wants [lon, lat].
        coords = []
        for _ts, lat, lon in waypoints:
            coords.append([lon, lat])
        return {
            "type": "Feature",
            "properties": {"featureId": feature_id, "name": title,
                           **(extra or {})},
            "geometry": {"type": "LineString", "coordinates": coords},
        }

    counts: dict[str, int] = {}

    # --- Area of interest: closed rectangular ring (first corner repeated) ---
    west, east = 22.50, 25.50
    south, north = 59.55, 60.30
    aoi_ring = [
        [west, south],
        [east, south],
        [east, north],
        [west, north],
        [west, south],
    ]
    aoi_feature = {
        "type": "Feature",
        "properties": {
            "featureId": "s4-aoi",
            "name": "S4 Area of Interest",
            "note": "Hanko -> central GoF -> Helsinki, covering TÄHTI loop, spoof tracks and SILAKKA decoy.",
        },
        "geometry": {"type": "Polygon", "coordinates": [aoi_ring]},
    }
    counts["area_of_interest.geojson"] = write_geojson(
        OUT_STATIC / "area_of_interest.geojson", [aoi_feature],
        "s4-spoofed-ais-identity/area_of_interest")

    # --- Catalog subsets ---
    used_sensors = []
    for feat in load_sensors()["features"]:
        if feat["properties"]["sensorId"] in SENSORS_USED_IDS:
            used_sensors.append(feat)
    counts["sensors_used.geojson"] = write_geojson(
        OUT_STATIC / "sensors_used.geojson", used_sensors,
        "s4-spoofed-ais-identity/sensors_used")

    used_infra = []
    for feat in INFRA_FC["features"]:
        if feat["properties"]["featureId"] in INFRA_USED_IDS:
            used_infra.append(feat)
    counts["infrastructure_used.geojson"] = write_geojson(
        OUT_STATIC / "infrastructure_used.geojson", used_infra,
        "s4-spoofed-ais-identity/infrastructure_used")

    # --- Scripted routes, one LineString feature per file ---
    tahti_route = _route_feature(
        TAHTI_WAYPOINTS, "tahti-loop",
        "F/V TÄHTI Hanko coastal loop (legitimate)",
        {"mmsi": 230199540})
    counts["tahti_baseline_route.geojson"] = write_geojson(
        OUT_STATIC / "tahti_baseline_route.geojson", [tahti_route],
        "s4-spoofed-ais-identity/tahti_baseline_route")

    silakka_route = _route_feature(
        SILAKKA_WAYPOINTS, "silakka-transit",
        "F/V SILAKKA Porkkala -> Helsinki transit (decoy, clean)",
        {"mmsi": 230888022})
    counts["silakka_decoy_route.geojson"] = write_geojson(
        OUT_STATIC / "silakka_decoy_route.geojson", [silakka_route],
        "s4-spoofed-ais-identity/silakka_decoy_route")

    claimed_route = _route_feature(
        SPOOF_AIS_WAYPOINTS, "spoof-x-ais-claimed",
        "Spoof platform X — AIS-claimed track (slow ~6 kn, claims MMSI 230199540)",
        {"claimed_mmsi": 230199540, "claimed_type": 30})
    counts["spoof_claimed_track.geojson"] = write_geojson(
        OUT_STATIC / "spoof_claimed_track.geojson", [claimed_route],
        "s4-spoofed-ais-identity/spoof_claimed_track")

    observed_route = _route_feature(
        SPOOF_RADAR_PLN_WAYPOINTS, "spoof-x-radar-truth",
        "Spoof platform X — radar-observed truth (~17 kn, length 28-34 m, RAD-PLN-01 T-7741)",
        {"radar_track_id": "T-7741", "sensor_id": "RAD-PLN-01"})
    counts["spoof_observed_track.geojson"] = write_geojson(
        OUT_STATIC / "spoof_observed_track.geojson", [observed_route],
        "s4-spoofed-ais-identity/spoof_observed_track")

    return counts


# ---------------------------------------------------------------------------
# HISTORICAL — 11 prior TÄHTI Hanko port visits over 6 weeks showing the
# stable baseline MAC fingerprint, observed at MAC-HKO-PORT-01..03 and
# MAC-HKO-COAST-01.
#
# Visits are scheduled every 3-4 days, starting 6 weeks before the scenario
# anchor (2025-05-14). Each visit produces:
#   - One AIS track for a simplified 4-waypoint Hanko loop (30 s cadence),
#     appended to ais_baseline.ndjson
#   - Per-port-sensor MAC sessions for the persistent crew/vessel MACs
#     (each kept with its declared per-visit persistence probability, so guest
#     MACs are sometimes absent while still meeting the spec's ≥ ⌈N/2⌉ persistence).
# ---------------------------------------------------------------------------
# Day offsets (negative = before) added to WINDOW_OPEN's date to place each
# historical visit; generate_historical() runs one visit per entry.
HISTORICAL_VISIT_OFFSETS_DAYS = [
    -42, -38, -34, -30, -26, -22, -18, -14, -10, -7, -3,
]  # 11 visits over ~6 weeks, spacing ~3-4 days


def generate_historical() -> dict[str, int]:
    """Generate the historical baseline layer for F/V TÄHTI under ``OUT_HISTORICAL``.

    One visit is simulated per entry of ``HISTORICAL_VISIT_OFFSETS_DAYS``.
    Each visit contributes an AIS track for a simplified 4-waypoint Hanko loop
    and MAC observations, at the Hanko sensors only, for every persistent
    TÄHTI device that is present on that visit.

    Returns:
        Mapping of output file name to the count returned by its writer.

    Raises:
        AssertionError: If any persistent TÄHTI MAC is observed on fewer than
            half of the visits (rounded up). Nothing is written in that case.
    """
    counts: dict[str, int] = {}
    sensors = sensor_lookup()
    hanko_ids = {"MAC-HKO-COAST-01", "MAC-HKO-PORT-01",
                 "MAC-HKO-PORT-02", "MAC-HKO-PORT-03"}
    hanko_sensors = {sid: s for sid, s in sensors.items() if sid in hanko_ids}

    ais_baseline: list[dict[str, Any]] = []
    mac_baseline_obs: list[MacObservation] = []

    rng = random.Random(4800)

    for v_idx, days_back in enumerate(HISTORICAL_VISIT_OFFSETS_DAYS):
        # Visit departs 04:30Z and returns 08:40Z (similar shape to scenario day)
        depart = (WINDOW_OPEN + timedelta(days=days_back)).replace(
            hour=4, minute=30, second=0, microsecond=0)
        retn = depart + timedelta(hours=4, minutes=10)

        # Simplified clean loop (4 waypoints — keeps historical AIS small)
        visit_wp = [
            (depart,                                  59.8230, 22.9720),
            (depart + timedelta(minutes=60),          59.7850, 22.9950),
            (depart + timedelta(minutes=180),         59.7900, 22.9700),
            (retn,                                    59.8230, 22.9720),
        ]
        track = AisTrack(
            mmsi=230199540,
            waypoints=visit_wp,
            cadence_s=30.0,
            destination="FIHKO",
            nav_status=7,
            seed=4900 + v_idx,
        )
        ais_baseline.extend(emit_ais(track))

        # MAC observations per persistent device, with per-visit dropout
        active_windows = [(depart - timedelta(minutes=10),
                           retn + timedelta(minutes=10))]
        for i, p in enumerate(TAHTI_PERSISTENT_MACS):
            # 1.00 persistence -> always present; lower values -> occasional dropout
            if rng.random() > p["persistence"]:
                continue
            em = MovingMacEmitter(
                mac=p["mac"],
                manufacturer=p["vendor"],
                waypoints=visit_wp,
                active_windows=active_windows,
                seed=5000 + v_idx * 10 + i,
            )
            mac_baseline_obs.extend(
                simulate_moving_mac(em, hanko_sensors, session_window_s=300.0))

    # Ensure each persistent device shows up in >= ceil(N/2) visits. Both the
    # threshold and the denominator are derived from the visit list so the
    # check stays correct if visits are added or removed.
    n_visits = len(HISTORICAL_VISIT_OFFSETS_DAYS)
    required_visits = (n_visits + 1) // 2
    # A visit is identified by the calendar date of the session start.
    visit_days_per_mac: dict[str, set[str]] = {}
    for o in mac_baseline_obs:
        days = visit_days_per_mac.setdefault(o.mac.upper(), set())
        if o.session_start is not None:
            days.add(o.session_start.date().isoformat())
    for p in TAHTI_PERSISTENT_MACS:
        seen = len(visit_days_per_mac.get(p["mac"].upper(), set()))
        if seen < required_visits:
            raise AssertionError(
                f"Persistent TÄHTI MAC {p['mac']} only seen in {seen}/{n_visits} visits, "
                f"expected >= {required_visits}")

    counts["ais_baseline.ndjson"] = write_ndjson(
        OUT_HISTORICAL / "ais_baseline.ndjson", ais_baseline,
        "s4-spoofed-ais-identity/ais_baseline")

    mac_nd = [m.to_ndjson() for m in mac_baseline_obs]
    counts["mac_baseline.ndjson"] = write_ndjson(
        OUT_HISTORICAL / "mac_baseline.ndjson", mac_nd,
        "s4-spoofed-ais-identity/mac_baseline")
    mac_rows = [m.to_csv_row() for m in mac_baseline_obs]
    counts["mac_baseline.csv"] = write_csv(
        OUT_HISTORICAL / "mac_baseline.csv", MAC_CSV_HEADER, mac_rows,
        "s4-spoofed-ais-identity/mac_baseline_sessions")
    return counts


# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def dir_size_bytes(p: Path) -> int:
    """Return the combined size, in bytes, of every file found beneath *p*."""
    total = 0
    for entry in p.rglob("*"):
        if entry.is_file():
            total += entry.stat().st_size
    return total


def main() -> int:
    """Generate all three data layers, print a report and save a JSON summary.

    Runs the realtime, static and historical generators in that order, prints
    per-file counts and on-disk sizes, and writes
    ``data/_generation_summary.json`` next to this file.

    Returns:
        Always ``0`` (process exit status).
    """
    print("[S4] generating realtime layer ...")
    rt = generate_realtime()
    print("[S4] generating static layer ...")
    st = generate_static()
    print("[S4] generating historical layer ...")
    hi = generate_historical()

    print("\n===== Scenario 04 — Spoofed AIS Identity: generation summary =====")
    report_sections = (
        ("realtime", rt, "rows", 8),
        ("static", st, "features", 5),
        ("historical", hi, "rows", 8),
    )
    for title, layer_counts, unit, width in report_sections:
        print(f"\n[{title}]")
        for fname, n in layer_counts.items():
            print(f"  {fname:<30} {unit}={n:>{width}}")

    rt_bytes = dir_size_bytes(OUT_REALTIME)
    st_bytes = dir_size_bytes(OUT_STATIC)
    hi_bytes = dir_size_bytes(OUT_HISTORICAL)
    print("\n[on disk]")
    print(f"  realtime   {rt_bytes:>12} bytes ({rt_bytes/1024/1024:.2f} MB)")
    print(f"  static     {st_bytes:>12} bytes ({st_bytes/1024:.2f} KB)")
    print(f"  historical {hi_bytes:>12} bytes ({hi_bytes/1024/1024:.2f} MB)")

    def _tabular_rows(layer_counts):
        # Only row-oriented outputs count towards the totals (not GeoJSON).
        tabular = (".ndjson", ".csv")
        return sum(n for fname, n in layer_counts.items()
                   if fname.endswith(tabular))

    total_rt = _tabular_rows(rt)
    total_hi = _tabular_rows(hi)
    n_tahti = len(TAHTI_PERSISTENT_MACS)
    n_spoof = len(SPOOF_MAC_FINGERPRINT)
    n_silakka = len(SILAKKA_PERSISTENT_MACS)
    print(f"\n[totals] realtime rows={total_rt}  historical rows={total_hi}")
    print(f"[fingerprints] TÄHTI persistent MACs={n_tahti}  "
          f"spoof MACs={n_spoof}  "
          f"SILAKKA MACs={n_silakka}")
    print("[done] All files written under scenarios/04-spoofed-ais-identity/data/")

    summary = {
        "scenario": "s4-spoofed-ais-identity",
        "realtime": rt,
        "static": st,
        "historical": hi,
        "bytes": {"realtime": rt_bytes, "static": st_bytes, "historical": hi_bytes},
        "fingerprints": {
            "tahti_persistent_macs": n_tahti,
            "spoof_fingerprint_macs": n_spoof,
            "silakka_persistent_macs": n_silakka,
        },
        "historical_visits": len(HISTORICAL_VISIT_OFFSETS_DAYS),
        "window": {
            "open_ts": WINDOW_OPEN.isoformat(),
            "close_ts": WINDOW_CLOSE.isoformat(),
        },
    }
    summary_path = SCENARIO_DIR / "data" / "_generation_summary.json"
    summary_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    return 0


# Script entry point: run the generators and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
