"""Recover October 2023 & 2024 LTA data with DST-safe date ranges.

The main collection failed for October due to DST transitions:
- October 2023: DST transition on Sunday, Oct 29
- October 2024: DST transition on Sunday, Oct 27

This script collects October in 2 chunks to avoid DST hour ambiguity:
- Chunk 1: Oct 1-26 (before DST weekend)
- Chunk 2: Oct 27-31 (after/including DST transition)
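
Run this script from the project root so that 'src/' and
'data/raw/phase1_24month/' resolve correctly (it inserts Path.cwd() / 'src'
onto sys.path).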
"""
import sys
from pathlib import Path
from datetime import datetime
import polars as pl
import time
from requests.exceptions import HTTPError

# Add src to path
sys.path.insert(0, str(Path.cwd() / 'src'))

from data_collection.collect_jao import JAOCollector

def collect_october_split(collector, year: int, month: int = 10):
    """Collect October LTA data in 2 chunks to avoid DST issues.

    Args:
        collector: JAOCollector instance
        year: Year to collect (2023 or 2024)
        month: Month (default 10 for October)

    Returns:
        Polars DataFrame with October LTA data, or None if failed
    """
    import pandas as pd

    print(f"\n{'=' * 70}")
    print(f"COLLECTING OCTOBER {year} LTA (DST-Safe)")
    print(f"{'=' * 70}")

    all_data = []

    # Define date chunks that avoid DST transition
    chunks = [
        (f"{year}-10-01", f"{year}-10-26"),  # Before DST weekend
        (f"{year}-10-27", f"{year}-10-31"),  # After/including DST
    ]

    for chunk_num, (start_date, end_date) in enumerate(chunks, 1):
        print(f"\n  Chunk {chunk_num}/2: {start_date} to {end_date}...", end=" ", flush=True)

        # Retry logic with exponential backoff
        max_retries = 5
        base_delay = 60
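        # Worst case per chunk: four backoff waits of 60/120/240/480s (~15 min)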
        success = False

        for attempt in range(max_retries):
            try:
                # Rate limiting: 1 second between requests
                time.sleep(1)

                # Convert to pandas Timestamps with UTC timezone
                pd_start = pd.Timestamp(start_date, tz='UTC')
                pd_end = pd.Timestamp(end_date, tz='UTC')

                # Query LTA for this chunk
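                # (query_lta is assumed to return a pandas DataFrame indexed
                # by MTU timestamps, hence the reset_index below)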
                df = collector.client.query_lta(pd_start, pd_end)

                if df is not None and not df.empty:
                    # CRITICAL: Reset index to preserve datetime (mtu) as column
                    all_data.append(pl.from_pandas(df.reset_index()))
                    print(f"{len(df):,} records")
                    success = True
                    break
                else:
                    print("No data")
                    success = True
                    break

            except HTTPError as e:
                if e.response is not None and e.response.status_code == 429:
                    # Rate limited - exponential backoff, but only sleep when
                    # another attempt remains
                    if attempt < max_retries - 1:
                        wait_time = base_delay * (2 ** attempt)
                        print(f"Rate limited (429), waiting {wait_time}s... ", end="", flush=True)
                        time.sleep(wait_time)
                        print(f"Retrying ({attempt + 2}/{max_retries})...", end=" ", flush=True)
                    else:
                        print(f"Failed after {max_retries} attempts")
                else:
                    # Other HTTP error
                    print(f"Failed: {e}")
                    break

            except Exception as e:
                print(f"Failed: {e}")
                break

    # Combine chunks
    if all_data:
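        # 'vertical' concat requires both chunks to share an identical schema
        # (same column names and dtypes)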
        combined = pl.concat(all_data, how='vertical')
        print(f"\n  Combined October {year}: {len(combined):,} records")
        return combined
    else:
        print(f"\n  [WARNING] No data collected for October {year}")
        return None

def main():
    """Recover October 2023 and 2024 LTA data."""

    print("\n" + "=" * 80)
    print("OCTOBER LTA RECOVERY - DST-SAFE COLLECTION")
    print("=" * 80)
    print("Target: October 2023 & October 2024")
    print("Strategy: Split around DST transition dates")
    print("=" * 80)

    # Initialize collector
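    # JAOCollector comes from src/data_collection/collect_jao.py; its .client
    # exposes the query_lta call used in collect_october_split()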
    collector = JAOCollector()

    start_time = datetime.now()

    # Collect October 2023
    oct_2023 = collect_october_split(collector, 2023)

    # Collect October 2024
    oct_2024 = collect_october_split(collector, 2024)

    # =========================================================================
    # MERGE WITH EXISTING DATA
    # =========================================================================
    print("\n" + "=" * 80)
    print("MERGING WITH EXISTING LTA DATA")
    print("=" * 80)

    existing_path = Path('data/raw/phase1_24month/jao_lta.parquet')

    if not existing_path.exists():
        print(f"[ERROR] Existing LTA file not found: {existing_path}")
        print("Cannot merge. Exiting.")
        return

    # Read existing data
    existing_df = pl.read_parquet(existing_path)
    print(f"\nExisting data: {len(existing_df):,} records")

    # Backup existing file
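    # (fixed backup path, so re-running the script overwrites any earlier backup)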
    backup_path = existing_path.with_suffix('.parquet.backup')
    existing_df.write_parquet(backup_path)
    print(f"Backup created: {backup_path}")

    # Combine all data
    all_dfs = [existing_df]
    recovered_count = 0

    if oct_2023 is not None:
        all_dfs.append(oct_2023)
        recovered_count += len(oct_2023)
        print(f"+ October 2023: {len(oct_2023):,} records")

    if oct_2024 is not None:
        all_dfs.append(oct_2024)
        recovered_count += len(oct_2024)
        print(f"+ October 2024: {len(oct_2024):,} records")

    if recovered_count == 0:
        print("\n[WARNING] No October data recovered")
        return

    # Merge and deduplicate
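    # (vertical concat assumes the recovered chunks match the existing parquet schema)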
    merged_df = pl.concat(all_dfs, how='vertical')

    # Remove exact duplicate rows if any (unlikely but safe); unique() with no
    # subset deduplicates across all columns
    if 'datetime' in merged_df.columns or 'timestamp' in merged_df.columns:
        initial_count = len(merged_df)
        merged_df = merged_df.unique()
        deduped_count = initial_count - len(merged_df)
        if deduped_count > 0:
            print(f"\nRemoved {deduped_count} duplicate records")

    # Save merged data
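    # Overwrites the original parquet in place; the .backup copy above allows rollback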
    merged_df.write_parquet(existing_path)

    print("\n" + "=" * 80)
    print("RECOVERY COMPLETE")
    print("=" * 80)
    print(f"Original records: {len(existing_df):,}")
    print(f"Recovered records: {recovered_count:,}")
    print(f"Total records: {len(merged_df):,}")
    print(f"File: {existing_path}")
    print(f"Size: {existing_path.stat().st_size / (1024**2):.2f} MB")
    print(f"Backup: {backup_path}")

    elapsed = datetime.now() - start_time
    print(f"\nTotal time: {elapsed}")
    print("=" * 80)

if __name__ == '__main__':
    main()