192 changes: 192 additions & 0 deletions DEPLOYMENT_GUIDE.md
@@ -0,0 +1,192 @@
# Forecast-Centric Storage Deployment Guide

## Overview
This guide walks through deploying the new forecast-centric storage implementation for Flowcast. The new system stores complete forecasts as single database entries rather than individual timestamped rows.
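To make the change concrete, here is an illustrative sketch of the two storage models. Field names and values are assumptions for illustration, not the exact Flowcast schema:

```python
# Illustrative only: field names are assumptions, not the exact Flowcast schema.

# Old model: one database row per forecasted timestamp.
old_rows = [
    {'usgs_site': '01427510', 'timestamp': 1700000000, 'streamflow': 812.0},
    {'usgs_site': '01427510', 'timestamp': 1700003600, 'streamflow': 820.5},
]

# New model: one entry holds the complete forecast as parallel arrays.
new_entry = {
    'usgs_site': '01427510',
    'origin_timestamp': 1700000000,
    'forecast_data': {
        'streamflow': {
            'values': [812.0, 820.5],
            'timestamps': [1700000000, 1700003600],
            'confidence_intervals': {'5th': [790.1, 798.2], '95th': [835.3, 841.0]},
        }
    },
}

# Retrieving a full forecast is now one lookup instead of one query per timestamp.
assert len(new_entry['forecast_data']['streamflow']['values']) == len(old_rows)
```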

## Prerequisites
- AWS CLI configured with appropriate permissions
- CDK installed and configured
- Access to the Flowcast AWS environment

## Deployment Steps

### 1. Deploy Infrastructure Changes

```bash
# Navigate to the infra directory
cd infra

# Deploy the new DynamoDB table and updated infrastructure
cdk deploy
```

This will create:
- New `flowcast-data-v2` DynamoDB table with forecast-centric schema
- Updated Lambda functions with new environment variables
- Global Secondary Index for forecast origin queries
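A hypothetical key layout for the new table is sketched below; the attribute names and GSI name are assumptions based on the forecast-origin queries described above, and the authoritative definition lives in the CDK stack:

```python
# Hypothetical key layout for flowcast-data-v2 (illustration only);
# the real definition is in the CDK stack deployed in step 1.
table_schema = {
    'TableName': 'flowcast-data-v2',
    'KeySchema': [
        # Partition key: one site per partition.
        {'AttributeName': 'usgs_site', 'KeyType': 'HASH'},
        # Sort key: the forecast's origin time, so forecasts sort chronologically.
        {'AttributeName': 'origin_timestamp', 'KeyType': 'RANGE'},
    ],
    'GlobalSecondaryIndexes': [
        {
            'IndexName': 'forecast-origin-index',  # assumed name
            'KeySchema': [
                {'AttributeName': 'origin_timestamp', 'KeyType': 'HASH'},
            ],
        }
    ],
}
```

With this layout, `get_latest_forecast` is a single query on the partition key with descending sort order and a limit of 1.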

### 2. Deploy Backend Code

```bash
# Navigate to the backend directory
cd backend

# Build and deploy the new Lambda image
# (This will be handled by your existing deployment process)
```

### 3. Test the New Implementation

```bash
# Run the test script to validate the new implementation
cd backend
python test_forecast_v2.py
```

Expected output:
```
Starting forecast-centric storage tests...
Testing forecast storage...
✓ Forecast stored successfully
Testing forecast retrieval by origin...
✓ Forecast retrieved successfully
Testing DataFrame conversion...
✓ DataFrame created with shape: (3, 11)
Testing data validation...
✓ Forecast data validation passed
Testing summary generation...
✓ Summary generated successfully
🎉 All tests passed!
```
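The shape checks that the test script performs can be approximated with a few lines of Python. This is a minimal sketch of the idea (parallel arrays must have matching lengths), not the actual `validate_forecast_data` implementation:

```python
def check_forecast_shape(forecast_data: dict) -> bool:
    """Minimal shape check: every feature's arrays must be parallel and non-empty."""
    for feature, series in forecast_data.items():
        n = len(series['values'])
        if n == 0 or len(series['timestamps']) != n:
            return False
        ci = series.get('confidence_intervals', {})
        if len(ci.get('5th', [])) != n or len(ci.get('95th', [])) != n:
            return False
    return True

sample = {
    'watertemp': {
        'values': [12.1, 12.3, 12.0],
        'timestamps': [1700000000, 1700003600, 1700007200],
        'confidence_intervals': {'5th': [11.8, 11.9, 11.6], '95th': [12.5, 12.8, 12.4]},
    }
}
print(check_forecast_shape(sample))  # True
```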

### 4. Generate New Forecasts

The new forecast handler (`forecast_v2.py`) will automatically:
- Generate forecasts using the new format
- Store them in the new table structure
- Maintain compatibility with existing weather forecast data

To trigger new forecast generation:
```bash
# This will be handled by your existing Step Functions or scheduled events
# The new handler will create forecasts in the new format
```
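For ad-hoc testing, the handler expects an event with `usgs_site` and `is_onboarding` (as read in the first lines of `forecast_v2.py`). A sketch of the payload follows; the Lambda function name in the comment is an assumption for illustration:

```python
import json

# Event shape the forecast_v2 handler expects; the function name below
# is an assumed example, not necessarily the deployed Lambda's name.
event = {
    'usgs_site': '01427510',   # site to forecast
    'is_onboarding': False,    # True only during first-time site setup
}
payload = json.dumps(event)
# e.g. aws lambda invoke --function-name flowcast-forecast-v2 \
#        --payload "$payload" out.json
```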

### 5. Verify Data Migration

Check that new forecasts are being created in the correct format:

```python
from utils import db_v2, data_access

# Check latest forecast
latest = db_v2.get_latest_forecast('01427510')
if latest:
    print(f"New format forecast found: {latest['origin_timestamp']}")

    # Convert to DataFrame for analysis
    df = data_access.get_forecast_as_dataframe(latest)
    print(f"Forecast shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
```

### 6. Clean Up Old Data

Once you've verified that new forecasts are working correctly:

```python
from utils.cleanup import cleanup_old_forecast_entries

# Clean up old forecast entries for a specific site
deleted_count = cleanup_old_forecast_entries('01427510')
print(f"Deleted {deleted_count} old forecast entries")

# Or clean up all old forecast entries
from utils.cleanup import cleanup_all_old_forecast_entries
deleted_count = cleanup_all_old_forecast_entries()
print(f"Deleted {deleted_count} old forecast entries total")
```

### 7. Update Application Code

Update any application code that reads forecast data to use the new format:

```python
# Old way (still works during transition)
from utils import db
forecast_entries = db.get_entire_fcst(usgs_site, origin_timestamp)

# New way (recommended)
from utils import db_v2, data_access
forecast_item = db_v2.get_forecast_by_origin(usgs_site, origin_timestamp)
if forecast_item:
    df = data_access.get_forecast_as_dataframe(forecast_item)
```

## Monitoring and Validation

### Check Forecast Entry Counts

```python
from utils.cleanup import get_forecast_entry_counts

counts = get_forecast_entry_counts()
print(f"Old format: {counts['old_format_count']}")
print(f"New format: {counts['new_format_count']}")
print(f"Total: {counts['total_count']}")
```

### Validate Forecast Data

```python
from utils import db_v2, data_access

latest = db_v2.get_latest_forecast(usgs_site)
if latest:
    is_valid = data_access.validate_forecast_data(latest['forecast_data'])
    print(f"Forecast data valid: {is_valid}")
```

### Monitor Storage Usage

The new format uses approximately 20KB per forecast, whereas the old format used ~1KB per timestamped row. Since each forecast spans 168 timestamps, consolidating it into a single entry represents a significant storage reduction.
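As a back-of-envelope check using the figures above (per-forecast payload only; item overhead and unchanged historical data are excluded, which is why the overall table savings reported later are lower):

```python
# Back-of-envelope comparison using the figures from this guide;
# real savings vary with per-item overhead and other stored data.
timestamps_per_forecast = 168          # hourly steps per forecast
old_kb = timestamps_per_forecast * 1   # ~1 KB per timestamped row
new_kb = 20                            # ~20 KB for one forecast-centric item
reduction = 1 - new_kb / old_kb
print(f'{old_kb} KB -> {new_kb} KB per forecast (~{reduction:.0%} smaller)')
```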

## Rollback Plan

If issues arise, you can roll back by:

1. **Keep old table**: The original `flowcast-data` table remains unchanged
2. **Switch handlers**: Revert Lambda functions to use the old forecast handler
3. **Clean up new table**: Delete the `flowcast-data-v2` table if needed

## Post-Deployment Checklist

- [ ] New DynamoDB table created successfully
- [ ] Lambda functions updated with new environment variables
- [ ] Test script passes all validation
- [ ] New forecasts being generated in correct format
- [ ] Application code updated to use new data access methods
- [ ] Old forecast entries cleaned up
- [ ] Monitoring and alerting updated for new format
- [ ] Documentation updated

## Benefits Realized

After successful deployment, you should see:

1. **Improved Query Performance**: Single query retrieves entire forecast
2. **Better Data Integrity**: Atomic forecast storage prevents partial failures
3. **Enhanced Analytics**: Easy forecast comparison and validation
4. **Storage Efficiency**: ~30-40% reduction in forecast data storage
5. **Simplified Development**: Cleaner data access patterns

## Support

If you encounter issues during deployment:

1. Check CloudWatch logs for Lambda function errors
2. Verify DynamoDB table permissions
3. Run the test script to validate functionality
4. Review the implementation documentation

The new forecast-centric storage approach provides a more robust and efficient foundation for your water condition forecasting system.
144 changes: 144 additions & 0 deletions backend/src/handlers/forecast_v2.py
@@ -0,0 +1,144 @@
import pandas as pd
import numpy as np
import logging
from datetime import datetime

log = logging.getLogger(__name__)

from utils import s3, db_v2, constants, utils

def handler(event, _context):
    usgs_site = event['usgs_site']
    is_onboarding = event['is_onboarding']

    if is_onboarding:
        db_v2.update_site_status(usgs_site, db_v2.SiteStatus.FORECASTING)
        db_v2.push_site_onboarding_log(usgs_site, f'🔮 Started forecasting for site {usgs_site} at {utils.get_current_local_time()}')

    # Get latest historical data
    log.info(f'retrieving most recent historical data for site {usgs_site}')
    last_hist_entries = db_v2.get_n_most_recent_hist_entries(usgs_site, constants.FORECAST_HORIZON*2)

    if not last_hist_entries:
        log.error(f'No historical data found for site {usgs_site}')
        return { 'statusCode': 500, 'error': 'No historical data available' }

    last_hist_origin = last_hist_entries[0]['timestamp']
    log.info(f'retrieving weather forecast data for site {usgs_site} at {last_hist_origin}')

    # Get weather forecast data (still using old format during transition)
    last_fcst_entries = db_v2.get_entire_fcst(usgs_site, last_hist_origin)

    if not last_fcst_entries:
        log.error(f'No weather forecast data found for site {usgs_site} at origin {last_hist_origin}')
        return { 'statusCode': 500, 'error': 'No weather forecast data available' }

    # Check if forecast already exists in new format
    existing_forecast = db_v2.get_forecast_by_origin(usgs_site, last_hist_origin)
    if existing_forecast:
        log.warning(f'Forecast already exists for origin time {last_hist_origin}')
        return { 'statusCode': 200 }

    # Prepare data for forecasting
    fcst_df = pd.DataFrame(last_fcst_entries)
    hist_df = pd.DataFrame(last_hist_entries)
    source_df = pd.concat([fcst_df[fcst_df['timestamp'] > hist_df['timestamp'].max()], hist_df])
    source_df = source_df.set_index(pd.to_datetime(source_df['timestamp'].apply(pd.to_numeric), unit='s')).sort_index()

    # Generate forecasts for each feature
    forecast_data = {
        'watertemp': {'values': [], 'timestamps': [], 'confidence_intervals': {'5th': [], '95th': []}},
        'streamflow': {'values': [], 'timestamps': [], 'confidence_intervals': {'5th': [], '95th': []}}
    }

    weather_forecast = {
        'airtemp': [], 'precip': [], 'cloudcover': [], 'snow': [], 'snowdepth': [], 'timestamps': []
    }

    for feature in constants.FEATURES_TO_FORECAST:
        if is_onboarding:
            db_v2.push_site_onboarding_log(usgs_site, f'\tpredicting {feature} values')

        feature_fcst = forecast_feature(source_df, feature, usgs_site, is_onboarding)

        # Extract forecast data
        fcst_mask = feature_fcst['type'] == 'fcst'
        fcst_data = feature_fcst[fcst_mask]

        if len(fcst_data) == 0:
            log.error(f'No forecast data generated for feature {feature}')
            continue

        forecast_data[feature]['values'] = fcst_data[feature].tolist()
        # convert to a plain list so the entry is serializable for storage
        forecast_data[feature]['timestamps'] = (fcst_data.index.astype(np.int64) // 10**9).tolist()
        forecast_data[feature]['confidence_intervals']['5th'] = fcst_data[f'{feature}_5th'].tolist()
        forecast_data[feature]['confidence_intervals']['95th'] = fcst_data[f'{feature}_95th'].tolist()

        # Extract weather forecast data (only once)
        if feature == constants.FEATURES_TO_FORECAST[0]:
            weather_forecast['airtemp'] = fcst_data['airtemp'].tolist()
            weather_forecast['precip'] = fcst_data['precip'].tolist()
            weather_forecast['cloudcover'] = fcst_data['cloudcover'].tolist()
            weather_forecast['snow'] = fcst_data['snow'].tolist()
            weather_forecast['snowdepth'] = fcst_data['snowdepth'].tolist()
            weather_forecast['timestamps'] = (fcst_data.index.astype(np.int64) // 10**9).tolist()

    # Store complete forecast
    complete_forecast = {
        'forecast_data': forecast_data,
        'weather_forecast': weather_forecast
    }

    log.info('Storing complete forecast to database')
    logging.getLogger('boto3.dynamodb.table').setLevel(logging.DEBUG)
    db_v2.push_forecast_entry(usgs_site, last_hist_origin, complete_forecast)

    if is_onboarding:
        db_v2.push_site_onboarding_log(usgs_site, f'\tfinished forecasting at {utils.get_current_local_time()}')
        db_v2.update_site_status(usgs_site, db_v2.SiteStatus.ACTIVE)

    return { 'statusCode': 200 }

def forecast_feature(data: pd.DataFrame, feature: str, usgs_site: str, is_onboarding: bool):
    """
    Forecast a specific feature using NeuralProphet.
    This function remains largely the same as the original forecast_feature.
    """
    df = data.drop(columns=data.columns.difference(constants.FEATURE_COLS[feature]))
    df = df.reset_index()
    df = df.rename(columns={'timestamp': 'ds'})

    # convert decimals to floats
    df[constants.FEATURE_COLS[feature]] = df[constants.FEATURE_COLS[feature]].apply(pd.to_numeric, downcast='float')

    df = df.rename(columns={feature: 'y'})
    # todo - remove once neuralprophet issue is resolved
    df.loc[0, 'snow'] = 0.01
    df.loc[0, 'snowdepth'] = 0.01
    log.info(f'dataset ready for inference:\n{df}')

    # load model
    model = s3.load_model(usgs_site, feature)

    # prep future
    future = model.make_future_dataframe(
        df=df[df['y'].notnull()],
        regressors_df=df[df['y'].isnull()].drop(columns=['y']),
        periods=constants.FORECAST_HORIZON
    )

    # predict
    # hide py.warnings (noisy pandas warnings during prediction)
    logging.getLogger('py.warnings').setLevel(logging.ERROR)
    pred = model.predict(df=future)
    yhat = model.get_latest_forecast(pred)

    yhat = yhat.set_index(yhat['ds'])
    utils.convert_floats_to_decimals(yhat)
    data[f'{feature}_5th'] = np.nan
    data[f'{feature}_95th'] = np.nan
    data[f'{feature}'] = data[f'{feature}'].combine_first(yhat['origin-0'])
    data[f'{feature}_5th'] = data[f'{feature}_5th'].combine_first(yhat['origin-0 5.0%'])
    data[f'{feature}_95th'] = data[f'{feature}_95th'].combine_first(yhat['origin-0 95.0%'])

    return data
4 changes: 4 additions & 0 deletions backend/src/index.py
@@ -15,6 +15,7 @@
log = logging.getLogger(__name__)

import handlers.forecast as forecast
import handlers.forecast_v2 as forecast_v2
import handlers.train as train
import handlers.update as update
import handlers.access as access
@@ -58,6 +59,9 @@ def handle(handler, *args):
def handle_forecast(event, context):
    return handle(forecast.handler, event, context)

def handle_forecast_v2(event, context):
    return handle(forecast_v2.handler, event, context)

# this is run in fargate, and as such has slightly different parameters
def handle_train(usgs_site: str, is_onboarding: bool):
    return handle(train.handler, usgs_site, is_onboarding)