diff --git a/README.md b/README.md
index 549da21..df54b79 100644
--- a/README.md
+++ b/README.md
@@ -59,6 +59,74 @@ The webserver provides an intuitive interface with four main pages:
 4. **Webserver** → serves UI and provides API access to database
 5. **Grafana** → visualizes real-time data from database with embedded dashboards
 
+## Archive Data
+
+The database can be initialized with archive CML data using two methods:
+
+### Method 1: CSV Files (Default, Fast)
+
+Pre-generated CSV files included in the repository:
+- **728 CML sublinks** (364 unique CML IDs) covering the Berlin area
+- **~1.5M data rows** at 5-minute intervals over 7 days
+- **Gzip-compressed** (~7.6 MB total)
+- **Loads in ~3 seconds** via PostgreSQL COPY
+
+Files are located in `database/archive_data/` and loaded automatically on first database startup.
+
+### Method 2: Load from NetCDF (For Larger/Higher-Resolution Archives)
+
+Load data directly from the full 3-month NetCDF archive with a configurable time range:
+
+#### Default: 7 Days at 10-Second Resolution (~44M rows, ~5 minutes)
+
+```sh
+# Rebuild parser if needed
+docker compose build parser
+
+# Start database
+docker compose up -d database
+
+# Load last 7 days from NetCDF
+docker compose run --rm -e DB_HOST=database parser python /app/parser/parse_netcdf_archive.py
+```
+
+#### Custom Time Range
+
+Use `ARCHIVE_MAX_DAYS` to control how much data to load:
+
+```sh
+# Load last 14 days (~88M rows, ~10 minutes)
+docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=14 parser python /app/parser/parse_netcdf_archive.py
+
+# Load full 3 months (~579M rows, ~1 hour)
+docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=0 parser python /app/parser/parse_netcdf_archive.py
+```
+
+**Note**: Set `ARCHIVE_MAX_DAYS=0` to disable the time limit and load the entire dataset. Larger datasets need more database memory (we recommend at least 4 GB of RAM for the full 3-month archive).
+
+**Features**:
+- Auto-downloads the 3-month NetCDF file (~209 MB) on first run
+- **10-second resolution** (vs. 5-minute for the CSV method)
+- **Automatic timestamp shifting** - data ends at the current time
+- **Progress reporting** with batch-by-batch status (~155K rows/sec)
+- PostgreSQL COPY for maximum performance
+- Configurable time window to balance demo realism vs. load time
+
+The NetCDF file is downloaded to `parser/example_data/openMRG_cmls_20150827_3months.nc` and gitignored.
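+
+Once either method has finished, you can spot-check the loaded archive directly in the database. This is a minimal sketch: the service name `database` and user `myuser` come from `docker-compose.yml` and `database/Dockerfile`, while the database name `mydatabase` is assumed from the defaults in `parse_netcdf_archive.py`; adjust these if your configuration differs.
+
+```sh
+# Row count and time range of the loaded archive data
+docker compose exec database psql -U myuser -d mydatabase \
+  -c "SELECT COUNT(*) AS total_rows, MIN(time) AS start_time, MAX(time) AS end_time FROM cml_data;"
+```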
+
+### Managing Archive Data
+
+To regenerate CSV archive data:
+```sh
+python mno_data_source_simulator/generate_archive.py
+```
+
+To reload archive data (either method):
+```sh
+docker compose down -v  # Remove volumes
+docker compose up -d    # Restart with fresh database
+```
+
 ## Storage Backend
 
 The webserver supports multiple storage backends for received files:
diff --git a/database/Dockerfile b/database/Dockerfile
index c7fc3b3..531c747 100644
--- a/database/Dockerfile
+++ b/database/Dockerfile
@@ -6,6 +6,7 @@ ENV POSTGRES_USER=myuser
 ENV POSTGRES_PASSWORD=mypassword
 
 # Copy the initialization script to set up the database schema
-COPY init.sql /docker-entrypoint-initdb.d/
+# Files are executed in alphabetical order, so we prefix with numbers
+COPY init.sql /docker-entrypoint-initdb.d/01-init-schema.sql
 
 EXPOSE 5432
\ No newline at end of file
diff --git a/database/archive_data/data_archive.csv.gz b/database/archive_data/data_archive.csv.gz
new file mode 100644
index 0000000..b7740bc
Binary files /dev/null and b/database/archive_data/data_archive.csv.gz differ
diff --git a/database/archive_data/metadata_archive.csv.gz b/database/archive_data/metadata_archive.csv.gz
new file mode 100644
index 0000000..b1fc959
Binary files /dev/null and b/database/archive_data/metadata_archive.csv.gz differ
diff --git a/database/init_archive_data.sh b/database/init_archive_data.sh
new file mode 100755
index 0000000..66d1090
--- /dev/null
+++ b/database/init_archive_data.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+set -e
+
+# This script loads archive data into the database during initialization.
+# It runs after the schema is created (init.sql) but before the database
+# accepts external connections.
+
+echo "Loading archive data into database..."
+
+ARCHIVE_DATA_DIR="/docker-entrypoint-initdb.d/archive_data"
+
+# Check if archive data exists (should be included in the repo)
+if [ ! -f "$ARCHIVE_DATA_DIR/metadata_archive.csv.gz" ] || [ ! -f "$ARCHIVE_DATA_DIR/data_archive.csv.gz" ]; then
+    echo "Warning: Archive data files not found. Skipping archive data load."
+    echo "Hint: Run 'python mno_data_source_simulator/generate_archive.py' to generate archive data."
+    exit 0
+fi
+
+# Load metadata first (required for foreign key references)
+echo "Loading metadata archive..."
+psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
+    \COPY cml_metadata FROM PROGRAM 'gunzip -c $ARCHIVE_DATA_DIR/metadata_archive.csv.gz' WITH (FORMAT csv, HEADER true);
+EOSQL
+
+METADATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_metadata;")
+echo "Loaded $METADATA_COUNT metadata records"
+
+# Load time-series data using COPY for maximum speed
+echo "Loading time-series archive data (this may take 10-30 seconds)..."
+START_TIME=$(date +%s) + +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL + \COPY cml_data (time, cml_id, sublink_id, tsl, rsl) FROM PROGRAM 'gunzip -c $ARCHIVE_DATA_DIR/data_archive.csv.gz' WITH (FORMAT csv, HEADER true); +EOSQL + +END_TIME=$(date +%s) +DURATION=$((END_TIME - START_TIME)) + +DATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_data;") +echo "Loaded $DATA_COUNT data records in $DURATION seconds" + +# Display time range of loaded data +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL + SELECT + 'Archive data time range:' as info, + MIN(time) as start_time, + MAX(time) as end_time, + COUNT(*) as total_rows + FROM cml_data; +EOSQL + +echo "Archive data successfully loaded!" diff --git a/docker-compose.yml b/docker-compose.yml index cebdecc..4c0fc48 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,6 +40,9 @@ services: build: ./database ports: - "5432:5432" + volumes: + - ./database/archive_data:/docker-entrypoint-initdb.d/archive_data:ro + - ./database/init_archive_data.sh:/docker-entrypoint-initdb.d/99-load-archive.sh:ro processor: build: ./processor diff --git a/grafana/provisioning/dashboards/dashboards.yml b/grafana/provisioning/dashboards/dashboards.yml index 5d18d61..0383e26 100644 --- a/grafana/provisioning/dashboards/dashboards.yml +++ b/grafana/provisioning/dashboards/dashboards.yml @@ -6,6 +6,8 @@ providers: folder: '' type: file disableDeletion: false - editable: true + editable: false + updateIntervalSeconds: 10 + allowUiUpdates: false options: path: /etc/grafana/provisioning/dashboards/definitions diff --git a/grafana/provisioning/dashboards/definitions/cml-realtime.json b/grafana/provisioning/dashboards/definitions/cml-realtime.json index 9825df8..fb3a8f4 100644 --- a/grafana/provisioning/dashboards/definitions/cml-realtime.json +++ b/grafana/provisioning/dashboards/definitions/cml-realtime.json @@ -15,7 +15,7 @@ } ] }, - "editable": true, + "editable": false, "fiscalYearStartMonth": 0, "graphTooltip": 0, "id": 1, @@ -156,7 +156,7 @@ }, "format": "time_series", "rawQuery": true, - "rawSql": "SELECT\n time AS \"time\",\n sublink_id AS \"metric\",\n rsl AS \"value\"\nFROM cml_data\nWHERE cml_id = '${cml_id}'\n AND time >= $__timeFrom()::timestamptz\n AND time <= $__timeTo()::timestamptz\nORDER BY \"time\" ASC", + "rawSql": "SELECT\n time AS \"time\",\n sublink_id AS \"metric\",\n rsl AS \"value\"\nFROM cml_data\nWHERE cml_id = '${cml_id}'\n AND '${aggregation}' = 'RAW'\n AND time >= $__timeFrom()::timestamptz\n AND time <= $__timeTo()::timestamptz\nUNION ALL\nSELECT\n time_bucket((CASE WHEN '${interval}' = 'auto' THEN (${__interval_ms} || ' milliseconds') ELSE '${interval}' END)::interval, time) AS \"time\",\n sublink_id AS \"metric\",\n CASE\n WHEN '${aggregation}' = 'MEDIAN' THEN PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY rsl)\n WHEN '${aggregation}' = 'AVG' THEN AVG(rsl)\n WHEN '${aggregation}' = 'MIN' THEN MIN(rsl)\n WHEN '${aggregation}' = 'MAX' THEN MAX(rsl)\n WHEN '${aggregation}' = 'STDDEV' THEN STDDEV(rsl)\n END AS \"value\"\nFROM cml_data\nWHERE cml_id = '${cml_id}'\n AND '${aggregation}' <> 'RAW'\n AND time >= $__timeFrom()::timestamptz\n AND time <= $__timeTo()::timestamptz\nGROUP BY time_bucket((CASE WHEN '${interval}' = 'auto' THEN (${__interval_ms} || ' milliseconds') ELSE '${interval}' END)::interval, time), sublink_id\nORDER BY \"time\" ASC", "refId": "A" } ], @@ -244,7 +244,7 @@ }, "format": 
"time_series", "rawQuery": true, - "rawSql": "SELECT\n time AS \"time\",\n sublink_id AS \"metric\",\n tsl AS \"value\"\nFROM cml_data\nWHERE cml_id = '${cml_id}'\n AND time >= $__timeFrom()::timestamptz\n AND time <= $__timeTo()::timestamptz\nORDER BY \"time\" ASC", + "rawSql": "SELECT\n time AS \"time\",\n sublink_id AS \"metric\",\n tsl AS \"value\"\nFROM cml_data\nWHERE cml_id = '${cml_id}'\n AND '${aggregation}' = 'RAW'\n AND time >= $__timeFrom()::timestamptz\n AND time <= $__timeTo()::timestamptz\nUNION ALL\nSELECT\n time_bucket((CASE WHEN '${interval}' = 'auto' THEN (${__interval_ms} || ' milliseconds') ELSE '${interval}' END)::interval, time) AS \"time\",\n sublink_id AS \"metric\",\n CASE\n WHEN '${aggregation}' = 'MEDIAN' THEN PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tsl)\n WHEN '${aggregation}' = 'AVG' THEN AVG(tsl)\n WHEN '${aggregation}' = 'MIN' THEN MIN(tsl)\n WHEN '${aggregation}' = 'MAX' THEN MAX(tsl)\n WHEN '${aggregation}' = 'STDDEV' THEN STDDEV(tsl)\n END AS \"value\"\nFROM cml_data\nWHERE cml_id = '${cml_id}'\n AND '${aggregation}' <> 'RAW'\n AND time >= $__timeFrom()::timestamptz\n AND time <= $__timeTo()::timestamptz\nGROUP BY time_bucket((CASE WHEN '${interval}' = 'auto' THEN (${__interval_ms} || ' milliseconds') ELSE '${interval}' END)::interval, time), sublink_id\nORDER BY \"time\" ASC", "refId": "A" } ], @@ -392,18 +392,121 @@ "query": "SELECT DISTINCT cml_id::text FROM cml_metadata ORDER BY cml_id", "refresh": 1, "regex": "", - "skipUrlSync": false, + "skipUrlSync": true, "sort": 0, "tagValuesQuery": "", "tagsQuery": "", "type": "query", "useTags": false + }, + { + "current": { + "selected": true, + "text": "1min", + "value": "1 minute" + }, + "description": "Time aggregation interval for downsampling data", + "hide": 0, + "includeAll": false, + "label": "Interval", + "multi": false, + "name": "interval", + "options": [ + { + "selected": false, + "text": "Auto", + "value": "auto" + }, + { + "selected": true, + "text": "1min", + "value": "1 minute" + }, + { + "selected": false, + "text": "5min", + "value": "5 minutes" + }, + { + "selected": false, + "text": "15min", + "value": "15 minutes" + }, + { + "selected": false, + "text": "1h", + "value": "1 hour" + }, + { + "selected": false, + "text": "6h", + "value": "6 hours" + }, + { + "selected": false, + "text": "1d", + "value": "1 day" + } + ], + "query": "Auto : auto,1min : 1 minute,5min : 5 minutes,15min : 15 minutes,1h : 1 hour,6h : 6 hours,1d : 1 day", + "queryValue": "", + "skipUrlSync": true, + "type": "custom" + }, + { + "current": { + "selected": true, + "text": "Mean", + "value": "AVG" + }, + "description": "Aggregation function for downsampling", + "hide": 0, + "includeAll": false, + "label": "Aggregation", + "multi": false, + "name": "aggregation", + "options": [ + { + "selected": true, + "text": "Mean", + "value": "AVG" + }, + { + "selected": false, + "text": "Raw (no aggregation)", + "value": "RAW" + }, + { + "selected": false, + "text": "Min", + "value": "MIN" + }, + { + "selected": false, + "text": "Max", + "value": "MAX" + }, + { + "selected": false, + "text": "Median", + "value": "MEDIAN" + }, + { + "selected": false, + "text": "StdDev", + "value": "STDDEV" + } + ], + "query": "Mean : AVG,Raw (no aggregation) : RAW,Min : MIN,Max : MAX,Median : MEDIAN,StdDev : STDDEV", + "queryValue": "", + "skipUrlSync": true, + "type": "custom" } ] }, "time": { - "from": "2015-08-27T00:00:00Z", - "to": "2015-08-27T00:30:00Z" + "from": "now-7d", + "to": "now" }, "timepicker": {}, "timezone": "", diff --git 
a/mno_data_source_simulator/generate_archive.py b/mno_data_source_simulator/generate_archive.py new file mode 100755 index 0000000..bad2ac8 --- /dev/null +++ b/mno_data_source_simulator/generate_archive.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +""" +Generate archive CML data for database initialization. + +This script uses the existing CMLDataGenerator to create archive data +with real RSL/TSL values from the NetCDF file, but with fake timestamps +spanning the configured archive period. +""" + +import sys +import gzip +from pathlib import Path +from datetime import datetime, timedelta +import logging +import pandas as pd + +from data_generator import CMLDataGenerator + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# Configuration +NETCDF_FILE = "../parser/example_data/openMRG_cmls_20150827_12hours.nc" +ARCHIVE_DAYS = 7 # Archive period in days (reduced for demo purposes) +TIME_INTERVAL_MINUTES = 5 # Resample to 5-minute intervals (reduces data size) +ARCHIVE_END_DATE = datetime.now() +ARCHIVE_START_DATE = ARCHIVE_END_DATE - timedelta(days=ARCHIVE_DAYS) +OUTPUT_DIR = "../database/archive_data" + +# Output files (gzipped) +METADATA_OUTPUT = "metadata_archive.csv.gz" +DATA_OUTPUT = "data_archive.csv.gz" + + +def generate_archive_data(): + """Generate archive metadata and time-series data.""" + + netcdf_path = Path(__file__).parent / NETCDF_FILE + output_path = Path(__file__).parent / OUTPUT_DIR + + if not netcdf_path.exists(): + logger.error(f"NetCDF file not found: {netcdf_path}") + sys.exit(1) + + output_path.mkdir(parents=True, exist_ok=True) + + logger.info("=" * 60) + logger.info("Generating Archive Data from NetCDF") + logger.info("=" * 60) + logger.info(f"NetCDF file: {netcdf_path}") + logger.info( + f"Archive period: {ARCHIVE_START_DATE} to {ARCHIVE_END_DATE} ({ARCHIVE_DAYS} days)" + ) + + # Initialize the data generator + generator = CMLDataGenerator( + netcdf_file=str(netcdf_path), + loop_duration_seconds=ARCHIVE_DAYS * 24 * 3600, # Loop over archive period + ) + + # Generate and save metadata using existing function + logger.info("\nGenerating metadata...") + metadata_path = output_path / METADATA_OUTPUT + metadata_df = generator.get_metadata_dataframe() + + with gzip.open(metadata_path, "wt") as f: + metadata_df.to_csv(f, index=False) + + logger.info(f"Saved {len(metadata_df)} metadata rows to {metadata_path}") + logger.info(f" Unique CML IDs: {metadata_df['cml_id'].nunique()}") + + # Generate timestamps for the archive period with configured interval + logger.info(f"\nGenerating time-series data...") + logger.info(f" Time interval: {TIME_INTERVAL_MINUTES} minutes") + + timestamps = pd.date_range( + start=ARCHIVE_START_DATE, + end=ARCHIVE_END_DATE, + freq=f"{TIME_INTERVAL_MINUTES}min", + ) + + logger.info(f" Total timestamps: {len(timestamps):,}") + logger.info(f" Total rows (estimate): {len(timestamps) * len(metadata_df):,}") + + # Set the generator's loop start time to archive start + generator.loop_start_time = ARCHIVE_START_DATE + + # Generate data in batches using existing generate_data function + batch_size = 100 + total_rows = 0 + data_path = output_path / DATA_OUTPUT + + with gzip.open(data_path, "wt") as f: + first_batch = True + + for i in range(0, len(timestamps), batch_size): + batch_timestamps = timestamps[i : i + batch_size] + + # Use existing generate_data function + df = generator.generate_data(batch_timestamps) + + # Write to gzipped 
CSV + df.to_csv(f, index=False, header=first_batch) + first_batch = False + + total_rows += len(df) + + # Progress indicator every 10% + if (i + batch_size) % (len(timestamps) // 10) < batch_size: + progress = min(100, ((i + batch_size) / len(timestamps)) * 100) + logger.info(f" Progress: {progress:.0f}% ({total_rows:,} rows)") + + logger.info(f"\nSaved {total_rows:,} data rows to {data_path}") + + # Report file sizes + metadata_size = metadata_path.stat().st_size / 1024 + data_size = data_path.stat().st_size / (1024 * 1024) + logger.info(f"\nFile sizes:") + logger.info(f" {metadata_path.name}: {metadata_size:.1f} KB") + logger.info(f" {data_path.name}: {data_size:.1f} MB") + + logger.info("\n" + "=" * 60) + logger.info("Archive data generation complete!") + logger.info("=" * 60) + + generator.close() + + +if __name__ == "__main__": + generate_archive_data() diff --git a/mno_data_source_simulator/tests/test_generate_archive.py b/mno_data_source_simulator/tests/test_generate_archive.py new file mode 100644 index 0000000..fac2d9e --- /dev/null +++ b/mno_data_source_simulator/tests/test_generate_archive.py @@ -0,0 +1,56 @@ +import pytest +from pathlib import Path +from unittest.mock import patch, MagicMock, mock_open +import pandas as pd +import sys + +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +@patch("generate_archive.Path.mkdir") +@patch("generate_archive.Path.exists") +@patch("generate_archive.CMLDataGenerator") +@patch("generate_archive.gzip.open", new_callable=mock_open) +def test_generate_archive_creates_gzipped_files( + mock_gzip, mock_generator_class, mock_exists, mock_mkdir +): + """Test generate_archive_data() creates compressed metadata and data files.""" + from generate_archive import generate_archive_data + + mock_exists.return_value = True + + mock_generator = MagicMock() + mock_generator_class.return_value = mock_generator + mock_generator.get_metadata_dataframe.return_value = pd.DataFrame( + { + "cml_id": ["101", "102"], + "sublink_id": ["sublink_1", "sublink_1"], + } + ) + mock_generator.generate_data.return_value = pd.DataFrame( + { + "time": pd.date_range("2024-01-01", periods=2, freq="5min"), + "cml_id": ["101", "101"], + "tsl": [50.0, 51.0], + "rsl": [-60.0, -61.0], + } + ) + + with patch("generate_archive.Path.stat") as mock_stat: + mock_stat.return_value.st_size = 1024 + generate_archive_data() + + # Verify gzipped files created (critical for demo setup) + assert mock_gzip.call_count == 2 + mock_generator.close.assert_called_once() + + +@patch("generate_archive.Path.exists") +def test_generate_archive_fails_if_netcdf_missing(mock_exists): + """Test generate_archive_data() fails when NetCDF file missing.""" + from generate_archive import generate_archive_data + + mock_exists.return_value = False + + with pytest.raises(SystemExit): + generate_archive_data() diff --git a/parser/example_data/.gitignore b/parser/example_data/.gitignore new file mode 100644 index 0000000..92f2938 --- /dev/null +++ b/parser/example_data/.gitignore @@ -0,0 +1,2 @@ +# Ignore the full 3-month dataset (200 MB, downloaded on demand) +openMRG_cmls_20150827_3months.nc diff --git a/parser/example_data/download_full_dataset.sh b/parser/example_data/download_full_dataset.sh new file mode 100755 index 0000000..b5298f8 --- /dev/null +++ b/parser/example_data/download_full_dataset.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# Download the full 3-month CML dataset (200 MB compressed) + +set -e + +URL="https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download" 
+OUTPUT_FILE="openMRG_cmls_20150827_3months.nc" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +OUTPUT_PATH="$SCRIPT_DIR/$OUTPUT_FILE" + +echo "Downloading full 3-month CML dataset..." +echo "URL: $URL" +echo "Output: $OUTPUT_PATH" + +if [ -f "$OUTPUT_PATH" ]; then + echo "File already exists: $OUTPUT_PATH" + read -p "Overwrite? (y/N): " -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + echo "Download cancelled." + exit 0 + fi +fi + +curl -L "$URL" -o "$OUTPUT_PATH" --progress-bar + +if [ $? -eq 0 ]; then + echo "Download complete!" + echo "File size: $(du -h "$OUTPUT_PATH" | cut -f1)" +else + echo "Download failed!" + exit 1 +fi diff --git a/parser/parse_netcdf_archive.py b/parser/parse_netcdf_archive.py new file mode 100644 index 0000000..4ade24a --- /dev/null +++ b/parser/parse_netcdf_archive.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python3 +""" +Load archive CML data directly from NetCDF to database. + +This script reads a NetCDF file, shifts timestamps to end at current time, +and loads data directly into the database using PostgreSQL COPY FROM. +This is optimized for large datasets (millions of rows). + +The script preserves the original temporal resolution and time span of the NetCDF file. +""" + +import os +import sys +import io +from datetime import datetime, timedelta +import logging +from pathlib import Path + +import xarray as xr +import pandas as pd +import numpy as np +import psycopg2 + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# Paths - use parser's own directory structure +SCRIPT_DIR = Path(__file__).parent +EXAMPLE_DATA_DIR = SCRIPT_DIR / "example_data" +EXAMPLE_DATA_DIR.mkdir(parents=True, exist_ok=True) + +# Configuration from environment variables +NETCDF_FILE = os.getenv( + "ARCHIVE_NETCDF_FILE", str(EXAMPLE_DATA_DIR / "openMRG_cmls_20150827_3months.nc") +) +NETCDF_URL = os.getenv( + "ARCHIVE_NETCDF_URL", "https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download" +) + +# Limit time range (in days from end) - set to None for full dataset +# For demo purposes, default to 7 days to avoid overwhelming the database +MAX_DAYS = int(os.getenv("ARCHIVE_MAX_DAYS", "7")) # Set to 0 for full dataset + +# Database connection from environment +DB_NAME = os.getenv("POSTGRES_DB", "mydatabase") +DB_USER = os.getenv("POSTGRES_USER", "myuser") +DB_PASSWORD = os.getenv("POSTGRES_PASSWORD", "mypassword") +DB_HOST = os.getenv("DB_HOST", "localhost") +DB_PORT = os.getenv("DB_PORT", "5432") + +# Batch size for COPY operations (balance memory vs transaction size) +BATCH_SIZE = 1000 # timestamps per batch (1000 × 728 = 728K rows per batch) + + +def download_netcdf(url, output_path): + """Download NetCDF file if it doesn't exist.""" + if os.path.exists(output_path): + logger.info(f"NetCDF file already exists: {output_path}") + return + + logger.info(f"Downloading NetCDF file from {url}...") + + import urllib.request + + def progress_hook(count, block_size, total_size): + percent = int(count * block_size * 100 / total_size) + if count % 100 == 0: # Update every 100 blocks + logger.info(f" Download progress: {percent}%") + + try: + urllib.request.urlretrieve(url, output_path, reporthook=progress_hook) + logger.info("Download complete!") + except Exception as e: + logger.error(f"Download failed: {e}") + raise + + +def load_metadata_from_netcdf(ds): + """Extract CML metadata from NetCDF dataset.""" + logger.info("Extracting metadata from NetCDF...") + + # NetCDF 
dimensions: (sublink_id=2, cml_id=364, time=...) + # cml_id: (364,) - unique CML site IDs + # site coords: (364,) - one per CML + # frequency, polarization: (2, 364) - one per (sublink, cml) + + cml_ids = ds.cml_id.values # (364,) + site_0_lon = ds.site_0_lon.values # (364,) + site_0_lat = ds.site_0_lat.values # (364,) + site_1_lon = ds.site_1_lon.values # (364,) + site_1_lat = ds.site_1_lat.values # (364,) + frequency = ds.frequency.values # (2, 364) + polarization = ds.polarization.values # (2, 364) + + # Calculate link length using haversine formula + def haversine_distance(lon1, lat1, lon2, lat2): + R = 6371000 # Earth radius in meters + phi1, phi2 = np.radians(lat1), np.radians(lat2) + dphi = np.radians(lat2 - lat1) + dlambda = np.radians(lon2 - lon1) + a = ( + np.sin(dphi / 2) ** 2 + + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda / 2) ** 2 + ) + return 2 * R * np.arcsin(np.sqrt(a)) + + length = haversine_distance(site_0_lon, site_0_lat, site_1_lon, site_1_lat) + + # Create metadata records (728 total: 364 CMLs × 2 sublinks) + metadata_records = [] + + for cml_idx, cml_id in enumerate(cml_ids): + for sublink_idx in range(2): # 0 and 1 + sublink_id = f"sublink_{sublink_idx + 1}" + + metadata_records.append( + { + "cml_id": str(cml_id), + "sublink_id": sublink_id, + "site_0_lon": float(site_0_lon[cml_idx]), + "site_0_lat": float(site_0_lat[cml_idx]), + "site_1_lon": float(site_1_lon[cml_idx]), + "site_1_lat": float(site_1_lat[cml_idx]), + "frequency": float(frequency[sublink_idx, cml_idx]), + "polarization": str(polarization[sublink_idx, cml_idx]), + "length": float(length[cml_idx]), + } + ) + + metadata_df = pd.DataFrame(metadata_records) + logger.info( + f"Extracted {len(metadata_df)} metadata records ({metadata_df['cml_id'].nunique()} unique CML IDs)" + ) + + return metadata_df + + +def copy_dataframe_to_db(cursor, df, table_name, columns): + """Use PostgreSQL COPY FROM to efficiently load dataframe.""" + buffer = io.StringIO() + df.to_csv(buffer, index=False, header=False, na_rep="\\N") + buffer.seek(0) + + cursor.copy_from(buffer, table_name, sep=",", null="\\N", columns=columns) + + +def load_timeseries_from_netcdf(ds, metadata_df, cursor, conn): + """ + Load time-series data from NetCDF with shifted timestamps. + + Preserves original temporal resolution and shifts timestamps + so that the data ends at the current time. 
+ """ + logger.info("Loading time-series data...") + + # Get original timestamps + original_times = pd.to_datetime(ds.time.values) + n_timestamps_full = len(original_times) + + # Limit to recent data if MAX_DAYS is set + if MAX_DAYS > 0: + # Calculate how many timestamps for MAX_DAYS + # Assuming 10-second resolution: 86400 / 10 = 8640 timestamps per day + timestamps_per_day = 8640 + max_timestamps = MAX_DAYS * timestamps_per_day + + if n_timestamps_full > max_timestamps: + start_idx = n_timestamps_full - max_timestamps + original_times = original_times[start_idx:] + logger.info( + f"Limiting to last {MAX_DAYS} days ({max_timestamps:,} timestamps)" + ) + else: + start_idx = 0 + else: + start_idx = 0 + + n_timestamps = len(original_times) + + # Calculate time shift to end at current time + current_time = pd.Timestamp.now() + time_shift = current_time - original_times[-1] + shifted_times = original_times + time_shift + + logger.info(f"Original time range: {original_times[0]} to {original_times[-1]}") + logger.info(f"Shifted time range: {shifted_times[0]} to {shifted_times[-1]}") + logger.info(f"Time shift applied: {time_shift}") + + # Get NetCDF dimensions + # NetCDF shape: (sublink_id=2, cml_id=364, time=794887) + # But we need (time, sublink, cml) for iteration + n_sublinks = ds.sizes["sublink_id"] # 2 + n_cmls = ds.sizes["cml_id"] # 364 + cml_ids_nc = ds.cml_id.values # (364,) + + # Build CML ID to DB mapping from metadata + # metadata_df has 728 rows (364 CMLs × 2 sublinks) + cml_to_dbid = {} # Maps (cml_id, sublink_id) -> (cml_id_str, sublink_id_str) + for _, row in metadata_df.iterrows(): + cml_to_dbid[(row["cml_id"], row["sublink_id"])] = ( + row["cml_id"], + row["sublink_id"], + ) + + # Calculate total rows (728 CMLs × timestamps) + total_cmls = len(metadata_df) + total_rows = n_timestamps * total_cmls + logger.info( + f"Total data points: {n_timestamps:,} timestamps × {total_cmls} CMLs = {total_rows:,} rows" + ) + logger.info(f"Processing in batches of {BATCH_SIZE:,} timestamps...") + + start_time = datetime.now() + rows_loaded = 0 + + # Process in batches to manage memory + for batch_num, batch_start_rel in enumerate(range(0, n_timestamps, BATCH_SIZE), 1): + batch_end_rel = min(batch_start_rel + BATCH_SIZE, n_timestamps) + batch_times = shifted_times[batch_start_rel:batch_end_rel] + batch_size_actual = batch_end_rel - batch_start_rel + + # Convert relative indices to absolute NetCDF indices + batch_start_abs = start_idx + batch_start_rel + batch_end_abs = start_idx + batch_end_rel + + # Load only this batch's data from NetCDF + # NetCDF data dimensions: (time, sublink_id, cml_id) = (794887, 2, 364) + tsl_batch = ds.tsl.isel( + time=slice(batch_start_abs, batch_end_abs) + ).values # (batch_size, 2, 364) + rsl_batch = ds.rsl.isel( + time=slice(batch_start_abs, batch_end_abs) + ).values # (batch_size, 2, 364) + + # Reshape data for database insertion + # Create arrays for each column + batch_rows = batch_size_actual * n_cmls * n_sublinks + + # Pre-allocate arrays + times_arr = np.empty(batch_rows, dtype="datetime64[ns]") + cml_ids_arr = np.empty(batch_rows, dtype=object) + sublink_ids_arr = np.empty(batch_rows, dtype=object) + tsl_arr = np.empty(batch_rows, dtype=float) + rsl_arr = np.empty(batch_rows, dtype=float) + + idx = 0 + for t_idx, timestamp in enumerate(batch_times): + for cml_idx, cml_id in enumerate(cml_ids_nc): + for sublink_idx in range(n_sublinks): + times_arr[idx] = timestamp + cml_ids_arr[idx] = str(cml_id) + sublink_ids_arr[idx] = f"sublink_{sublink_idx + 1}" + 
tsl_arr[idx] = tsl_batch[t_idx, sublink_idx, cml_idx] + rsl_arr[idx] = rsl_batch[t_idx, sublink_idx, cml_idx] + idx += 1 + + # Create DataFrame from arrays + batch_df = pd.DataFrame( + { + "time": times_arr, + "cml_id": cml_ids_arr, + "sublink_id": sublink_ids_arr, + "tsl": tsl_arr, + "rsl": rsl_arr, + } + ) + + # Load batch to database + copy_dataframe_to_db( + cursor, + batch_df, + "cml_data", + ["time", "cml_id", "sublink_id", "tsl", "rsl"], + ) + + rows_loaded += len(batch_df) + + # Log progress every batch + elapsed = (datetime.now() - start_time).total_seconds() + progress = (rows_loaded / total_rows) * 100 + rate = rows_loaded / elapsed if elapsed > 0 else 0 + + total_batches = (n_timestamps + BATCH_SIZE - 1) // BATCH_SIZE + logger.info( + f" Batch {batch_num}/{total_batches}: " + f"{progress:5.1f}% complete, {rate:,.0f} rows/sec" + ) + + # Commit periodically (every 10 batches) + if (batch_num % 10) == 0 or batch_end_rel == n_timestamps: + conn.commit() + logger.info(f" ✓ Committed to database") + + total_duration = (datetime.now() - start_time).total_seconds() + logger.info(f"Loaded {rows_loaded:,} data records in {total_duration:.0f} seconds") + + return rows_loaded + + +def main(): + """Main function to load archive data from NetCDF to database.""" + + logger.info("=" * 70) + logger.info("NetCDF to Database Archive Loader") + logger.info("=" * 70) + logger.info(f"NetCDF file: {NETCDF_FILE}") + + # Download NetCDF if needed + if not os.path.exists(NETCDF_FILE): + if NETCDF_URL: + download_netcdf(NETCDF_URL, NETCDF_FILE) + else: + logger.error( + f"NetCDF file not found and no download URL provided: {NETCDF_FILE}" + ) + sys.exit(1) + + # Open NetCDF dataset + logger.info("Opening NetCDF dataset...") + try: + ds = xr.open_dataset(NETCDF_FILE) + except Exception as e: + logger.error(f"Failed to open NetCDF file: {e}") + sys.exit(1) + + # Connect to database + logger.info("Connecting to database...") + try: + conn = psycopg2.connect( + dbname=DB_NAME, + user=DB_USER, + password=DB_PASSWORD, + host=DB_HOST, + port=DB_PORT, + ) + conn.autocommit = False + cursor = conn.cursor() + logger.info("Database connection established") + except Exception as e: + logger.error(f"Failed to connect to database: {e}") + ds.close() + sys.exit(1) + + try: + # Clear existing data before loading archive + logger.info("Clearing existing database data...") + cursor.execute("TRUNCATE TABLE cml_data") + cursor.execute("TRUNCATE TABLE cml_metadata") + conn.commit() + logger.info("Existing data cleared") + + # Load metadata + metadata_df = load_metadata_from_netcdf(ds) + + logger.info("Loading metadata to database...") + copy_dataframe_to_db( + cursor, + metadata_df, + "cml_metadata", + [ + "cml_id", + "sublink_id", + "site_0_lon", + "site_0_lat", + "site_1_lon", + "site_1_lat", + "frequency", + "polarization", + "length", + ], + ) + conn.commit() + logger.info(f"✓ Loaded {len(metadata_df)} metadata records") + + # Load time-series data + rows_loaded = load_timeseries_from_netcdf(ds, metadata_df, cursor, conn) + + # Verify loaded data + cursor.execute( + """ + SELECT + MIN(time) as start_time, + MAX(time) as end_time, + COUNT(*) as total_rows + FROM cml_data + """ + ) + result = cursor.fetchone() + + logger.info("=" * 70) + logger.info("Archive Data Successfully Loaded!") + logger.info("=" * 70) + logger.info(f"Time range: {result[0]} to {result[1]}") + logger.info(f"Total rows: {result[2]:,}") + logger.info("=" * 70) + + except Exception as e: + logger.error(f"Error during data loading: {e}") + 
conn.rollback() + raise + finally: + cursor.close() + conn.close() + ds.close() + + +if __name__ == "__main__": + main() diff --git a/parser/tests/test_parse_netcdf_archive.py b/parser/tests/test_parse_netcdf_archive.py new file mode 100644 index 0000000..e22e38c --- /dev/null +++ b/parser/tests/test_parse_netcdf_archive.py @@ -0,0 +1,59 @@ +import pytest +from unittest.mock import patch, MagicMock +import pandas as pd +import numpy as np + + +@patch("parser.parse_netcdf_archive.psycopg2.connect") +@patch("parser.parse_netcdf_archive.xr.open_dataset") +@patch("parser.parse_netcdf_archive.os.path.exists") +def test_main_clears_and_loads_data(mock_exists, mock_open_dataset, mock_connect): + """Test main() truncates tables and loads new archive data.""" + from parser.parse_netcdf_archive import main + + mock_exists.return_value = True + + # Mock minimal NetCDF dataset + mock_ds = MagicMock() + mock_ds.cml_id.values = np.array([101, 102]) + mock_ds.site_0_lon.values = np.array([10.0, 11.0]) + mock_ds.site_0_lat.values = np.array([50.0, 51.0]) + mock_ds.site_1_lon.values = np.array([10.1, 11.1]) + mock_ds.site_1_lat.values = np.array([50.1, 51.1]) + mock_ds.frequency.values = np.array([[20, 21], [22, 23]]) + mock_ds.polarization.values = np.array([["H", "V"], ["V", "H"]]) + mock_ds.time.values = pd.date_range("2024-01-01", periods=10, freq="10s") + mock_ds.sizes = {"sublink_id": 2, "cml_id": 2} + mock_ds.tsl.isel.return_value.values = np.random.rand(10, 2, 2) + mock_ds.rsl.isel.return_value.values = np.random.rand(10, 2, 2) + mock_open_dataset.return_value = mock_ds + + # Mock database + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_cursor.fetchone.return_value = ( + pd.Timestamp("2024-01-01"), + pd.Timestamp("2024-01-02"), + 1000, + ) + mock_conn.cursor.return_value = mock_cursor + mock_connect.return_value = mock_conn + + main() + + # Verify truncate is called (critical for demo setup) + mock_cursor.execute.assert_any_call("TRUNCATE TABLE cml_data") + mock_cursor.execute.assert_any_call("TRUNCATE TABLE cml_metadata") + + +@patch("parser.parse_netcdf_archive.psycopg2.connect") +def test_main_fails_on_db_error(mock_connect): + """Test main() handles database connection errors.""" + from parser.parse_netcdf_archive import main + + mock_connect.side_effect = Exception("Connection refused") + + with patch("parser.parse_netcdf_archive.os.path.exists", return_value=True): + with patch("parser.parse_netcdf_archive.xr.open_dataset"): + with pytest.raises(SystemExit): + main()
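Both new test modules mock out database and NetCDF access, so they can run without the Docker stack. A minimal sketch of a local invocation, assuming `pytest`, `pandas`, and `numpy` are available and that the repository's pytest configuration makes the `parser` package importable (adjust to however tests are normally run in this repo):

```sh
# Run only the tests added in this change
python -m pytest mno_data_source_simulator/tests/test_generate_archive.py \
                 parser/tests/test_parse_netcdf_archive.py -v
```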