Real-Time NEM Data

Real-time NEM data is the core differentiator of Energy Copilot. The platform achieves sub-5-minute latency from the moment AEMO publishes new dispatch data to that data appearing in the React dashboards, enabled by a dedicated fast-path pipeline (13_nemweb_bronze_to_gold.py) and the Lakebase serving layer.

pipelines/13_nemweb_bronze_to_gold.py is the dedicated high-frequency NEMWEB pipeline. It runs every 5 minutes and is optimised purely for latency:

AEMO NEMWEB (5-min file release)
▼ (nemweb_downloader.py — exponential backoff)
Download DISPATCH_UNIT_SCADA, DISPATCHPRICE, DISPATCHREGIONSUM
▼ (Bronze — append raw CSV rows)
bronze.raw_dispatch_scada (interval_datetime, duid, scadavalue)
bronze.raw_dispatch_regionsum (interval_datetime, regionid, totaldemand, rrp)
▼ (Watermark-based incremental load)
Only process rows WHERE interval_datetime > last_processed_watermark
▼ (Gold — upsert via MERGE)
gold.nem_prices_5min
gold.nem_generation_by_fuel
▼ (Lakebase continuous sync)
Postgres tables (10-38ms query latency)

Total pipeline-to-dashboard latency: typically 2–4 minutes.
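Because every table in this path is keyed on 5-minute dispatch intervals, timestamps are typically bucketed to their 5-minute boundary before joining or deduplicating. A minimal sketch of that bucketing (the helper name is illustrative, not from the codebase):

```python
from datetime import datetime, timedelta


def floor_to_dispatch_interval(ts: datetime) -> datetime:
    """Floor a timestamp to the 5-minute NEM dispatch interval boundary."""
    return ts - timedelta(
        minutes=ts.minute % 5,
        seconds=ts.second,
        microseconds=ts.microsecond,
    )


floor_to_dispatch_interval(datetime(2025, 3, 21, 5, 33, 47))
# -> datetime(2025, 3, 21, 5, 30)
```

Bucketing at ingest time keeps the join keys in gold.nem_prices_5min exact, so the MERGE condition on interval_datetime never misses due to stray seconds.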

The pipeline uses a watermark stored in gold.pipeline_watermarks to track the last processed interval:

# pipelines/13_nemweb_bronze_to_gold.py (simplified)
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()


def get_watermark(pipeline_name: str) -> str:
    """Retrieve the last processed interval_datetime for this pipeline."""
    result = spark.sql(f"""
        SELECT last_processed_dt
        FROM energy_copilot.gold.pipeline_watermarks
        WHERE pipeline_name = '{pipeline_name}'
    """).collect()
    return result[0].last_processed_dt if result else "2024-01-01"


def update_watermark(pipeline_name: str, new_dt: str):
    """Advance the watermark after a successful Gold upsert."""
    spark.sql(f"""
        MERGE INTO energy_copilot.gold.pipeline_watermarks t
        USING (SELECT '{pipeline_name}' AS pipeline_name, '{new_dt}' AS last_processed_dt) s
        ON t.pipeline_name = s.pipeline_name
        WHEN MATCHED THEN UPDATE SET last_processed_dt = s.last_processed_dt
        WHEN NOT MATCHED THEN INSERT *
    """)


# Incremental load: only rows newer than the watermark
watermark = get_watermark("nemweb_bronze_to_gold")
new_data = spark.sql(f"""
    SELECT * FROM energy_copilot.bronze.raw_dispatch_regionsum
    WHERE interval_datetime > '{watermark}'
    ORDER BY interval_datetime
""")

if new_data.count() > 0:
    max_dt = new_data.agg({"interval_datetime": "max"}).collect()[0][0]

    # Upsert to Gold (MERGE keeps the load idempotent on re-runs)
    new_data.createOrReplaceTempView("new_intervals")
    spark.sql("""
        MERGE INTO energy_copilot.gold.nem_prices_5min t
        USING new_intervals s
        ON t.interval_datetime = s.interval_datetime AND t.region_id = s.regionid
        WHEN MATCHED THEN UPDATE SET t.rrp = s.rrp, t.demand = s.totaldemand
        WHEN NOT MATCHED THEN INSERT (interval_datetime, interval_date, region_id, rrp, demand)
        VALUES (s.interval_datetime, DATE(s.interval_datetime), s.regionid, s.rrp, s.totaldemand)
    """)
    update_watermark("nemweb_bronze_to_gold", str(max_dt))
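The watermark contract above can be illustrated without Spark. A minimal in-memory sketch of the same semantics (illustrative only; the real pipeline expresses this in SQL as shown): re-running with the returned watermark yields no rows, so a retried run cannot double-process intervals.

```python
def incremental_load(rows, watermark):
    """Return rows newer than the watermark, plus the advanced watermark.

    ISO-8601 timestamp strings compare correctly as plain strings, so a
    lexicographic `>` matches the SQL predicate in the pipeline.
    """
    new_rows = [r for r in rows if r["interval_datetime"] > watermark]
    if not new_rows:
        return [], watermark  # nothing new: watermark unchanged
    return new_rows, max(r["interval_datetime"] for r in new_rows)


rows = [
    {"interval_datetime": "2025-03-21 05:25:00", "rrp": 89.1},
    {"interval_datetime": "2025-03-21 05:30:00", "rrp": 92.4},
]
loaded, wm = incremental_load(rows, "2025-03-21 05:25:00")
# loaded contains only the 05:30 row; wm == "2025-03-21 05:30:00"
```

Combined with the MERGE upsert, this makes the whole load step safe to repeat after a partial failure.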

The Bronze layer stores raw NEMWEB CSV data with minimal transformation:

-- bronze.raw_dispatch_regionsum (mirrors NEMWEB DISPATCHREGIONSUM_D CSV)
CREATE TABLE energy_copilot.bronze.raw_dispatch_regionsum (
record_type STRING COMMENT 'NEMWEB record type (D for data)',
table_name STRING COMMENT 'NEMWEB table name',
settlementdate TIMESTAMP COMMENT 'Settlement date and time (UTC+10, stored as-is)',
regionid STRING COMMENT 'NEM region identifier',
totaldemand DOUBLE COMMENT 'Scheduled demand (MW)',
availablegeneration DOUBLE COMMENT 'Available generation capacity (MW)',
availableload DOUBLE COMMENT 'Available load (MW)',
rrp DOUBLE COMMENT 'Regional reference price ($/MWh)',
rop DOUBLE COMMENT 'Regional override price ($/MWh)',
raise6secrrp DOUBLE COMMENT 'FCAS raise 6-second price ($/MW/h)',
lower6secrrp DOUBLE COMMENT 'FCAS lower 6-second price ($/MW/h)',
-- ... all 40+ NEMWEB columns
ingest_timestamp TIMESTAMP COMMENT 'When this row was loaded',
ingest_date DATE COMMENT 'Partition key'
)
USING DELTA
PARTITIONED BY (ingest_date);
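NEMWEB CSV files interleave control ('C'), header ('I'), and data ('D') rows in one file. A minimal sketch of turning them into dicts for a Bronze append (the sample rows and column subset are illustrative, not the full 40+ column layout):

```python
import csv
import io


def parse_nemweb_csv(text):
    """Yield 'D' (data) rows as dicts, keyed by the most recent 'I' (header) row."""
    header = None
    for row in csv.reader(io.StringIO(text)):
        if not row:
            continue
        if row[0] == "I":                # header row: defines column names
            header = [col.lower() for col in row]
        elif row[0] == "D" and header:   # data row: zip against last header
            yield dict(zip(header, row))


sample = (
    "C,NEMP.WORLD,DISPATCHREGIONSUM,AEMO,PUBLIC\n"
    "I,DISPATCH,REGIONSUM,4,SETTLEMENTDATE,REGIONID,TOTALDEMAND,RRP\n"
    "D,DISPATCH,REGIONSUM,4,2025/03/21 05:30:00,NSW1,7412.5,89.10\n"
)
rows = list(parse_nemweb_csv(sample))
# rows[0]["regionid"] == "NSW1", rows[0]["rrp"] == "89.10"
```

Keeping values as strings at this stage matches the Bronze philosophy above: types are cast on the way to Gold, not at ingest.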

AEMO publishes NEMWEB files with a consistent naming convention:

PUBLIC_DISPATCHREGIONSUM_20250321053000_20250321060300.zip
       │                 │              │
       │                 │              └── File published at (AEST)
       │                 └── Dispatch interval (AEST)
       └── Table name

The nemweb_downloader.py utility polls the NEMWEB index page and downloads new files as they appear, with exponential backoff for transient failures.
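The two building blocks described here can be sketched as follows. Both function names and the regex are illustrative, not the actual API of nemweb_downloader.py:

```python
import re
import time

# Matches the naming convention shown above: table name plus two 14-digit timestamps
FILENAME_RE = re.compile(
    r"PUBLIC_(?P<table>[A-Z_]+)_(?P<interval>\d{14})_(?P<published>\d{14})\.zip"
)


def parse_nemweb_filename(name: str) -> dict:
    """Split a NEMWEB zip filename into table name and timestamp fields."""
    match = FILENAME_RE.fullmatch(name)
    if match is None:
        raise ValueError(f"unrecognised NEMWEB filename: {name}")
    return match.groupdict()


def download_with_backoff(fetch, retries=5, base_delay=1.0):
    """Call fetch(), retrying transient failures with exponential backoff."""
    for attempt in range(retries):
        try:
            return fetch()
        except OSError:
            if attempt == retries - 1:
                raise                       # out of retries: surface the error
            time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...


parse_nemweb_filename("PUBLIC_DISPATCHREGIONSUM_20250321053000_20250321060300.zip")
# -> {'table': 'DISPATCHREGIONSUM', 'interval': '20250321053000',
#     'published': '20250321060300'}
```

Parsing the interval out of the filename lets the poller skip files at or below the current watermark without downloading them.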

The daily market brief uses an “anchor timestamp” to define the data window for summarisation:

# From pipelines/06_market_summary.py
from datetime import datetime, timedelta
import pytz

# NEM market time is AEST (UTC+10) year-round; Australia/Brisbane does not
# observe daylight saving, unlike Australia/Sydney.
AEST = pytz.timezone('Australia/Brisbane')


def get_brief_anchor_window():
    """
    Anchor window: previous trading day.
    Window: 04:00 AEST yesterday to 04:00 AEST today.
    Rationale: all data from this window has been through NEMWEB processing
    and is available in gold tables by the 05:30 AEST scheduled run time.
    """
    now_aest = datetime.now(AEST)
    # Anchor: today 04:00 AEST
    end_dt = now_aest.replace(hour=4, minute=0, second=0, microsecond=0)
    # If it's before 04:00, we're still in yesterday's window
    if now_aest.hour < 4:
        end_dt -= timedelta(days=1)
    start_dt = end_dt - timedelta(days=1)
    return start_dt.astimezone(pytz.UTC), end_dt.astimezone(pytz.UTC)

The 04:00 AEST boundary is chosen because:

  1. AEMO’s 30-minute settlement process means prices for interval 03:30 AEST are finalised by ~03:45 AEST
  2. The pipeline needs ~15 minutes to download, process, and load the data
  3. The brief generator runs at 05:30 AEST — the 04:00 anchor provides a 1.5-hour safety margin
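The boundary logic can be exercised deterministically by passing "now" in explicitly. A standalone sketch of the same window rule (a fixed UTC+10 offset is used here for illustration; the function name is hypothetical):

```python
from datetime import datetime, timedelta, timezone

AEST = timezone(timedelta(hours=10))  # NEM market time: fixed UTC+10


def anchor_window(now_aest: datetime):
    """Previous trading day: 04:00 AEST yesterday to 04:00 AEST today."""
    end_dt = now_aest.replace(hour=4, minute=0, second=0, microsecond=0)
    if now_aest.hour < 4:        # before 04:00: still in yesterday's window
        end_dt -= timedelta(days=1)
    return end_dt - timedelta(days=1), end_dt


# The 05:30 scheduled run sees the window 04:00 yesterday -> 04:00 today
start, end = anchor_window(datetime(2025, 3, 21, 5, 30, tzinfo=AEST))
# start == 2025-03-20 04:00 AEST, end == 2025-03-21 04:00 AEST

# A hypothetical 03:00 run would still fall in the previous day's window
start2, end2 = anchor_window(datetime(2025, 3, 21, 3, 0, tzinfo=AEST))
# end2 == 2025-03-20 04:00 AEST
```

Injecting the clock this way also makes the before/after-04:00 branch easy to unit-test.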

| Data Type             | Source Interval | Bronze Latency | Gold Latency | Lakebase Latency    |
|-----------------------|-----------------|----------------|--------------|---------------------|
| 5-min dispatch prices | 5 min           | +1 min         | +3 min       | +5 min (sync delay) |
| 30-min trading prices | 30 min          | +2 min         | +5 min       | +7 min              |
| FCAS prices           | 5 min           | +1 min         | +3 min       | +5 min              |
| Generation SCADA      | 5 min           | +1 min         | +3 min      | +5 min              |
| BOM weather           | 1 hour          | +5 min         | +10 min     | N/A                 |
| Market brief          | Daily           | N/A            | +10 min     | N/A                 |