"""Unify JAO datasets into single timeline.

Combines MaxBEX, CNEC/PTDF, LTA, and Net Positions data into a single
unified dataset with proper timestamp alignment.

Author: Claude
Date: 2025-11-06
"""
from pathlib import Path
from typing import Tuple
import polars as pl


def validate_timeline(df: pl.DataFrame, name: str) -> None:
    """Validate timeline is hourly with no gaps."""
    print(f"\nValidating {name} timeline...")

    # Check sorted
    if not df['mtu'].is_sorted():
        raise ValueError(f"{name}: Timeline not sorted")

    # Check cadence: the modal diff should be 1 hour; DST transitions on
    # naive local timestamps can produce a few non-hourly diffs
    time_diffs = df['mtu'].diff().drop_nulls()
    most_common = time_diffs.mode()[0]

    if most_common.total_seconds() != 3600:
        print(f"  [WARNING] Most common time diff: {most_common} (expected 1 hour)")

    # Report (rather than fail on) non-modal diffs: gaps or DST shifts
    irregular = (time_diffs != most_common).sum()
    if irregular > 0:
        print(f"  [INFO] {irregular} irregular time diffs (gaps or DST transitions)")

    print(f"  [OK] {name} timeline validated: {len(df)} records, sorted")


def add_timestamp_to_maxbex(
    maxbex: pl.DataFrame,
    master_timeline: pl.DataFrame
) -> pl.DataFrame:
    """Add mtu timestamp to MaxBEX via row alignment."""
    print("\nAdding timestamp to MaxBEX...")

    # Verify same length
    if len(maxbex) != len(master_timeline):
        raise ValueError(
            f"MaxBEX ({len(maxbex)}) and timeline ({len(master_timeline)}) "
            "have different lengths"
        )

    # Add mtu column via hstack
    maxbex_with_time = maxbex.hstack(master_timeline)

    print(f"  [OK] MaxBEX timestamp added: {len(maxbex_with_time)} records")
    return maxbex_with_time
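
# Alignment sketch (illustrative): hstack pairs rows purely by position,
# which is why both frames must have identical length and row order.
#
#   left = pl.DataFrame({'value': [1.0, 2.0]})
#   right = pl.DataFrame({'mtu': ['t0', 't1']})
#   left.hstack(right)  # row i of 'left' is paired with row i of 'right'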


def fill_lta_gaps(
    lta: pl.DataFrame,
    master_timeline: pl.DataFrame
) -> pl.DataFrame:
    """Fill LTA gaps using forward-fill strategy."""
    print("\nFilling LTA gaps...")

    # Report initial state
    initial_records = len(lta)
    expected_records = len(master_timeline)
    missing_hours = expected_records - initial_records

    print(f"  Initial LTA records: {initial_records:,}")
    print(f"  Expected records: {expected_records:,}")
    print(f"  Missing hours: {missing_hours:,} ({missing_hours/expected_records*100:.1f}%)")

    # Remove metadata columns
    lta_clean = lta.drop(['is_masked', 'masking_method'], strict=False)

    # Left join master timeline with LTA
    lta_complete = master_timeline.join(
        lta_clean,
        on='mtu',
        how='left'
    )

    # Get border columns
    border_cols = [c for c in lta_complete.columns if c.startswith('border_')]

    # Forward-fill gaps (LTA changes rarely)
    lta_complete = lta_complete.with_columns([
        pl.col(col).forward_fill().alias(col)
        for col in border_cols
    ])

    # Fill any leading nulls (hours before the first LTA record) with 0,
    # excluding the datetime 'mtu' key from the fill
    lta_complete = lta_complete.with_columns(pl.exclude('mtu').fill_null(0))

    # Verify no nulls remain
    null_count = lta_complete.null_count().sum_horizontal()[0]
    if null_count > 0:
        raise ValueError(f"LTA still has {null_count} nulls after filling")

    print(f"  [OK] LTA complete: {len(lta_complete)} records, 0 nulls")
    return lta_complete
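
# Gap-filling sketch (illustrative): forward-fill propagates the last
# observed value into nulls, matching how LTA values persist between
# infrequent updates; leading nulls are zero-filled afterwards.
#
#   s = pl.Series('border_x', [None, 100, None, None, 120])
#   s.fill_null(strategy='forward')               # [null, 100, 100, 100, 120]
#   s.fill_null(strategy='forward').fill_null(0)  # [0, 100, 100, 100, 120]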


def broadcast_cnec_to_hourly(
    cnec: pl.DataFrame,
    master_timeline: pl.DataFrame
) -> pl.DataFrame:
    """Broadcast daily CNEC snapshots to hourly timeline."""
    print("\nBroadcasting CNEC from daily to hourly...")

    # Report initial state
    unique_days = cnec['collection_date'].dt.date().n_unique()
    print(f"  CNEC unique days: {unique_days}")
    print(f"  Target hours: {len(master_timeline):,}")

    # Extract date from master timeline
    master_with_date = master_timeline.with_columns([
        pl.col('mtu').dt.date().alias('date')
    ])

    # Extract date from CNEC collection_date
    cnec_with_date = cnec.with_columns([
        pl.col('collection_date').dt.date().alias('date')
    ])

    # Drop collection_date, keep date for join
    cnec_with_date = cnec_with_date.drop('collection_date')

    # Join: each day's CNEC snapshot broadcasts to 23-25 hours
    # (24 on normal days; 23 or 25 on DST-transition days)
    # Use left join to keep all hours even if no CNEC data
    cnec_hourly = master_with_date.join(
        cnec_with_date,
        on='date',
        how='left'
    )

    # Drop the date column used for join
    cnec_hourly = cnec_hourly.drop('date')

    print(f"  [OK] CNEC hourly: {len(cnec_hourly)} records")
    print(f"  [INFO] CNEC in long format - multiple rows per timestamp (one per CNEC)")

    return cnec_hourly
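
# Broadcast sketch (illustrative): a left join on a shared date key is a
# one-to-many join, so each daily snapshot row repeats for every hour of
# that day.
#
#   hours = pl.DataFrame({'date': ['d1', 'd1', 'd2'], 'hour': [0, 1, 0]})
#   daily = pl.DataFrame({'date': ['d1', 'd2'], 'snapshot': ['A', 'B']})
#   hours.join(daily, on='date', how='left')
#   # -> (d1, 0, A), (d1, 1, A), (d2, 0, B)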


def join_datasets(
    master_timeline: pl.DataFrame,
    maxbex_with_time: pl.DataFrame,
    lta_complete: pl.DataFrame,
    netpos: pl.DataFrame,
    cnec_hourly: pl.DataFrame
) -> pl.DataFrame:
    """Join all datasets on mtu timestamp."""
    print("\nJoining all datasets...")

    # Start with MaxBEX, which already carries mtu (row-aligned via hstack)
    unified = maxbex_with_time.clone()
    print(f"  Starting with MaxBEX: {unified.shape}")

    # Join LTA
    unified = unified.join(
        lta_complete,
        on='mtu',
        how='left',
        suffix='_lta'
    )
    # Drop duplicate mtu if created
    if 'mtu_lta' in unified.columns:
        unified = unified.drop('mtu_lta')
    print(f"  After LTA: {unified.shape}")

    # Join NetPos
    netpos_clean = netpos.drop(['collection_date'], strict=False)
    unified = unified.join(
        netpos_clean,
        on='mtu',
        how='left',
        suffix='_netpos'
    )
    # Drop duplicate mtu if created
    if 'mtu_netpos' in unified.columns:
        unified = unified.drop('mtu_netpos')
    print(f"  After NetPos: {unified.shape}")

    # Note: CNEC is in long format, would explode the dataset
    # We'll handle CNEC separately in feature engineering
    print(f"  [INFO] CNEC not joined (long format - handle in feature engineering)")

    # Sort by timestamp (joins may have shuffled rows)
    print(f"\nSorting by timestamp...")
    unified = unified.sort('mtu')

    print(f"  [OK] Unified dataset: {unified.shape}")
    print(f"  [OK] Timeline sorted: {unified['mtu'].is_sorted()}")
    return unified
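
# Suffix sketch (illustrative): polars suffixes only colliding non-key
# columns, never the join key itself, so the defensive 'mtu_lta' /
# 'mtu_netpos' drops above only matter if a frame carries an extra,
# non-key column named 'mtu'.
#
#   a = pl.DataFrame({'k': [1], 'v': [10]})
#   b = pl.DataFrame({'k': [1], 'v': [20]})
#   a.join(b, on='k', how='left', suffix='_b')  # columns: k, v, v_b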


def unify_jao_data(
    maxbex_path: Path,
    cnec_path: Path,
    lta_path: Path,
    netpos_path: Path,
    output_dir: Path
) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """Unify all JAO datasets into single timeline.

    Args:
        maxbex_path: Path to MaxBEX parquet file
        cnec_path: Path to CNEC/PTDF parquet file
        lta_path: Path to LTA parquet file
        netpos_path: Path to Net Positions parquet file
        output_dir: Directory to save unified data

    Returns:
        Tuple of (unified_wide, cnec_hourly) DataFrames
    """
    print("\n" + "=" * 80)
    print("JAO DATA UNIFICATION")
    print("=" * 80)

    # 1. Load datasets
    print("\nLoading datasets...")
    maxbex = pl.read_parquet(maxbex_path)
    cnec = pl.read_parquet(cnec_path)
    lta = pl.read_parquet(lta_path)
    netpos = pl.read_parquet(netpos_path)

    print(f"  MaxBEX: {maxbex.shape}")
    print(f"  CNEC: {cnec.shape}")
    print(f"  LTA: {lta.shape}")
    print(f"  NetPos (raw): {netpos.shape}")

    # 2. Deduplicate NetPos and align MaxBEX
    # MaxBEX has no timestamp - it's row-aligned with NetPos
    # Need to deduplicate both together to maintain alignment
    print("\nDeduplicating NetPos and aligning MaxBEX...")

    # Verify same length (must be row-aligned)
    if len(maxbex) != len(netpos):
        raise ValueError(
            f"MaxBEX ({len(maxbex)}) and NetPos ({len(netpos)}) "
            "have different lengths - cannot align"
        )

    # Add mtu column to MaxBEX via hstack (before deduplication)
    maxbex_with_time = maxbex.hstack(netpos.select(['mtu']))
    print(f"  MaxBEX + NetPos aligned: {maxbex_with_time.shape}")

    # Deduplicate MaxBEX based on mtu timestamp
    maxbex_before = len(maxbex_with_time)
    # maintain_order makes keep='first' deterministic, so MaxBEX and NetPos
    # keep the same row for each duplicated mtu and stay aligned
    maxbex_with_time = maxbex_with_time.unique(subset=['mtu'], keep='first', maintain_order=True)
    maxbex_after = len(maxbex_with_time)
    maxbex_duplicates = maxbex_before - maxbex_after

    if maxbex_duplicates > 0:
        print(f"  MaxBEX deduplicated: {maxbex_with_time.shape} ({maxbex_duplicates:,} duplicates removed)")

    # Deduplicate NetPos
    netpos_before = len(netpos)
    netpos = netpos.unique(subset=['mtu'], keep='first', maintain_order=True)
    netpos_after = len(netpos)
    netpos_duplicates = netpos_before - netpos_after

    if netpos_duplicates > 0:
        print(f"  NetPos deduplicated: {netpos.shape} ({netpos_duplicates:,} duplicates removed)")

    # 3. Create master timeline from deduplicated NetPos
    print("\nCreating master timeline from Net Positions...")
    master_timeline = netpos.select(['mtu']).sort('mtu')
    validate_timeline(master_timeline, "Master")

    # 4. Fill LTA gaps
    lta_complete = fill_lta_gaps(lta, master_timeline)

    # 5. Broadcast CNEC to hourly
    cnec_hourly = broadcast_cnec_to_hourly(cnec, master_timeline)

    # 6. Join datasets (wide format: MaxBEX + LTA + NetPos)
    unified_wide = join_datasets(
        master_timeline,
        maxbex_with_time,
        lta_complete,
        netpos,
        cnec_hourly
    )

    # 7. Save outputs
    print("\nSaving unified data...")
    output_dir.mkdir(parents=True, exist_ok=True)

    unified_wide_path = output_dir / 'unified_jao_24month.parquet'
    cnec_hourly_path = output_dir / 'cnec_hourly_24month.parquet'

    unified_wide.write_parquet(unified_wide_path)
    cnec_hourly.write_parquet(cnec_hourly_path)

    print(f"  [OK] Unified wide: {unified_wide_path}")
    print(f"      Size: {unified_wide_path.stat().st_size / (1024**2):.2f} MB")
    print(f"  [OK] CNEC hourly: {cnec_hourly_path}")
    print(f"      Size: {cnec_hourly_path.stat().st_size / (1024**2):.2f} MB")

    # 8. Validation summary
    print("\n" + "=" * 80)
    print("UNIFICATION COMPLETE")
    print("=" * 80)
    print(f"Unified wide dataset: {unified_wide.shape}")
    print(f"  - mtu timestamp: 1 column")
    print(f"  - MaxBEX borders: 132 columns")
    print(f"  - LTA borders: 38 columns")
    print(f"  - Net Positions: 28 columns")
    print(f"  Total: {unified_wide.shape[1]} columns")
    print()
    print(f"CNEC hourly dataset: {cnec_hourly.shape}")
    print(f"  - Long format (one row per CNEC per hour)")
    print(f"  - Used in feature engineering phase")
    print("=" * 80)
    print()

    return unified_wide, cnec_hourly


def main():
    """Main execution."""
    # Paths
    base_dir = Path.cwd()
    data_dir = base_dir / 'data' / 'raw' / 'phase1_24month'
    output_dir = base_dir / 'data' / 'processed'

    maxbex_path = data_dir / 'jao_maxbex.parquet'
    cnec_path = data_dir / 'jao_cnec_ptdf.parquet'
    lta_path = data_dir / 'jao_lta.parquet'
    netpos_path = data_dir / 'jao_net_positions.parquet'

    # Verify files exist
    for path in [maxbex_path, cnec_path, lta_path, netpos_path]:
        if not path.exists():
            raise FileNotFoundError(f"Required file not found: {path}")

    # Unify
    unified_wide, cnec_hourly = unify_jao_data(
        maxbex_path,
        cnec_path,
        lta_path,
        netpos_path,
        output_dir
    )

    print("SUCCESS: JAO data unified and saved to data/processed/")


if __name__ == '__main__':
    main()