# AI-OMS-Analyze/scripts/classify.py
import sys
import os
from pathlib import Path
import pandas as pd
import numpy as np
from typing import Optional
# sklearn imports
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.multioutput import MultiOutputClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix
import joblib
# Optional HF weak-labeling
HF_TOKEN = os.environ.get('HF_TOKEN')
# optional boosters
try:
import xgboost as xgb
_has_xgb = True
except Exception:
_has_xgb = False
def parse_and_features(df: pd.DataFrame) -> pd.DataFrame:
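    """Parse raw OMS columns into model-ready features.

    Adds parsed datetimes, outage duration in minutes, numeric casts of
    load/capacity/customer columns, hour/weekday features, a device-type
    frequency feature, and 'NA' fills for the categorical columns.
    """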
df = df.copy()
# parse datetimes
for c in ['OutageDateTime','FirstRestoDateTime','LastRestoDateTime']:
if c in df.columns:
df[c+'_dt'] = pd.to_datetime(df[c], format='%d-%m-%Y %H:%M:%S', errors='coerce')
# duration
if 'OutageDateTime_dt' in df.columns and 'LastRestoDateTime_dt' in df.columns:
df['duration_min'] = (df['LastRestoDateTime_dt'] - df['OutageDateTime_dt']).dt.total_seconds() / 60.0
else:
df['duration_min'] = np.nan
# numeric columns
for col in ['Load(MW)','Capacity(kVA)','FirstStepDuration','LastStepDuration','AffectedCustomer']:
if col in df.columns:
df[col+'_num'] = pd.to_numeric(df[col], errors='coerce')
else:
df[col+'_num'] = np.nan
# time features
if 'OutageDateTime_dt' in df.columns:
df['hour'] = df['OutageDateTime_dt'].dt.hour
df['weekday'] = df['OutageDateTime_dt'].dt.weekday
else:
df['hour'] = np.nan
df['weekday'] = np.nan
    # device frequency (count NaN devices under 'NA', consistent with the fillna below)
    if 'OpDeviceType' in df.columns:
        freq = df['OpDeviceType'].fillna('NA').value_counts()
        df['device_freq'] = df['OpDeviceType'].fillna('NA').map(freq).fillna(0)
else:
df['device_freq'] = 0
# small cleanup for categorical
for c in ['OpDeviceType','Owner','Weather','EventType']:
if c in df.columns:
df[c] = df[c].fillna('NA')
else:
df[c] = 'NA'
return df
def weak_label_with_hf(text: str) -> Optional[str]:
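    """Ask the HF router for a short Thai cause label for a free-text outage detail.

    Returns None when HF_TOKEN is unset, the text is empty, or the call fails.
    """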
    # Use the HF router via an OpenAI-compatible client to map free text to a short label suggestion
if not HF_TOKEN or not isinstance(text, str) or not text.strip():
return None
try:
from openai import OpenAI
        client = OpenAI(base_url='https://router.huggingface.co/v1', api_key=HF_TOKEN)
        # Prompt (Thai): "Categorize the cause of this power outage event in a short
        # Thai word or phrase, e.g. 'broken line' or 'maintenance'."
        prompt = f"ให้จัดหมวดสาเหตุของเหตุการณ์ไฟฟ้า ในคำสั้นๆ (ไทย) จากข้อความนี้:\n\n{text}\n\nตอบเป็นคำเดียวหรือวลีสั้นๆ เช่น 'สายขาด' หรือ 'บำรุงรักษา'"
completion = client.chat.completions.create(
model='meta-llama/Llama-4-Scout-17B-16E-Instruct:novita',
messages=[{"role":"user","content":[{"type":"text","text":prompt}]}],
max_tokens=40,
)
choice = completion.choices[0]
msg = getattr(choice, 'message', None) or (choice.get('message') if isinstance(choice, dict) else None)
content = None
if msg:
content = msg.get('content') if isinstance(msg, dict) else getattr(msg, 'content', None)
if isinstance(content, list) and content:
# find text
for it in content:
if isinstance(it, dict) and it.get('type') in ('output_text','text'):
                        return (it.get('text') or '').strip()
return str(content[0]).strip()
# fallback
text_out = choice.get('text') if isinstance(choice, dict) else None
return text_out.strip() if text_out else None
except Exception:
return None
def train_classifier(df: pd.DataFrame, label_col: str = 'CauseType', test_size: float = 0.2, random_state: int = 42, min_count_to_keep: int = 2, model_type: str = 'rf', hyperparams: Optional[dict] = None):
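    """Train a single-target classifier on OMS outage data.

    Weak-labels rows with a missing label via the HF router when HF_TOKEN is
    set, folds rare classes into 'Other', fits a preprocessing+classifier
    pipeline, and returns the report plus saved model/prediction paths.
    """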
    hyperparams = hyperparams or {}  # avoid sharing a mutable default dict between calls
    df = parse_and_features(df)
# optionally weak-label rows missing label
if label_col not in df.columns:
df[label_col] = None
if df[label_col].isna().sum() > 0 and HF_TOKEN:
# attempt weak labeling for missing entries using Detail or FaultDetail
for idx, row in df[df[label_col].isna()].iterrows():
text = None
for f in ['Detail','FaultDetail','SiteDetail']:
if f in df.columns and pd.notna(row.get(f)):
text = row.get(f)
break
if text:
try:
lbl = weak_label_with_hf(text)
if lbl:
df.at[idx, label_col] = lbl
except Exception:
pass
# filter rare classes and drop na
if df[label_col].notna().any():
vc = df[label_col].value_counts()
rare = vc[vc < min_count_to_keep].index
if len(rare) > 0:
df[label_col] = df[label_col].apply(lambda x: 'Other' if x in rare else x)
df = df.dropna(subset=[label_col])
# features
feature_cols = ['duration_min','Load(MW)_num','Capacity(kVA)_num','AffectedCustomer_num','hour','weekday','device_freq','OpDeviceType','Owner','Weather','EventType']
X = df[feature_cols]
y = df[label_col].astype(str)
le = LabelEncoder()
y_encoded = le.fit_transform(y)
# split
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=test_size, random_state=random_state, stratify=y_encoded)
# model
if model_type == 'rf':
clf = RandomForestClassifier(random_state=random_state, **hyperparams)
elif model_type == 'gb':
clf = GradientBoostingClassifier(random_state=random_state, **hyperparams)
elif model_type == 'mlp':
clf = MLPClassifier(random_state=random_state, **hyperparams)
else:
raise ValueError(f"Unknown model_type: {model_type}")
# preprocessor
preprocessor = ColumnTransformer(
transformers=[
('num', Pipeline([('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler())]), ['duration_min','Load(MW)_num','Capacity(kVA)_num','AffectedCustomer_num','hour','weekday','device_freq']),
('cat', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')), ('encoder', OneHotEncoder(handle_unknown='ignore'))]), ['OpDeviceType','Owner','Weather','EventType'])
]
)
pipeline = Pipeline([('preprocessor', preprocessor), ('classifier', clf)])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
y_test_inv = le.inverse_transform(y_test)
y_pred_inv = le.inverse_transform(y_pred)
report = classification_report(y_test_inv, y_pred_inv, zero_division=0)
    # save the fitted pipeline together with its label encoder
    model_file = Path('outputs') / f'classifier_{model_type}_{label_col}.joblib'
    model_file.parent.mkdir(exist_ok=True)
    joblib.dump({'pipeline': pipeline, 'label_encoder': le}, model_file)
    # predictions on the full dataset for download
    y_pred_all = pipeline.predict(X)
    pred_df = df.copy()
    pred_df[f'Predicted_{label_col}'] = le.inverse_transform(y_pred_all)
    preds_file = Path('outputs') / f'predictions_{model_type}_{label_col}.csv'
    pred_df.to_csv(preds_file, index=False)
return {
'report': report,
'model_file': str(model_file),
'predictions_file': str(preds_file)
}

# NOTE: the `def` line for this second trainer is missing from the committed file;
# the signature below is reconstructed from the names its body uses
# (label_cols, do_gridsearch, ...) and should be read as an assumption.
def train_multi_classifier(df: pd.DataFrame, label_cols: list, test_size: float = 0.2,
                           random_state: int = 42, min_count_to_keep: int = 2,
                           model_type: str = 'rf', do_gridsearch: bool = False):
    """Train one classifier over one or more target columns (reconstructed signature)."""
    df = parse_and_features(df)
is_multi = len(label_cols) > 1
# optionally weak-label rows missing label (only for single target)
if not is_multi and label_cols[0] not in df.columns:
df[label_cols[0]] = None
if not is_multi and df[label_cols[0]].isna().sum() > 0 and HF_TOKEN:
# attempt weak labeling for missing entries using Detail or FaultDetail
for idx, row in df[df[label_cols[0]].isna()].iterrows():
text = None
for f in ['Detail','FaultDetail','SiteDetail']:
if f in df.columns and pd.notna(row.get(f)):
text = row.get(f)
break
if text:
try:
lbl = weak_label_with_hf(text)
if lbl:
df.at[idx, label_cols[0]] = lbl
except Exception:
pass
# filter rare classes and drop na (for each label_col)
for col in label_cols:
if col not in df.columns:
df[col] = None
if df[col].notna().any():
vc = df[col].value_counts()
rare = vc[vc < min_count_to_keep].index
if len(rare) > 0:
df[col] = df[col].apply(lambda x: 'Other' if x in rare else x)
df = df.dropna(subset=[col])
# features
feature_cols = ['duration_min','Load(MW)_num','Capacity(kVA)_num','AffectedCustomer_num','hour','weekday','device_freq','OpDeviceType','Owner','Weather','EventType']
X = df[feature_cols]
# target
if is_multi:
y = df[label_cols]
# encode each target
les = [LabelEncoder() for _ in label_cols]
y_encoded = np.column_stack([le.fit_transform(y[col]) for le, col in zip(les, label_cols)])
else:
y = df[label_cols[0]].astype(str)
le = LabelEncoder()
y_encoded = le.fit_transform(y)
les = [le]
# split
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=test_size, random_state=random_state, stratify=y_encoded if not is_multi else None)
# model
    if model_type == 'rf':
        clf = RandomForestClassifier(random_state=random_state)
    elif model_type == 'gb':
        clf = GradientBoostingClassifier(random_state=random_state)
    elif model_type == 'mlp':
        clf = MLPClassifier(random_state=random_state, max_iter=500)
    else:
        raise ValueError(f"Unknown model_type: {model_type}")
    # Only RandomForest handles multiclass multi-output natively; wrap the
    # others so a 2D y_train (one column per target) can be fitted.
    if is_multi and model_type != 'rf':
        clf = MultiOutputClassifier(clf)
# preprocessor
preprocessor = ColumnTransformer(
transformers=[
('num', Pipeline([('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler())]), ['duration_min','Load(MW)_num','Capacity(kVA)_num','AffectedCustomer_num','hour','weekday','device_freq']),
('cat', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')), ('encoder', OneHotEncoder(handle_unknown='ignore'))]), ['OpDeviceType','Owner','Weather','EventType'])
]
)
pipeline = Pipeline([('preprocessor', preprocessor), ('classifier', clf)])
    if do_gridsearch:
        # only tune parameters the chosen classifier actually exposes
        param_grid = {}
        if hasattr(clf, 'n_estimators'):
            param_grid['classifier__n_estimators'] = [50, 100, 200]
        if hasattr(clf, 'max_depth'):
            param_grid['classifier__max_depth'] = [None, 10, 20]
        cv = 3 if not is_multi else KFold(n_splits=3, shuffle=True, random_state=random_state)
        scoring = 'accuracy' if not is_multi else None
        grid = GridSearchCV(pipeline, param_grid, cv=cv, scoring=scoring, n_jobs=-1)
        grid.fit(X_train, y_train)
        pipeline = grid.best_estimator_  # already refit on the training split
    else:
        pipeline.fit(X_train, y_train)
# predict
y_pred = pipeline.predict(X_test)
# report
if is_multi:
reports = []
for i, col in enumerate(label_cols):
y_test_i = y_test[:, i]
y_pred_i = y_pred[:, i]
y_test_inv = les[i].inverse_transform(y_test_i)
y_pred_inv = les[i].inverse_transform(y_pred_i.astype(int))
rep = classification_report(y_test_inv, y_pred_inv, zero_division=0)
reports.append(f"Report for {col}:\n{rep}")
report = '\n\n'.join(reports)
else:
y_test_inv = les[0].inverse_transform(y_test)
y_pred_inv = les[0].inverse_transform(y_pred)
report = classification_report(y_test_inv, y_pred_inv, zero_division=0)
# save model
model_file = Path('outputs') / f'classifier_{model_type}_{"_".join(label_cols)}.joblib'
model_file.parent.mkdir(exist_ok=True)
joblib.dump({'pipeline': pipeline, 'label_encoders': les}, model_file)
# predictions on train set for download
y_pred_train = pipeline.predict(X)
if is_multi:
pred_df = df.copy()
for i, col in enumerate(label_cols):
pred_df[f'Predicted_{col}'] = les[i].inverse_transform(y_pred_train[:, i].astype(int))
else:
pred_df = df.copy()
pred_df[f'Predicted_{label_cols[0]}'] = les[0].inverse_transform(y_pred_train)
preds_file = Path('outputs') / f'predictions_{model_type}_{"_".join(label_cols)}.csv'
pred_df.to_csv(preds_file, index=False)
return {
'report': report,
'model_file': str(model_file),
'predictions_file': str(preds_file)
}

# NOTE: the `def` line for this third trainer is also missing; the signature is
# reconstructed from its body (single label_col, confusion-matrix output) and the
# 'cause' output filenames. Treat the name and defaults as assumptions.
def train_cause_classifier(df: pd.DataFrame, label_col: str = 'CauseType', test_size: float = 0.2,
                           random_state: int = 42, min_count_to_keep: int = 2,
                           model_type: str = 'rf', do_gridsearch: bool = False):
    """Train a single-target cause classifier with optional grid search (reconstructed signature)."""
    df = parse_and_features(df)
# optionally weak-label rows missing label
if label_col not in df.columns:
df[label_col] = None
if df[label_col].isna().sum() > 0 and HF_TOKEN:
# attempt weak labeling for missing entries using Detail or FaultDetail
for idx, row in df[df[label_col].isna()].iterrows():
text = None
for f in ['Detail','FaultDetail','SiteDetail']:
if f in df.columns and pd.notna(row.get(f)):
text = row.get(f)
break
if text:
lbl = weak_label_with_hf(text)
if lbl:
df.at[idx, label_col] = lbl
# combine rare classes into 'Other' if needed
if df[label_col].notna().any():
vc = df[label_col].value_counts()
rare = vc[vc < min_count_to_keep].index.tolist()
if rare:
df[label_col] = df[label_col].apply(lambda x: 'Other' if x in rare else x)
df = df.dropna(subset=[label_col])
if df.empty:
raise ValueError('No labeled data available for training')
# define features
feature_cols = ['duration_min','Load(MW)_num','Capacity(kVA)_num','AffectedCustomer_num','hour','weekday','device_freq','OpDeviceType','Owner','Weather','EventType']
X = df[feature_cols]
y = df[label_col].astype(str)
# encode labels to integers
le = LabelEncoder()
y_encoded = le.fit_transform(y)
# simple train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=test_size, random_state=random_state, stratify=y_encoded)
# preprocessing
numeric_feats = ['duration_min','Load(MW)_num','Capacity(kVA)_num','AffectedCustomer_num','hour','weekday','device_freq']
cat_feats = ['OpDeviceType','Owner','Weather','EventType']
numeric_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler())])
# sklearn versions differ on parameter name for sparse output
try:
cat_transformer = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
except TypeError:
cat_transformer = OneHotEncoder(handle_unknown='ignore', sparse=False)
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_feats),
('cat', cat_transformer, cat_feats)
], remainder='drop')
# choose classifier
model_type = (model_type or 'rf').lower()
if model_type == 'rf':
clf_est = RandomForestClassifier(class_weight='balanced', random_state=random_state)
clf_name = 'rf'
elif model_type == 'gb':
clf_est = GradientBoostingClassifier(random_state=random_state)
clf_name = 'gb'
elif model_type == 'mlp':
clf_est = MLPClassifier(hidden_layer_sizes=(100,), max_iter=300, random_state=random_state)
clf_name = 'mlp'
else:
raise ValueError(f'Unknown model_type: {model_type}')
clf = Pipeline(steps=[('pre', preprocessor), ('clf', clf_est)])
if do_gridsearch:
if clf_name == 'rf':
param_grid = {
'clf__n_estimators': [100,200],
'clf__max_depth': [None, 10, 20],
'clf__min_samples_split': [2,5]
}
        elif clf_name == 'lgb':  # unreachable with the model types above; kept for a LightGBM variant
param_grid = {
'clf__n_estimators': [100,200],
'clf__num_leaves': [31,63]
}
elif clf_name == 'gb':
param_grid = {
'clf__n_estimators': [100,200],
'clf__max_depth': [3,6]
}
elif clf_name == 'mlp':
param_grid = {
'clf__hidden_layer_sizes': [(50,),(100,)],
'clf__alpha': [0.0001, 0.001]
}
else:
param_grid = {}
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state)
gs = GridSearchCV(clf, param_grid, cv=cv, scoring='f1_weighted', n_jobs=1)
gs.fit(X_train, y_train)
best = gs.best_estimator_
best_params = gs.best_params_
model_to_save = best
else:
clf.fit(X_train, y_train)
best_params = None
model_to_save = clf
y_pred = model_to_save.predict(X_test)
unique_labels = np.unique(np.concatenate([y_test, y_pred]))
target_names = [le.classes_[i] for i in unique_labels]
report = classification_report(y_test, y_pred, target_names=target_names, zero_division=0)
cm = confusion_matrix(y_test, y_pred)
# save model pipeline
out_dir = Path.cwd() / 'outputs'
out_dir.mkdir(exist_ok=True)
model_file = out_dir / f'{clf_name}_cause_pipeline.joblib'
joblib.dump({'pipeline': model_to_save, 'label_encoder': le}, model_file)
# save predictions
pred_df = X_test.copy()
pred_df['y_true'] = le.inverse_transform(y_test)
pred_df['y_pred'] = le.inverse_transform(y_pred)
pred_df.to_csv(out_dir / 'predictions_cause.csv', index=False, encoding='utf-8-sig')
return {'model_file': str(model_file), 'report': report, 'confusion_matrix': cm, 'predictions_file': str(out_dir / 'predictions_cause.csv'), 'best_params': best_params}
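

# --- Minimal usage sketch (not in the original file; the CSV path and default
# arguments below are hypothetical). Shows how these trainers are presumably invoked.
if __name__ == '__main__':
    csv_path = sys.argv[1] if len(sys.argv) > 1 else 'data/outages.csv'  # hypothetical default path
    df_in = pd.read_csv(csv_path)
    result = train_classifier(df_in, label_col='CauseType', model_type='rf')
    print(result['report'])
    print('model saved to:', result['model_file'])
    print('predictions saved to:', result['predictions_file'])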