Real-Time NEM Data
Overview
Real-time NEM data is the core differentiator of Energy Copilot. The platform achieves sub-5-minute latency from the moment AEMO publishes new dispatch data to its appearance in the React dashboards — enabled by a dedicated fast-path pipeline (13_nemweb_bronze_to_gold.py) and the Lakebase serving layer.
Pipeline 13 Architecture
pipelines/13_nemweb_bronze_to_gold.py is the dedicated high-frequency NEMWEB pipeline. It runs every 5 minutes and is optimised purely for latency:
```
AEMO NEMWEB (5-min file release)
       │
       ▼  (nemweb_downloader.py — exponential backoff)
Download DISPATCH_UNIT_SCADA, DISPATCHPRICE, DISPATCHREGIONSUM
       │
       ▼  (Bronze — append raw CSV rows)
bronze.raw_dispatch_scada      (interval_datetime, duid, scadavalue)
bronze.raw_dispatch_regionsum  (interval_datetime, regionid, totaldemand, rrp)
       │
       ▼  (Watermark-based incremental load)
Only process rows WHERE interval_datetime > last_processed_watermark
       │
       ▼  (Gold — upsert via MERGE)
gold.nem_prices_5min
gold.nem_generation_by_fuel
       │
       ▼  (Lakebase continuous sync)
Postgres tables (10–38 ms query latency)
```

Total pipeline-to-dashboard latency: typically 2–4 minutes.
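The "exponential backoff" step in the downloader can be sketched as a capped doubling delay with jitter. The helper names and constants below are illustrative, not taken from nemweb_downloader.py:

```python
import random
import time
import urllib.request


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Delay before retry `attempt` (0-based): base * 2**attempt seconds, capped."""
    return min(cap, base * (2 ** attempt))


def download_with_retry(url: str, max_attempts: int = 5) -> bytes:
    """Fetch `url`, retrying transient failures with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            with urllib.request.urlopen(url, timeout=30) as resp:
                return resp.read()
        except OSError:
            if attempt == max_attempts - 1:
                raise
            # Random jitter avoids synchronised retries across workers.
            time.sleep(backoff_delay(attempt) + random.uniform(0, 1))
```

Capping the delay keeps a transient NEMWEB outage from pushing a single download past the 5-minute schedule window.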
Watermark-Based Incremental Load
The pipeline uses a watermark stored in gold.pipeline_watermarks to track the last processed interval:
```python
# pipelines/13_nemweb_bronze_to_gold.py (simplified)
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()


def get_watermark(pipeline_name: str) -> str:
    """Retrieve last processed interval_datetime for this pipeline."""
    result = spark.sql(f"""
        SELECT last_processed_dt
        FROM energy_copilot.gold.pipeline_watermarks
        WHERE pipeline_name = '{pipeline_name}'
    """).collect()
    return result[0].last_processed_dt if result else "2024-01-01"


def update_watermark(pipeline_name: str, new_dt: str):
    spark.sql(f"""
        MERGE INTO energy_copilot.gold.pipeline_watermarks t
        USING (SELECT '{pipeline_name}' AS pipeline_name,
                      '{new_dt}' AS last_processed_dt) s
        ON t.pipeline_name = s.pipeline_name
        WHEN MATCHED THEN UPDATE SET last_processed_dt = s.last_processed_dt
        WHEN NOT MATCHED THEN INSERT *
    """)


# Incremental load
watermark = get_watermark("nemweb_bronze_to_gold")
new_data = spark.sql(f"""
    SELECT * FROM energy_copilot.bronze.raw_dispatch_regionsum
    WHERE interval_datetime > '{watermark}'
    ORDER BY interval_datetime
""")

if new_data.count() > 0:
    max_dt = new_data.agg({"interval_datetime": "max"}).collect()[0][0]

    # Upsert to Gold
    new_data.createOrReplaceTempView("new_intervals")
    spark.sql("""
        MERGE INTO energy_copilot.gold.nem_prices_5min t
        USING new_intervals s
        ON t.interval_datetime = s.interval_datetime
           AND t.region_id = s.regionid
        WHEN MATCHED THEN UPDATE SET t.rrp = s.rrp, t.demand = s.totaldemand
        WHEN NOT MATCHED THEN INSERT
            (interval_datetime, interval_date, region_id, rrp, demand)
        VALUES (s.interval_datetime, DATE(s.interval_datetime),
                s.regionid, s.rrp, s.totaldemand)
    """)

    update_watermark("nemweb_bronze_to_gold", str(max_dt))
```

Bronze Schema
The Bronze layer stores raw NEMWEB CSV data with minimal transformation:
```sql
-- bronze.raw_dispatch_regionsum (mirrors NEMWEB DISPATCHREGIONSUM_D CSV)
CREATE TABLE energy_copilot.bronze.raw_dispatch_regionsum (
  record_type STRING COMMENT 'NEMWEB record type (D for data)',
  table_name STRING COMMENT 'NEMWEB table name',
  settlementdate TIMESTAMP COMMENT 'Settlement date and time (UTC+10, stored as-is)',
  regionid STRING COMMENT 'NEM region identifier',
  totaldemand DOUBLE COMMENT 'Scheduled demand (MW)',
  availablegeneration DOUBLE COMMENT 'Available generation capacity (MW)',
  availableload DOUBLE COMMENT 'Available load (MW)',
  rrp DOUBLE COMMENT 'Regional reference price ($/MWh)',
  rop DOUBLE COMMENT 'Regional override price ($/MWh)',
  raise6secrrp DOUBLE COMMENT 'FCAS raise 6-second price ($/MW/h)',
  lower6secrrp DOUBLE COMMENT 'FCAS lower 6-second price ($/MW/h)',
  -- ... all 40+ NEMWEB columns
  ingest_timestamp TIMESTAMP COMMENT 'When this row was loaded',
  ingest_date DATE COMMENT 'Partition key'
)
USING DELTA
PARTITIONED BY (ingest_date);
```

NEMWEB File Naming Convention
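NEMWEB CSV files interleave control (C), header (I) and data (D) records, which is why the schema carries a record_type column. A minimal sketch of the filtering the Bronze loader implies — the helper and sample row layout are illustrative, not the pipeline's actual parser:

```python
import csv
import io


def data_rows(raw_csv: str) -> list[list[str]]:
    """Return only the 'D' (data) records from a NEMWEB CSV payload."""
    return [row for row in csv.reader(io.StringIO(raw_csv))
            if row and row[0] == "D"]


# Illustrative payload: one control row, one header row, one data row.
sample = (
    "C,NEMP.WORLD,DISPATCHIS,AEMO\n"
    "I,DISPATCH,REGIONSUM,5,SETTLEMENTDATE,REGIONID,TOTALDEMAND,RRP\n"
    'D,DISPATCH,REGIONSUM,5,"2025/03/21 05:30:00",NSW1,7321.5,89.4\n'
)
```

Using the stdlib csv module (rather than naive splitting) matters because NEMWEB quotes the settlement timestamp, which itself contains commas-adjacent characters like spaces and colons.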
AEMO publishes NEMWEB files with a consistent naming convention:
```
PUBLIC_DISPATCHREGIONSUM_20250321053000_20250321060300.zip
       │                 │              │
       │                 │              └── File published at (UTC)
       │                 └── Dispatch interval (UTC)
       └── Table name
```

The nemweb_downloader.py utility polls the NEMWEB index page and downloads new files as they appear, with exponential backoff for transient failures.
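The convention above can be parsed with a single regex. This parser is a hypothetical sketch, not a function from the codebase:

```python
import re
from datetime import datetime

_NEMWEB_NAME = re.compile(
    r"PUBLIC_(?P<table>[A-Z0-9_]+)_(?P<interval>\d{14})_(?P<published>\d{14})\.zip"
)


def parse_nemweb_filename(name: str) -> dict:
    """Split a NEMWEB zip filename into table name and timestamps."""
    m = _NEMWEB_NAME.fullmatch(name)
    if m is None:
        raise ValueError(f"unrecognised NEMWEB filename: {name}")

    def parse_ts(s: str) -> datetime:
        return datetime.strptime(s, "%Y%m%d%H%M%S")

    return {
        "table": m.group("table"),
        "interval_dt": parse_ts(m.group("interval")),
        "published_dt": parse_ts(m.group("published")),
    }
```

Comparing interval_dt against the stored watermark lets a poller skip files it has already processed without downloading them.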
Anchor Timestamp in Market Briefs
The daily market brief uses an “anchor timestamp” to define the data window for summarisation:
```python
# From pipelines/06_market_summary.py
from datetime import datetime, timedelta
import pytz

AEST = pytz.timezone('Australia/Sydney')


def get_brief_anchor_window():
    """
    Anchor window: previous trading day.
    Window: 04:00 AEST yesterday to 04:00 AEST today.
    Rationale: all data from this window has been through NEMWEB
    processing and is available in gold tables by the 05:30 AEST
    scheduled run time.
    """
    now_aest = datetime.now(AEST)

    # Anchor: today 04:00 AEST
    end_dt = now_aest.replace(hour=4, minute=0, second=0, microsecond=0)
    # If it's before 04:00, we're still in yesterday's window
    if now_aest.hour < 4:
        end_dt -= timedelta(days=1)

    start_dt = end_dt - timedelta(days=1)

    return start_dt.astimezone(pytz.UTC), end_dt.astimezone(pytz.UTC)
```

The 04:00 AEST boundary is chosen because:
- AEMO’s 30-minute settlement process means prices for interval 03:30 AEST are finalised by ~03:45 AEST
- The pipeline needs ~15 minutes to download, process, and load the data
- The brief generator runs at 05:30 AEST — the 04:00 anchor provides a 1.5-hour safety margin
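The boundary rules above can be checked against a fixed clock. This sketch mirrors the logic of get_brief_anchor_window() but takes the clock as a parameter and uses the stdlib zoneinfo module instead of pytz, so it is easy to test:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

AEST = ZoneInfo("Australia/Sydney")


def anchor_window(now_aest: datetime) -> tuple[datetime, datetime]:
    """24-hour brief window ending at the most recent 04:00 local time."""
    end_dt = now_aest.replace(hour=4, minute=0, second=0, microsecond=0)
    if now_aest.hour < 4:
        # Before 04:00 we are still inside yesterday's window.
        end_dt -= timedelta(days=1)
    start_dt = end_dt - timedelta(days=1)
    return start_dt.astimezone(timezone.utc), end_dt.astimezone(timezone.utc)
```

A 05:30 run anchors at 04:00 the same day; a 03:00 run rolls the anchor back to 04:00 the previous day, so the brief never summarises intervals that may still be in flight.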
Data Latency Summary
Section titled “Data Latency Summary”| Data Type | Source Interval | Bronze Latency | Gold Latency | Lakebase Latency |
|---|---|---|---|---|
| 5-min dispatch prices | 5 min | +1 min | +3 min | +5 min (sync delay) |
| 30-min trading prices | 30 min | +2 min | +5 min | +7 min |
| FCAS prices | 5 min | +1 min | +3 min | +5 min |
| Generation SCADA | 5 min | +1 min | +3 min | +5 min |
| BOM weather | 1 hour | +5 min | +10 min | N/A |
| Market brief | Daily | N/A | +10 min | N/A |