# Streaming replay

Replay the per-scenario NDJSON files under `scenarios/<id>/data/realtime/` into
one of three targets: **stdout**, a **Fabric Eventstream custom HTTP endpoint**,
or an **Azure Event Hub** that is wired up as an Eventstream source.

The replay merges all selected NDJSON streams in event-time order and tags each
outgoing event with three control fields (`_stream`, `_scenario`, `_emit_ts`)
so a downstream Eventstream / KQL pipeline can route each event to the correct
table.

## Replay model

Each NDJSON file's first line is a synthetic-data disclaimer:

```json
{"__meta__": "synthetic", "disclaimer": "...", "dataset": "...", "version": "1.0"}
```

The replay script skips any line starting with `{"__meta__"`. For every
remaining line it resolves the event time from the first available of:

1. `timestamp` (ISO 8601 UTC, e.g. `2025-03-18T05:30:00.000000Z`)
2. `processingTimestamp` (used by the MAC stream)
3. `ts_epoch_ms` (Unix epoch milliseconds)
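
The skip rule and the field precedence above can be sketched as follows. This is a minimal illustration, not the script's actual code: the helper names `parse_iso_utc`, `resolve_event_time` and `parse_line` are hypothetical, and it assumes `processingTimestamp` uses the same ISO 8601 format as `timestamp`.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def parse_iso_utc(value: str) -> datetime:
    """Parse an ISO 8601 UTC string such as 2025-03-18T05:30:00.000000Z."""
    # Older Python versions of fromisoformat() reject a trailing "Z".
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def resolve_event_time(event: dict) -> Optional[datetime]:
    """Return the event time from the first available field, or None."""
    if event.get("timestamp"):
        return parse_iso_utc(event["timestamp"])
    if event.get("processingTimestamp"):  # assumed ISO 8601, like `timestamp`
        return parse_iso_utc(event["processingTimestamp"])
    if event.get("ts_epoch_ms") is not None:
        return datetime.fromtimestamp(event["ts_epoch_ms"] / 1000, tz=timezone.utc)
    return None


def parse_line(line: str) -> Optional[dict]:
    """Parse one NDJSON line, skipping blanks and the __meta__ disclaimer."""
    line = line.strip()
    if not line or line.startswith('{"__meta__"'):
        return None
    return json.loads(line)
```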

All selected streams are merged and yielded in ascending event-time order.
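
One way to picture the merge is a lazy k-way merge over per-stream iterators. The sketch below uses `heapq.merge` from the standard library and a hypothetical `merge_streams` helper; it assumes each NDJSON file is already sorted by event time, which may not match how the script handles unsorted input.

```python
import heapq
from typing import Iterable, Iterator, Tuple

# (event time as epoch seconds, stream name, payload) -- illustrative shape only
Event = Tuple[float, str, dict]


def merge_streams(streams: Iterable[Iterable[Event]]) -> Iterator[Event]:
    """Lazily merge per-stream iterators that are each already time-sorted."""
    # Order on (time, stream) only, so payload dicts are never compared on ties.
    return heapq.merge(*streams, key=lambda item: (item[0], item[1]))
```

Because the merge is lazy, only one pending event per stream is held in memory at a time.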

### Scenario time → wall time

Pacing is controlled by `--pace`:

| Mode                       | Behaviour                                                                                       |
| -------------------------- | ----------------------------------------------------------------------------------------------- |
| `asfast` (default)         | Emit as fast as the sink accepts. Useful for backfill or bulk load.                             |
| `realtime`                 | The first event is anchored at wall-clock `now`; subsequent events are delayed so that `(event_ts - first_event_ts) == (wall_now - wall_start)`. |
| `accelerated <factor>`     | Same anchoring but scenario time is divided by `<factor>` (so `10` means 10x faster than realtime). |

Optional `--start` / `--end` (UTC ISO 8601) filter the event-time window
before pacing is applied.
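
The three pacing modes in the table above reduce to a small amount of arithmetic. The following is a sketch under stated assumptions: `target_offset` and `seconds_to_sleep` are hypothetical names, and clamping the sleep to zero when the replay falls behind is an assumption rather than documented behaviour.

```python
from datetime import datetime, timedelta, timezone


def target_offset(event_ts: datetime, first_event_ts: datetime,
                  mode: str = "asfast", factor: float = 1.0) -> timedelta:
    """Wall-clock offset from replay start at which an event is due."""
    if mode == "asfast":
        return timedelta(0)
    scenario_elapsed = event_ts - first_event_ts
    if mode == "realtime":
        return scenario_elapsed
    if mode == "accelerated":
        if factor <= 0:
            raise ValueError("factor must be positive")
        return scenario_elapsed / factor  # factor 10 -> 10x faster
    raise ValueError(f"unknown pace mode: {mode}")


def seconds_to_sleep(offset: timedelta, wall_start: datetime,
                     wall_now: datetime) -> float:
    """Time to wait before emitting; zero if the replay is already late."""
    return max(0.0, (wall_start + offset - wall_now).total_seconds())
```

For example, with `--pace accelerated 10` an event that occurs 100 seconds into the scenario is due 10 seconds after the replay starts.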

Each emitted payload is the original event with these extra fields:

```jsonc
{
  "...original fields...": "...",
  "_stream":   "ais",                       // source NDJSON basename
  "_scenario": "01-ais-dark-near-cable",    // scenario id
  "_emit_ts":  "2025-11-04T08:12:33.412005Z" // wall-clock UTC at emit
}
```

`_stream` is the routing key used by Eventstream / KQL to fan events into the
right table (one table per stream).
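
Adding the control fields amounts to a shallow copy plus three assignments. The helper name `tag_event` and its `now` parameter are hypothetical and exist only to keep this sketch testable; the timestamp format follows the `_emit_ts` example above.

```python
from datetime import datetime, timezone
from typing import Optional


def tag_event(event: dict, stream: str, scenario: str,
              now: Optional[datetime] = None) -> dict:
    """Return a copy of the event with the three replay control fields added."""
    emit_time = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    tagged = dict(event)  # leave the caller's dict untouched
    tagged["_stream"] = stream
    tagged["_scenario"] = scenario
    tagged["_emit_ts"] = emit_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return tagged
```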

## Install

```powershell
pip install -r streaming\requirements.txt
```

For `--target stdout` the `azure-*` and `requests` packages are imported lazily,
so a bare Python install is enough to do a dry run.

## Examples

### 1. stdout dry run (no Azure deps required)

```powershell
python streaming\eventstream_replay.py `
  --scenario 01-ais-dark-near-cable `
  --target stdout `
  --stream ais `
  --pace asfast `
  --end 2025-03-18T05:35:00Z
```

### 2. Fabric Eventstream custom HTTP endpoint

Create a **Custom endpoint** source on your Fabric Eventstream and copy its
ingest URL (it embeds an authentication token in the query string).

```powershell
$ENDPOINT = "https://<workspace>.<region>.eventstream.fabric.microsoft.com/..."

python streaming\eventstream_replay.py `
  --scenario 02-ship-to-ship-rendezvous `
  --target eventstream `
  --endpoint-url $ENDPOINT `
  --pace accelerated 10
```

The script POSTs one JSON event per request with
`Content-Type: application/json`. The custom endpoint accepts single events;
Eventstream batches them downstream.
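
The request shape described above can be sketched with the standard library alone. This is an illustration, not the script's code: the script depends on `requests`, whereas this sketch swaps in `urllib.request` to stay dependency-free, and `build_post` / `send_event` are hypothetical names.

```python
import json
import urllib.request


def build_post(endpoint_url: str, event: dict) -> urllib.request.Request:
    """Build one POST request carrying a single JSON event."""
    body = json.dumps(event, separators=(",", ":")).encode("utf-8")
    return urllib.request.Request(
        endpoint_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def send_event(endpoint_url: str, event: dict, timeout: float = 10.0) -> int:
    """Send one event and return the HTTP status code."""
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(build_post(endpoint_url, event),
                                timeout=timeout) as response:
        return response.status
```

Separating request construction from sending makes it possible to inspect the payload and headers without touching the network.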

### 3. Azure Event Hubs with Managed Identity

This is the recommended path for production-style replays because it needs no
SAS keys or other secrets. The event hub is then attached as an Eventstream
source.

```powershell
az login   # or rely on the managed identity on the host

python streaming\eventstream_replay.py `
  --scenario 03-loitering-critical-infra `
  --target eventhub `
  --namespace my-ns.servicebus.windows.net `
  --hub r-mac-events `
  --pace realtime `
  --loop
```

**Required RBAC:** the identity running the script needs **`Azure Event Hubs
Data Sender`** on the namespace (or on the specific hub). The script does not
support SAS keys; it uses `DefaultAzureCredential` from `azure-identity`, which
tries a chain of credential sources in order, including environment variables,
a managed identity, and the Azure CLI login.
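
For orientation, a keyless send with the `azure-eventhub` and `azure-identity` packages might look like the sketch below. The function names are hypothetical and the batching strategy is an assumption; the script itself may send events differently, for example one at a time when pacing matters.

```python
import json
from typing import Iterable


def encode_event(event: dict) -> str:
    """Serialise one event as compact JSON for the Event Hubs message body."""
    return json.dumps(event, separators=(",", ":"))


def send_to_event_hub(namespace: str, hub: str, events: Iterable[dict]) -> None:
    """Send events using DefaultAzureCredential (no connection string)."""
    # Imported lazily so that stdout-only runs do not need the Azure packages.
    from azure.eventhub import EventData, EventHubProducerClient
    from azure.identity import DefaultAzureCredential

    producer = EventHubProducerClient(
        fully_qualified_namespace=namespace,  # e.g. my-ns.servicebus.windows.net
        eventhub_name=hub,
        credential=DefaultAzureCredential(),
    )
    with producer:
        batch = producer.create_batch()
        for event in events:
            data = EventData(encode_event(event))
            try:
                batch.add(data)
            except ValueError:
                # Batch is full: flush it and start a new one with this event.
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(data)
        if len(batch) > 0:
            producer.send_batch(batch)
```

Batching like this suits `--pace asfast`; with `realtime` or `accelerated` pacing, holding events in a batch would delay them past their due time.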

## Wiring into Fabric Eventstream

You have two clean options:

1. **Custom endpoint source (HTTP)** — easiest. Create an Eventstream, add a
   `Custom endpoint` source, copy its ingest URL, pass it to
   `--target eventstream --endpoint-url ...`. No Azure resources required
   outside Fabric.
2. **Event Hub source (MI)** — production-friendly. Provision an Event Hubs
   namespace + hub, grant the replay identity `Azure Event Hubs Data Sender`,
   then add an `Azure Event Hubs` source to the Eventstream that points at the
   same hub. Run the replay with `--target eventhub ...`.

In both cases, route each event in Eventstream by the `_stream` field
(e.g. `where _stream == "ais"`) to its destination KQL table.

> **Repo policy:** managed identity / AAD only. Do not introduce SAS-key
> connection strings into this repo or its docs.

## Sample KQL `.create table` DDL

The DDL below mirrors the field names found in `scenarios/*/data/realtime/*.ndjson`.
All tables additionally carry the three replay-injected columns: `_stream`,
`_scenario`, `_emit_ts`.

> Note: AIS, plane-radar, drone-radar and coastal-radar all carry both
> `timestamp` (ISO 8601) and `ts_epoch_ms` — keep both for convenience.

```kusto
.create table ais (
    timestamp:        datetime,
    ts_epoch_ms:      long,
    mmsi:             long,
    imo:              long,
    name:             string,
    callsign:         string,
    ship_type:        int,
    loa_m:            real,
    beam_m:           real,
    lat:              real,
    lon:              real,
    sog_kn:           real,
    cog_deg:          real,
    heading_deg:      real,
    nav_status:       int,
    destination:      string,
    msg_class:        string,
    source_receiver:  string,
    _stream:          string,
    _scenario:        string,
    _emit_ts:         datetime
)

.create table mac (
    sessionStart:           datetime,
    messageCount:           long,
    onlineDurationSeconds:  long,
    sessionEnd:             datetime,
    processingTimestamp:    datetime,
    deviceId:               string,
    version:                string,
    macAddress:             string,
    averageSignalStrength:  real,
    deviceManufacturer:     string,
    ingestion_ts:           long,
    status:                 string,
    _stream:                string,
    _scenario:              string,
    _emit_ts:               datetime
)

.create table plane_radar (
    timestamp:        datetime,
    ts_epoch_ms:      long,
    track_id:         string,
    sensor_id:        string,
    lat:              real,
    lon:              real,
    sog_kn:           real,
    cog_deg:          real,
    rcs_m2:           real,
    classification:   string,
    confidence:       real,
    mmsi_hint:        long,
    platform:         string,
    _stream:          string,
    _scenario:        string,
    _emit_ts:         datetime
)

.create table drone_radar (
    timestamp:        datetime,
    ts_epoch_ms:      long,
    track_id:         string,
    sensor_id:        string,
    lat:              real,
    lon:              real,
    alt_m:            real,
    speed_mps:        real,
    heading_deg:      real,
    rcs_m2:           real,
    classification:   string,
    confidence:       real,
    mmsi_hint:        long,
    platform:         string,
    _stream:          string,
    _scenario:        string,
    _emit_ts:         datetime
)

.create table coastal_radar (
    timestamp:        datetime,
    ts_epoch_ms:      long,
    track_id:         string,
    sensor_id:        string,
    lat:              real,
    lon:              real,
    sog_kn:           real,
    cog_deg:          real,
    rcs_m2:           real,
    classification:   string,
    confidence:       real,
    mmsi_hint:        long,
    range_nm:         real,
    _stream:          string,
    _scenario:        string,
    _emit_ts:         datetime
)
```

The MAC table's columns are intentionally identical to the canonical CSV
header used elsewhere in the repo:

```text
sessionStart, messageCount, onlineDurationSeconds, sessionEnd,
processingTimestamp, deviceId, version, macAddress, averageSignalStrength,
deviceManufacturer, ingestion_ts, status
```

### Per-table routing

In Eventstream you can split the single inbound stream with one filter operator
per table, keyed on `_stream`:

```text
filter:  where _stream == "ais"            -> destination KQL table "ais"
filter:  where _stream == "mac"            -> destination KQL table "mac"
filter:  where _stream == "plane_radar"    -> destination KQL table "plane_radar"
filter:  where _stream == "drone_radar"    -> destination KQL table "drone_radar"
filter:  where _stream == "coastal_radar"  -> destination KQL table "coastal_radar"
```
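
Before wiring up the filters, it can help to confirm which `_stream` values a replay actually produces. The sketch below counts events per stream from a stdout dry run; it assumes the stdout target writes one JSON object per line, and `count_by_stream` is a hypothetical helper.

```python
import json
from collections import Counter
from typing import Iterable


def count_by_stream(lines: Iterable[str]) -> Counter:
    """Count replayed events per `_stream` value, ignoring blank lines."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        # Events without the routing key would match none of the filters.
        counts[event.get("_stream", "<missing>")] += 1
    return counts
```

Passing `sys.stdin` as `lines` lets the dry-run output be piped straight into the check. A non-zero `<missing>` count would indicate events that no filter will route.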

If you'd rather land everything first and split in KQL, the same `_stream`
field works as the filter in an update policy (`.alter table ... policy update`)
or a materialised view.
