"""Unified Features Generation - Checkpoint-Based Workflow
Combines JAO (1,737) + ENTSO-E (297) + Weather (376) features = 2,410 total features
Executes step-by-step with checkpoints for fast debugging.
Author: Claude
Date: 2025-11-11
"""
import sys
from pathlib import Path
import polars as pl
from datetime import datetime
# Paths
BASE_DIR = Path(__file__).parent.parent
RAW_DIR = BASE_DIR / 'data' / 'raw'
PROCESSED_DIR = BASE_DIR / 'data' / 'processed'
# Input files
JAO_FILE = PROCESSED_DIR / 'features_jao_24month.parquet'
ENTSOE_FILE = PROCESSED_DIR / 'features_entsoe_24month.parquet'
WEATHER_FILE = PROCESSED_DIR / 'features_weather_24month.parquet'
# Output files
UNIFIED_FILE = PROCESSED_DIR / 'features_unified_24month.parquet'
METADATA_FILE = PROCESSED_DIR / 'features_unified_metadata.csv'
print("="*80)
print("UNIFIED FEATURES GENERATION - CHECKPOINT WORKFLOW")
print("="*80)
print()
# ============================================================================
# CHECKPOINT 1: Load Input Files
# ============================================================================
print("[CHECKPOINT 1] Loading input files...")
print()
try:
jao_raw = pl.read_parquet(JAO_FILE)
print(f"[OK] JAO features loaded: {jao_raw.shape[0]:,} rows x {jao_raw.shape[1]} cols")
entsoe_raw = pl.read_parquet(ENTSOE_FILE)
print(f"[OK] ENTSO-E features loaded: {entsoe_raw.shape[0]:,} rows x {entsoe_raw.shape[1]} cols")
weather_raw = pl.read_parquet(WEATHER_FILE)
print(f"[OK] Weather features loaded: {weather_raw.shape[0]:,} rows x {weather_raw.shape[1]} cols")
print()
except Exception as e:
print(f"[ERROR] Failed to load input files: {e}")
sys.exit(1)
# ============================================================================
# CHECKPOINT 2: Standardize Timestamps
# ============================================================================
print("[CHECKPOINT 2] Standardizing timestamps...")
print()
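# All three sources are normalized to tz-naive UTC datetimes at microsecond precision,
# so the 'timestamp' columns share a single dtype (Datetime('us')) and can be aligned
# exactly in the later checkpoints.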
try:
# JAO: Convert mtu to UTC timestamp (remove timezone, use microseconds)
jao_std = jao_raw.with_columns([
pl.col('mtu').dt.convert_time_zone('UTC').dt.replace_time_zone(None).dt.cast_time_unit('us').alias('timestamp')
]).drop('mtu')
print(f"[OK] JAO timestamps standardized")
# ENTSO-E: Remove timezone, ensure microsecond precision
entsoe_std = entsoe_raw.with_columns([
pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('us')
])
print(f"[OK] ENTSO-E timestamps standardized")
# Weather: Remove timezone, ensure microsecond precision
weather_std = weather_raw.with_columns([
pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('us')
])
print(f"[OK] Weather timestamps standardized")
print()
except Exception as e:
print(f"[ERROR] Timestamp standardization failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# ============================================================================
# CHECKPOINT 3: Find Common Date Range
# ============================================================================
print("[CHECKPOINT 3] Finding common date range...")
print()
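# The overlap of the three ranges is [max of the minimums, min of the maximums].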
try:
jao_min, jao_max = jao_std['timestamp'].min(), jao_std['timestamp'].max()
entsoe_min, entsoe_max = entsoe_std['timestamp'].min(), entsoe_std['timestamp'].max()
weather_min, weather_max = weather_std['timestamp'].min(), weather_std['timestamp'].max()
print(f"JAO range: {jao_min} to {jao_max}")
print(f"ENTSO-E range: {entsoe_min} to {entsoe_max}")
print(f"Weather range: {weather_min} to {weather_max}")
print()
common_min = max(jao_min, entsoe_min, weather_min)
common_max = min(jao_max, entsoe_max, weather_max)
print(f"[OK] Common range: {common_min} to {common_max}")
print()
except Exception as e:
print(f"[ERROR] Date range calculation failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# ============================================================================
# CHECKPOINT 4: Filter to Common Range
# ============================================================================
print("[CHECKPOINT 4] Filtering to common date range...")
print()
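# Each frame is restricted to the inclusive common window and sorted by timestamp.
# The identical row order is what makes the positional hstack in checkpoint 5 valid,
# assuming every source has exactly one row per hourly timestamp.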
try:
jao_filtered = jao_std.filter(
(pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
).sort('timestamp')
print(f"[OK] JAO filtered: {jao_filtered.shape[0]:,} rows")
entsoe_filtered = entsoe_std.filter(
(pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
).sort('timestamp')
print(f"[OK] ENTSO-E filtered: {entsoe_filtered.shape[0]:,} rows")
weather_filtered = weather_std.filter(
(pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
).sort('timestamp')
print(f"[OK] Weather filtered: {weather_filtered.shape[0]:,} rows")
print()
except Exception as e:
print(f"[ERROR] Filtering failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# ============================================================================
# CHECKPOINT 5: Merge Datasets
# ============================================================================
print("[CHECKPOINT 5] Merging datasets horizontally...")
print()
try:
# Start with JAO (has timestamp)
unified_df = jao_filtered
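    # Sanity check: hstack is positional, so the three frames must have identical heights
    # (they should, after the shared filter and sort, assuming one row per timestamp).
    if not (jao_filtered.height == entsoe_filtered.height == weather_filtered.height):
        raise ValueError("Row counts differ between sources; positional hstack would misalign features")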
    # Stack ENTSO-E feature columns positionally (rows are aligned by the shared filter and sort on timestamp)
entsoe_to_join = entsoe_filtered.drop('timestamp') # Drop duplicate timestamp column
unified_df = unified_df.hstack(entsoe_to_join)
print(f"[OK] ENTSO-E merged: {unified_df.shape[1]} total columns")
    # Stack Weather feature columns positionally (rows are aligned by the shared filter and sort on timestamp)
weather_to_join = weather_filtered.drop('timestamp') # Drop duplicate timestamp column
unified_df = unified_df.hstack(weather_to_join)
print(f"[OK] Weather merged: {unified_df.shape[1]} total columns")
print()
print(f"Final unified shape: {unified_df.shape[0]:,} rows x {unified_df.shape[1]} columns")
print()
except Exception as e:
print(f"[ERROR] Merge failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
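# A join-based alternative (a sketch, not what this script does): keying on 'timestamp'
# instead of row position tolerates differing row counts, at the cost of relying on
# exact timestamp equality across sources.
#
#     unified_df = (
#         jao_filtered
#         .join(entsoe_filtered, on='timestamp', how='inner')
#         .join(weather_filtered, on='timestamp', how='inner')
#         .sort('timestamp')
#     )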
# ============================================================================
# CHECKPOINT 6: Data Quality Check
# ============================================================================
print("[CHECKPOINT 6] Running data quality checks...")
print()
try:
# Check for nulls
null_counts = unified_df.null_count()
total_nulls = null_counts.sum_horizontal()[0]
total_cells = unified_df.shape[0] * unified_df.shape[1]
completeness = (1 - total_nulls / total_cells) * 100
print(f"Data completeness: {completeness:.2f}%")
print(f"Total null values: {total_nulls:,} / {total_cells:,}")
print()
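    # To see which columns drive the nulls, a quick diagnostic sketch:
    #     per_col = null_counts.transpose(include_header=True, header_name='column', column_names=['nulls'])
    #     print(per_col.sort('nulls', descending=True).head(10))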
# Check timestamp continuity
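    # (assumes hourly resolution: any step other than exactly 1 hour between consecutive
    # timestamps is counted as a gap)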
timestamps = unified_df['timestamp'].sort()
time_diffs = timestamps.diff().dt.total_hours()
gaps = time_diffs.filter((time_diffs.is_not_null()) & (time_diffs != 1))
print(f"Timestamp continuity check:")
print(f" - Total timestamps: {len(timestamps):,}")
print(f" - Gaps detected: {len(gaps)}")
print(f" - Continuous: {'YES' if len(gaps) == 0 else 'NO'}")
print()
except Exception as e:
print(f"[ERROR] Quality check failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# ============================================================================
# CHECKPOINT 7: Save Unified Features
# ============================================================================
print("[CHECKPOINT 7] Saving unified features file...")
print()
try:
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
unified_df.write_parquet(UNIFIED_FILE)
file_size_mb = UNIFIED_FILE.stat().st_size / (1024 * 1024)
print(f"[OK] Saved to: {UNIFIED_FILE}")
print(f"[OK] File size: {file_size_mb:.1f} MB")
print()
except Exception as e:
print(f"[ERROR] Save failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# ============================================================================
# CHECKPOINT 8: Generate Feature Metadata
# ============================================================================
print("[CHECKPOINT 8] Generating feature metadata...")
print()
try:
# Create metadata catalog
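    # One catalog row per feature: index, name, category inferred from the column-name
    # prefix, null count, and dtype.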
feature_cols = [c for c in unified_df.columns if c != 'timestamp']
metadata_rows = []
for i, col in enumerate(feature_cols, 1):
# Determine category from column name
if col.startswith('border_'):
category = 'JAO_Border'
elif col.startswith('cnec_'):
category = 'JAO_CNEC'
elif '_lta_' in col:
category = 'LTA'
elif '_load_forecast_' in col:
category = 'Load_Forecast'
elif '_gen_outage_' in col or '_tx_outage_' in col:
category = 'Outages'
elif any(col.startswith(prefix) for prefix in ['AT_', 'BE_', 'CZ_', 'DE_', 'FR_', 'HR_', 'HU_', 'NL_', 'PL_', 'RO_', 'SI_', 'SK_']):
category = 'Weather'
else:
category = 'Other'
metadata_rows.append({
'feature_index': i,
'feature_name': col,
'category': category,
'null_count': unified_df[col].null_count(),
'dtype': str(unified_df[col].dtype)
})
metadata_df = pl.DataFrame(metadata_rows)
metadata_df.write_csv(METADATA_FILE)
print(f"[OK] Saved metadata: {METADATA_FILE}")
print(f"[OK] Total features: {len(feature_cols)}")
print()
# Category breakdown
    category_counts = metadata_df.group_by('category').agg(pl.len().alias('count')).sort('count', descending=True)
print("Feature breakdown by category:")
for row in category_counts.iter_rows(named=True):
print(f" - {row['category']}: {row['count']}")
print()
except Exception as e:
print(f"[ERROR] Metadata generation failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# ============================================================================
# FINAL SUMMARY
# ============================================================================
print("="*80)
print("UNIFIED FEATURES GENERATION COMPLETE")
print("="*80)
print()
print(f"Output file: {UNIFIED_FILE}")
print(f"Shape: {unified_df.shape[0]:,} rows x {unified_df.shape[1]} columns")
print(f"Date range: {unified_df['timestamp'].min()} to {unified_df['timestamp'].max()}")
print(f"Data completeness: {completeness:.2f}%")
print(f"File size: {file_size_mb:.1f} MB")
print()
print("[SUCCESS] All checkpoints passed!")
print()
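# Downstream usage (a sketch): reload the unified features and the metadata catalog
# produced above, e.g.
#     features = pl.read_parquet(UNIFIED_FILE)
#     catalog = pl.read_csv(METADATA_FILE)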