AI-OMS-Analyze / scripts /anomaly.py
kawaiipeace's picture
update
74baf5f
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from typing import Tuple
try:
import tensorflow as tf
from tensorflow.keras import layers, models
TF_AVAILABLE = True
except ImportError:
TF_AVAILABLE = False
def parse_datetime_cols(df: pd.DataFrame) -> pd.DataFrame:
for c in ['OutageDateTime','FirstRestoDateTime','LastRestoDateTime']:
if c in df.columns:
df[c+'_dt'] = pd.to_datetime(df[c], format='%d-%m-%Y %H:%M:%S', errors='coerce')
return df
def feature_engineer(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df = parse_datetime_cols(df)
# Duration in minutes between outage and last restore
if 'OutageDateTime_dt' in df.columns and 'LastRestoDateTime_dt' in df.columns:
df['duration_min'] = (df['LastRestoDateTime_dt'] - df['OutageDateTime_dt']).dt.total_seconds() / 60.0
else:
df['duration_min'] = np.nan
# Load numeric
for col in ['Load(MW)','Capacity(kVA)','FirstStepDuration','LastStepDuration','AffectedCustomer']:
if col in df.columns:
df[col+'_num'] = pd.to_numeric(df[col], errors='coerce')
else:
df[col+'_num'] = np.nan
# time of day
if 'OutageDateTime_dt' in df.columns:
df['hour'] = df['OutageDateTime_dt'].dt.hour
else:
df['hour'] = np.nan
# device type one-hot small encoding: frequency
if 'OpDeviceType' in df.columns:
freq = df['OpDeviceType'].fillna('NA').value_counts()
df['device_freq'] = df['OpDeviceType'].map(lambda x: freq.get(x,0))
else:
df['device_freq'] = 0
# coordinates
if 'OpDeviceXYcoord' in df.columns:
def parse_xy(s):
try:
s = str(s).strip().strip('"')
x,y = s.split(',')
return float(x), float(y)
except Exception:
return (np.nan, np.nan)
xs, ys = zip(*df['OpDeviceXYcoord'].map(parse_xy))
df['x'] = xs
df['y'] = ys
else:
df['x'] = np.nan
df['y'] = np.nan
return df
def build_feature_matrix(df: pd.DataFrame) -> Tuple[np.ndarray, list]:
df_fe = feature_engineer(df)
features = ['duration_min','Load(MW)_num','AffectedCustomer_num','hour','device_freq','x','y']
X = df_fe[features].copy()
# Fill na with median
X = X.fillna(X.median())
scaler = StandardScaler()
Xs = scaler.fit_transform(X)
return Xs, features, df_fe, scaler
def run_isolation_forest(X: np.ndarray, contamination: float = 0.05, random_state: int = 42):
iso = IsolationForest(contamination=contamination, random_state=random_state)
preds = iso.fit_predict(X)
# IsolationForest returns -1 for outliers
scores = iso.decision_function(X)
return preds, scores
def run_lof(X: np.ndarray, contamination: float = 0.05, n_neighbors: int = 20):
lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
preds = lof.fit_predict(X)
# negative_outlier_factor_ (the lower, more abnormal)
scores = lof.negative_outlier_factor_
return preds, scores
def run_autoencoder(X: np.ndarray, contamination: float = 0.05, latent_dim: int = 4, epochs: int = 50, batch_size: int = 32):
if not TF_AVAILABLE:
raise ImportError("TensorFlow not available. Install tensorflow to use autoencoder.")
input_dim = X.shape[1]
# Build autoencoder
encoder = models.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(16, activation='relu'),
layers.Dense(latent_dim, activation='relu')
])
decoder = models.Sequential([
layers.Input(shape=(latent_dim,)),
layers.Dense(16, activation='relu'),
layers.Dense(input_dim, activation='linear')
])
autoencoder = models.Sequential([encoder, decoder])
autoencoder.compile(optimizer='adam', loss='mse')
# Train
autoencoder.fit(X, X, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)
# Reconstruction error
reconstructed = autoencoder.predict(X, verbose=0)
mse = np.mean((X - reconstructed)**2, axis=1)
# Threshold based on contamination
threshold = np.percentile(mse, (1 - contamination) * 100)
preds = (mse > threshold).astype(int) * -1 # -1 for outliers
preds[preds == 0] = 1 # 1 for inliers
return preds, mse
def explain_anomalies(df_fe: pd.DataFrame, explain_features=None):
# explain_features: which numeric columns to compute z-score on
if explain_features is None:
explain_features = ['duration_min','Load(MW)_num','AffectedCustomer_num','hour','device_freq']
df_num = df_fe[explain_features].astype(float).fillna(df_fe[explain_features].median())
means = df_num.mean()
stds = df_num.std().replace(0, 1.0)
z = (df_num - means) / stds
# create explanation string for each row: top 3 absolute z-scores
explanations = []
for i, row in z.iterrows():
abs_row = row.abs()
top = abs_row.sort_values(ascending=False).head(3)
parts = []
for feat in top.index:
val = row[feat]
sign = 'สูง' if val > 0 else 'ต่ำ' if val < 0 else 'ปกติ'
parts.append(f"{feat} {sign} (z={val:.2f})")
explanations.append('; '.join(parts))
return z, explanations
def detect_anomalies(df: pd.DataFrame, contamination: float = 0.05, algorithm: str = 'iso+lof') -> pd.DataFrame:
Xs, features, df_fe, scaler = build_feature_matrix(df)
if algorithm == 'autoencoder':
preds, scores = run_autoencoder(Xs, contamination=contamination)
res = df.copy().reset_index(drop=True)
res['auto_pred'] = preds
res['auto_score'] = scores
res['final_flag'] = res['auto_pred'] == -1
else:
preds_iso, scores_iso = run_isolation_forest(Xs, contamination=contamination)
preds_lof, scores_lof = run_lof(Xs, contamination=contamination)
res = df.copy().reset_index(drop=True)
res['iso_pred'] = preds_iso
res['iso_score'] = scores_iso
res['lof_pred'] = preds_lof
res['lof_score'] = scores_lof
# ensemble: flag if both mark as outlier (-1)
res['ensemble_flag'] = ((res['iso_pred'] == -1) & (res['lof_pred'] == -1))
# algorithm filter: if algorithm == 'iso' or 'lof' or 'iso+lof', compute final_flag
if algorithm == 'iso':
res['final_flag'] = res['iso_pred'] == -1
elif algorithm == 'lof':
res['final_flag'] = res['lof_pred'] == -1
elif algorithm == 'iso+lof':
res['final_flag'] = res['ensemble_flag']
else:
raise ValueError(f"Unknown algorithm: {algorithm}")
# explainability (same for all)
z_df, explanations = explain_anomalies(df_fe)
# attach z-scores for explain features
for col in z_df.columns:
res[f'z_{col}'] = z_df[col].values
res['explanation'] = explanations
return res