"""Recover October 2023 & 2024 LTA data with DST-safe date ranges.
The main collection failed for October due to DST transitions:
- October 2023: DST transition on Sunday, Oct 29
- October 2024: DST transition on Sunday, Oct 27
This script collects October in 2 chunks to avoid DST hour ambiguity:
- Chunk 1: Oct 1-26 (before DST weekend)
- Chunk 2: Oct 27-31 (after/including DST transition)
"""
import sys
import time
from datetime import datetime
from pathlib import Path

import pandas as pd
import polars as pl
from requests.exceptions import HTTPError

# Add src to path so the project package is importable
sys.path.insert(0, str(Path.cwd() / 'src'))

from data_collection.collect_jao import JAOCollector
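

# Illustrative helper (a sketch, not used by the recovery logic below): the EU
# fall-back DST transition is the last Sunday of October, so the hard-coded
# chunk boundaries in collect_october_split() could be derived per year.
def _last_sunday_of_october(year: int) -> int:
    """Day of month of the last Sunday in October (EU DST fall-back)."""
    import calendar
    weeks = calendar.monthcalendar(year, 10)
    # With the default Monday-first layout, Sunday is column index 6;
    # zero entries belong to the neighbouring months.
    return max(week[6] for week in weeks if week[6])

# e.g. _last_sunday_of_october(2023) == 29, _last_sunday_of_october(2024) == 27
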
def collect_october_split(collector, year: int):
    """Collect October LTA data in two chunks to avoid DST issues.

    Args:
        collector: JAOCollector instance
        year: Year to collect (2023 or 2024)

    Returns:
        Polars DataFrame with October LTA data, or None if collection failed
    """
    print(f"\n{'=' * 70}")
    print(f"COLLECTING OCTOBER {year} LTA (DST-Safe)")
    print(f"{'=' * 70}")

    all_data = []

    # Date chunks that skip over the ambiguous fall-back hour (the EU DST
    # transition is Sunday Oct 29 in 2023 and Sunday Oct 27 in 2024)
    chunks = [
        (f"{year}-10-01", f"{year}-10-26"),  # Before the DST weekend
        (f"{year}-10-27", f"{year}-10-31"),  # After/including the DST transition
    ]
    for chunk_num, (start_date, end_date) in enumerate(chunks, 1):
        print(f"\n Chunk {chunk_num}/2: {start_date} to {end_date}...", end=" ", flush=True)

        # Retry with exponential backoff on HTTP 429 rate limiting
        max_retries = 5
        base_delay = 60  # seconds

        for attempt in range(max_retries):
            try:
                # Rate limiting: 1 second between requests
                time.sleep(1)

                # Convert to pandas Timestamps with UTC timezone
                pd_start = pd.Timestamp(start_date, tz='UTC')
                pd_end = pd.Timestamp(end_date, tz='UTC')

                # Query LTA for this chunk
                df = collector.client.query_lta(pd_start, pd_end)

                if df is not None and not df.empty:
                    # CRITICAL: reset_index() preserves the datetime (mtu)
                    # index as a column before converting to polars
                    all_data.append(pl.from_pandas(df.reset_index()))
                    print(f"{len(df):,} records")
                else:
                    print("No data")
                break
            except HTTPError as e:
                if e.response is not None and e.response.status_code == 429:
                    if attempt < max_retries - 1:
                        # Rate limited: back off exponentially, then retry
                        wait_time = base_delay * (2 ** attempt)
                        print(f"Rate limited (429), waiting {wait_time}s... ", end="", flush=True)
                        time.sleep(wait_time)
                        print(f"Retrying ({attempt + 2}/{max_retries})...", end=" ", flush=True)
                    else:
                        print(f"Failed after {max_retries} attempts")
                else:
                    # Other HTTP error: do not retry
                    print(f"Failed: {e}")
                    break
            except Exception as e:
                print(f"Failed: {e}")
                break
    # Combine chunks
    if all_data:
        combined = pl.concat(all_data, how='vertical')
        print(f"\n Combined October {year}: {len(combined):,} records")
        return combined
    else:
        print(f"\n [WARNING] No data collected for October {year}")
        return None
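
# Note on the backoff schedule above: waits grow as 60s * 2**attempt (60, 120,
# 240, 480 seconds). If the endpoint throttles aggressively, adding random
# jitter is a common refinement; a minimal sketch (hypothetical, not wired in):
#
#   import random
#   wait_time = base_delay * (2 ** attempt) * (1 + 0.25 * random.random())
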
def main():
    """Recover October 2023 and 2024 LTA data."""
    print("\n" + "=" * 80)
    print("OCTOBER LTA RECOVERY - DST-SAFE COLLECTION")
    print("=" * 80)
    print("Target: October 2023 & October 2024")
    print("Strategy: Split around DST transition dates")
    print("=" * 80)

    # Initialize collector
    collector = JAOCollector()
    start_time = datetime.now()

    # Collect October 2023
    oct_2023 = collect_october_split(collector, 2023)

    # Collect October 2024
    oct_2024 = collect_october_split(collector, 2024)
    # =========================================================================
    # MERGE WITH EXISTING DATA
    # =========================================================================
    print("\n" + "=" * 80)
    print("MERGING WITH EXISTING LTA DATA")
    print("=" * 80)

    existing_path = Path('data/raw/phase1_24month/jao_lta.parquet')
    if not existing_path.exists():
        print(f"[ERROR] Existing LTA file not found: {existing_path}")
        print("Cannot merge. Exiting.")
        return
    # Read existing data
    existing_df = pl.read_parquet(existing_path)
    print(f"\nExisting data: {len(existing_df):,} records")

    # Backup existing file before it is overwritten
    backup_path = existing_path.with_suffix('.parquet.backup')
    existing_df.write_parquet(backup_path)
    print(f"Backup created: {backup_path}")
    # Combine all data
    all_dfs = [existing_df]
    recovered_count = 0

    if oct_2023 is not None:
        all_dfs.append(oct_2023)
        recovered_count += len(oct_2023)
        print(f"+ October 2023: {len(oct_2023):,} records")

    if oct_2024 is not None:
        all_dfs.append(oct_2024)
        recovered_count += len(oct_2024)
        print(f"+ October 2024: {len(oct_2024):,} records")

    if recovered_count == 0:
        print("\n[WARNING] No October data recovered")
        return
    # Merge and deduplicate on full rows (duplicates are unlikely, but the
    # recovered chunks could overlap rows already present in the file)
    merged_df = pl.concat(all_dfs, how='vertical')
    initial_count = len(merged_df)
    merged_df = merged_df.unique()
    deduped_count = initial_count - len(merged_df)
    if deduped_count > 0:
        print(f"\nRemoved {deduped_count:,} duplicate records")
    # Save merged data
    merged_df.write_parquet(existing_path)

    print("\n" + "=" * 80)
    print("RECOVERY COMPLETE")
    print("=" * 80)
    print(f"Original records: {len(existing_df):,}")
    print(f"Recovered records: {recovered_count:,}")
    print(f"Total records: {len(merged_df):,}")
    print(f"File: {existing_path}")
    print(f"Size: {existing_path.stat().st_size / (1024**2):.2f} MB")
    print(f"Backup: {backup_path}")

    elapsed = datetime.now() - start_time
    print(f"\nTotal time: {elapsed}")
    print("=" * 80)
if __name__ == '__main__':
    main()