# EXTREMUM-demo/base/methods.py
import pandas as pd
import pickle, os
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from pandas.api.types import is_numeric_dtype
from sklearn.metrics import classification_report
import plotly.express as px
from django.conf import settings
import joblib
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import dice_ml
from dict_and_html import *
import plotly.graph_objects as go
import math
from imblearn.over_sampling import SMOTE
from scipy.stats import median_abs_deviation
from numpy.fft import *
from sklearn.preprocessing import MinMaxScaler
from plotly.subplots import make_subplots
from sklearn.model_selection import train_test_split
from .glacier.src.gc_latentcf_search_1dcnn_function import gc_latentcf_search_1dcnn
from .glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals
from pandas.api.types import is_numeric_dtype, is_categorical_dtype
import re
import json
import shutil
import traceback
PIPELINE_PATH = os.path.join(settings.BASE_DIR, "base/pipelines/")
def is_categorical_like(column):
"""Determine if a column is categorical-like (unique values much smaller than the number of rows)."""
return column.nunique() < 0.1 * len(column)
def add_jitter(column):
"""Add jitter to a numeric-like column for better visualization."""
return column + np.random.uniform(-0.1, 0.1, size=column.shape)
def classify_feature(series):
"""Classify a feature into binary, categorical, or continuous."""
if isinstance(series, pd.DataFrame):
if series.shape[1] == 1:
series = series.iloc[:, 0]
else:
raise ValueError("Expected a Series or single-column DataFrame")
unique_vals = series.dropna().unique()
if len(unique_vals) <= 2 and set(unique_vals).issubset({0, 1}):
return 'binary'
if is_categorical_dtype(series) or (series.dtype == object) or (len(unique_vals) < 15):
return 'categorical'
if is_numeric_dtype(series):
return 'continuous'
return 'other'
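# Illustrative examples of classify_feature (hypothetical data, not taken from the app):
#   classify_feature(pd.Series([0, 1, 1, 0]))             -> 'binary'
#   classify_feature(pd.Series(["low", "mid", "high"]))   -> 'categorical'
#   classify_feature(pd.Series(np.random.rand(100)))      -> 'continuous'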
def create_plot(df, feature1, feature2, label=None):
"""Create a plot based on the types of features provided."""
# Drop missing values
features = [feature1, feature2]
if label:
features.append(label)
features = list(dict.fromkeys(features)) # Removes duplicates while preserving order
df = df[features].dropna()
if isinstance(feature2, list):
feature2 = feature2[0]
if isinstance(feature1, list):
feature1 = feature1[0]
print(type(feature2), feature2, df[feature2])
type1 = classify_feature(df[feature1])
type2 = classify_feature(df[feature2])
# Define plotting logic
if type1 == 'continuous' and type2 == 'continuous':
fig = px.scatter(
df, x=feature1, y=feature2, color=label if label else None,
trendline='ols', labels={feature1: feature1, feature2: feature2}
)
elif (type1 in ['categorical', 'binary']) and (type2 in ['continuous']):
fig = px.box(
df, x=feature1, y=feature2, color=label if label else None,
points="all", labels={feature1: feature1, feature2: feature2}
)
elif (type2 in ['categorical', 'binary']) and (type1 in ['continuous']):
fig = px.box(
df, x=feature2, y=feature1, color=label if label else None,
points="all", labels={feature1: feature1, feature2: feature2}
)
elif (type1 in ['categorical', 'binary']) and (type2 in ['categorical', 'binary']):
grouped = df.groupby([feature1, feature2]).size().reset_index(name='Count')
fig = px.density_heatmap(
grouped, x=feature1, y=feature2, z='Count', color_continuous_scale="Blues",
labels={feature1: feature1, feature2: feature2, 'Count': 'Count'}
)
else:
# fallback: simple scatter with jitter
df[feature1] = df[feature1] + np.random.uniform(-0.2, 0.2, size=len(df))
df[feature2] = df[feature2] + np.random.uniform(-0.2, 0.2, size=len(df))
fig = px.scatter(
df, x=feature1, y=feature2, color=label if label else None,
labels={feature1: feature1, feature2: feature2}
)
return fig
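# Example usage (illustrative; the column names are placeholders for an actual dataset):
#   fig = create_plot(df, "age", "bmi", label="stroke")      # continuous vs continuous -> scatter + OLS trendline
#   fig = create_plot(df, "gender", "bmi", label="stroke")   # categorical vs continuous -> box plot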
def stats(
dataset_path,
dataset_type,
pos=None,
neg=None,
feature1=None,
feature2=None,
label=None,
name=None,
):
if dataset_type == "tabular":
df = pd.read_csv(dataset_path)
fig = create_plot(df, feature1, feature2, label)
fig.update_layout(clickmode="event+select", autosize=True)
elif dataset_type == "timeseries":
# timeseries
df = pd.read_csv(dataset_path)
# samples subplots
# Create subplots
# column numbers - target_column
# TODO: case for when the dataset has
# id column
        # Map known dataset names to human-readable class labels; fall back to
        # generic labels for datasets that are not explicitly handled.
        # TODO: derive these labels dynamically from dataset metadata.
        negative_label = "Negative label"
        positive_label = "Positive label"
        if name == "two-lead-ecg":
            negative_label = "Signal 0"
            positive_label = "Signal 1"
        elif name == "gun-point":
            negative_label = "Gun"
            positive_label = "No gun"
        elif name == "italy-power-demand":
            negative_label = "October-March"
            positive_label = "April-September"
        elif name == "ecg-five-days":
            negative_label = "12/11/1990"
            positive_label = "17/11/1990"
        negative_label_value = neg
        positive_label_value = pos
num_timesteps = df.shape[1] - 1
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=(
negative_label,
negative_label,
positive_label,
positive_label,
),
)
        # Assumes a univariate series
        # TODO: support multivariate series
target_labels = list(df.iloc[:, -1].unique())
positive = int(target_labels[1])
negative = int(target_labels[0])
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == negative_label_value].iloc[negative, :-1],
mode="lines",
name=negative_label,
),
row=1,
col=1,
)
# Add normal ECG trace 2
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == negative_label_value].iloc[negative, :-1],
mode="lines",
name=negative_label,
),
row=1,
col=2,
)
# Add abnormal ECG trace 1
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == positive_label_value].iloc[positive, :-1],
mode="lines",
name=positive_label,
),
row=2,
col=1,
)
# Add abnormal ECG trace 2
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == positive_label_value].iloc[positive, :-1],
mode="lines",
name=positive_label,
),
row=2,
col=2,
)
# Update layout
fig.update_layout(
xaxis_title="Timesteps",
yaxis_title="ECG Value",
showlegend=False,
autosize=True,
)
        # Confidence plot over all samples (drop the class label column first)
        df = df.iloc[:, :-1]
        df_grouped = df.agg(["mean", "std", "count"]).transpose()
        # 1.96 is the z-value for a 95% confidence interval
        df_grouped["ci"] = 1.96 * df_grouped["std"] / np.sqrt(df_grouped["count"])
df_grouped["ci_lower"] = df_grouped["mean"] - df_grouped["ci"]
df_grouped["ci_upper"] = df_grouped["mean"] + df_grouped["ci"]
fig1 = go.Figure(
[
go.Scatter(
name="Avg",
x=df_grouped.index,
y=round(df_grouped["mean"], 2),
mode="lines",
line=dict(color="rgb(31, 119, 180)"),
),
go.Scatter(
name="95% CI Upper",
x=df_grouped.index,
y=round(df_grouped["ci_upper"], 2),
mode="lines",
marker=dict(color="#444"),
line=dict(width=0),
showlegend=False,
),
go.Scatter(
name="95% CI Lower",
x=df_grouped.index,
y=round(df_grouped["ci_lower"], 2),
marker=dict(color="#444"),
line=dict(width=0),
mode="lines",
fillcolor="rgba(68, 68, 68, 0.3)",
fill="tonexty",
showlegend=False,
),
]
)
fig1.update_layout(
title=f"Confidence plot for {name}",
xaxis_title="Timestep",
yaxis_title="Avg value",
hovermode="x",
)
fig1.update_yaxes(rangemode="tozero")
return fig.to_html(), fig1.to_html()
# fig = px.line(df.iloc[int(feature1)])
return fig.to_html()
def compare_values(val1, val2):
if isinstance(val1, float) and isinstance(val2, float):
return not math.isclose(float(val1), float(val2))
else:
return val1 != val2
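# Examples (illustrative): floats are compared with a tolerance, so near-equal
# values are not reported as a change.
#   compare_values(0.1 + 0.2, 0.3)      -> False (treated as equal)
#   compare_values("Male", "Female")    -> True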
def preprocess(data, value_list, name, dataset_type, path=None, class_label=None):
if dataset_type == "tabular":
# Check and replace both string "None" and NoneType with np.nan
if ((data == "None").any().any()) or (data.isna().any().any()):
data.replace("None", np.nan, inplace=True) # Replace string "None"
data.replace([None], np.nan, inplace=True) # Replace Python NoneType
print('"None" values replaced with NaN')
else:
print('No "None" values found in the dataset')
if "id" in data.columns:
ids = data["id"]
data = data.drop(["id"], axis=1)
total_nan = data.isna().sum().sum()
if "imp" in value_list:
data = imputations(data, class_label, path)
imputed_data = data
if "onehot" in value_list:
data = onehot(data, path, class_label)
data = convert_to_category(data)
if "std" in value_list:
data = scaling(data, class_label, path)
if "id" in data.columns:
data = pd.concat([ids.to_frame(), data], axis=1, ignore_index=False)
if total_nan > 0:
os.remove(name)
imputed_data = pd.concat(
[ids.to_frame(), imputed_data], axis=1, ignore_index=False
)
imputed_data.to_csv(name, index=False)
elif dataset_type == "timeseries":
# timeseries
# save last columns values
data_class_col = data.iloc[:, -1]
# drop last column that contains class_labels
data = data.iloc[:, :-1]
if "imp" in value_list:
data = imputations_ts(data, path)
if "denoise" in value_list:
data = data.apply(denoise, args=(path,), axis=0)
if "std" in value_list:
data = scaling_ts(data, path)
data = pd.concat([data, data_class_col], axis=1)
# os.remove(name)
# data.to_csv(name, index=False)
return data
###--------------------------###
### TIMESERIES PREPROCESSING ###
def scaling_ts(data, path):
# Normalize the data using Min-Max scaling
scaler = MinMaxScaler()
data[data.columns] = scaler.fit_transform(data)
pickle.dump(scaler, open(path + "/min_max_scaler.sav", "wb"))
return data
def denoise(series, path):
# Apply FFT
fft_vals = fft(series)
fft_freqs = np.fft.fftfreq(len(fft_vals))
# Filter frequencies
fft_vals[np.abs(fft_freqs) > 0.1] = 0
# Inverse FFT to reconstruct the signal
denoised_series = ifft(fft_vals).real
return pd.Series(denoised_series, index=series.index)
def outlier_detection(series, path):
median = series.median()
mad = median_abs_deviation(series)
return np.abs(series - median) / mad > 3
def imputations_ts(data, path):
data[data.columns] = data[data.columns].fillna(data.mean())
return data
### TIMESERIES PREPROCESSING ###
###--------------------------###
def onehot(data, path=None, class_label=None, max_categories=10, handle_unknown='ignore'):
"""
One-hot encode categorical columns in a DataFrame while preserving the class label.
Parameters:
- data: pandas DataFrame
- path: str, path to save the encoded data (optional)
- class_label: str, name of the target variable column (optional)
- max_categories: int, maximum number of unique values for a column to be considered categorical
- handle_unknown: str, 'error' or 'ignore', how to handle unknown categories in future data
Returns:
- pandas DataFrame with one-hot encoded categories
"""
if not isinstance(data, pd.DataFrame):
raise ValueError("Input must be a pandas DataFrame")
if not isinstance(max_categories, int) or max_categories <= 0:
raise ValueError("max_categories must be a positive integer")
# Identify categorical columns (including numeric columns with limited unique values)
categorical_columns = data.select_dtypes(include=["object", "category"]).columns.tolist()
numeric_categorical_columns = [
col for col in data.select_dtypes(include=["int64", "float64"]).columns
if data[col].nunique() <= max_categories and col != class_label
]
all_categorical_columns = categorical_columns + numeric_categorical_columns
# Exclude the class label from categorical columns
if class_label and class_label in all_categorical_columns:
all_categorical_columns.remove(class_label)
if not all_categorical_columns:
print("No categorical columns identified for one-hot encoding.")
return data # Return the original DataFrame if no categorical columns are found
# Initialize OneHotEncoder
encoder = OneHotEncoder(handle_unknown=handle_unknown, sparse_output=False)
# Fit and transform the categorical columns
encoded_array = encoder.fit_transform(data[all_categorical_columns])
# Create DataFrame with encoded values
feature_names = encoder.get_feature_names_out(all_categorical_columns)
df_encoded = pd.DataFrame(encoded_array, columns=feature_names, index=data.index)
# Combine encoded DataFrame with non-categorical columns
non_categorical_columns = [col for col in data.columns if col not in all_categorical_columns]
df_final = pd.concat([data[non_categorical_columns], df_encoded], axis=1)
    if path:
        pickle.dump(encoder, open(os.path.join(path, "one_hot.sav"), "wb"))
return df_final
# Example usage:
# df_encoded = onehot(df, 'path/to/save', class_label='target', max_categories=15)
def imputations(data, class_label, path):
imp = SimpleImputer(missing_values=np.nan, strategy="mean")
y = data[class_label]
data = data.drop([class_label], axis=1)
numeric_cols = data.select_dtypes(include=["float64", "int64"]).columns
print("Numeric columns ", numeric_cols)
data[numeric_cols] = imp.fit_transform(data[numeric_cols])
# Convert back to DataFrame and ensure correct data types
data[numeric_cols] = data[numeric_cols].astype(float)
pickle.dump(imp, open(os.path.join(path, "imp.sav"), "wb"))
data = pd.concat([y.to_frame(), data], axis=1)
return data
def scaling(data, class_label, path):
scaler = StandardScaler()
y = data[class_label]
data = data.drop([class_label], axis=1)
# transform data
cols = data.select_dtypes(np.number).columns
# keep non-binary columns
nonbinary_columns = [
col for col in cols if data[col].nunique() > 10
]
data[nonbinary_columns] = scaler.fit_transform(data[nonbinary_columns])
pickle.dump(scaler, open(os.path.join(path, "standard_scaler.sav"), "wb"))
# Ensure binary columns are of type int
binary_columns = [col for col in cols if col not in nonbinary_columns]
data[binary_columns] = data[binary_columns].astype(int)
data = pd.concat([y.to_frame(), data], axis=1)
return data
# Convert object dtype columns to pandas 'category' dtype
def convert_to_category(data):
for col in data.select_dtypes(include=['object']):
data[col] = data[col].astype('category')
return data
### TABULAR PREPROCESSING ###
###--------------------------###
def decode_cf(df, row, class_label, path, preprocessing_list):
"""
Decode counterfactual row to its original feature space, reversing preprocessing steps.
:param df: Original DataFrame before preprocessing.
:param row: Counterfactual row (single instance) to decode.
:param class_label: The name of the class label column.
:param path: Path where preprocessing objects are saved.
:param preprocessing_list: List of preprocessing steps applied (e.g., "onehot", "std").
:return: Decoded counterfactual row.
"""
cf_row = row.copy()
# Select all numerical columns
df_numerical = df.select_dtypes(include=["number"]).columns.tolist()
# Filter out numerical columns that are categorical (based on unique values)
nonbinary_numeric_columns = [
col for col in df_numerical if df[col].nunique() > 10
]
# Identify categorical columns (excluding the class label)
df_categorical = (
df.drop([class_label], axis=1)
.select_dtypes(include=["object"])
.columns.tolist()
)
# Decode one-hot encoded columns
if "onehot" in preprocessing_list:
ohe = joblib.load(path + "/one_hot.sav")
if ohe.get_feature_names_out().size > 0:
# Decode one-hot encoded features
one_hot_decoded = ohe.inverse_transform(cf_row[ohe.get_feature_names_out()])
one_hot_decoded_df = pd.DataFrame(
one_hot_decoded,
columns=ohe.feature_names_in_, # Original categorical column names
index=cf_row.index # Align index with cf_row
)
# Drop the one-hot encoded columns
cf_row = cf_row.drop(ohe.get_feature_names_out(), axis=1)
# Concatenate the decoded categorical columns
cf_row = pd.concat([cf_row, one_hot_decoded_df], axis=1)
# Decode standardized numeric columns
if "std" in preprocessing_list:
scaler = joblib.load(path + "/standard_scaler.sav")
cf_row[nonbinary_numeric_columns] = scaler.inverse_transform(
cf_row[nonbinary_numeric_columns]
)
# Decode the class label
le = joblib.load(path + "/label_encoder.sav")
cf_row[class_label] = le.inverse_transform(cf_row[[class_label]].values.ravel())
return cf_row
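# Example usage (illustrative; the path and column names are placeholders):
#   decoded = decode_cf(
#       df_original,                      # DataFrame before preprocessing
#       cf_row,                           # one counterfactual row in the preprocessed space
#       class_label="stroke",
#       path=PIPELINE_PATH + "stroke",    # directory holding one_hot.sav / standard_scaler.sav / label_encoder.sav
#       preprocessing_list=["onehot", "std"],
#   )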
def training(
data,
model,
test_size,
label,
dataset_type,
df_name,
model_path=None,
autoencoder="No",
experiment_arguments=None,
):
X = data
if dataset_type == "tabular":
if "id" in data.columns:
X = data.drop("id", axis=1)
y = X[label]
X = X.drop(label, axis=1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, stratify=y.values, random_state=42
)
if df_name == "stroke":
# needs Oversampling
## TODO if class_label is multi class SMOTE needs to have sampling strategy Dict
## TODO check if df needs oversampling
oversample = SMOTE(random_state=42)
X_sm, y_sm = oversample.fit_resample(
X_train,
y_train,
)
X_train, X_test, y_train, y_test = train_test_split(
X_sm, y_sm, test_size=test_size, stratify=y_sm.values, random_state=42
)
if "lr" == model:
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(
random_state=0,
solver="lbfgs", # Efficient for small to medium datasets
penalty="l2", # Default L2 regularization
C=1.0 # Regularization strength (smaller values increase regularization)
).fit(X_train, y_train)
y_pred = clf.predict(X_test)
filename = "lr.sav"
importance = clf.coef_[0]
model = clf
if "xgb" == model:
from xgboost import XGBClassifier
xgb = XGBClassifier(
n_estimators=300, # Increased for better performance with larger datasets
learning_rate=0.05, # Slower learning rate for better convergence
max_depth=6, # Increased to capture more complex patterns
min_child_weight=2, # Slightly higher to prevent overfitting
subsample=0.8, # Keep as is to reduce overfitting
colsample_bytree=0.8, # Keep as is for feature sampling
gamma=0.1, # Minimum loss reduction for splits (encourages simpler trees)
reg_lambda=1.0, # Default L2 regularization
reg_alpha=0.1, # Slight L1 regularization for sparsity
objective="binary:logistic", # Binary classification
use_label_encoder=False, # Avoid unnecessary warnings
eval_metric="auc", # Use AUC for binary classification
).fit(X_train, y_train)
y_pred = xgb.predict(X_test)
filename = "xgb.sav"
importance = xgb.feature_importances_
model = xgb
if "dt" == model:
from sklearn.tree import DecisionTreeClassifier
dt = DecisionTreeClassifier(
max_depth=10, # Restrict depth to prevent overfitting
min_samples_split=4, # Minimum samples required to split an internal node
min_samples_leaf=2, # Minimum samples per leaf node
random_state=42,
).fit(X_train, y_train)
y_pred = dt.predict(X_test)
filename = "dt.sav"
importance = dt.feature_importances_
model = dt
if "svm" == model:
from sklearn.svm import LinearSVC
svc = LinearSVC(C=1.0, random_state=42)
svc.fit(X_train, y_train)
# Make predictions
y_pred = svc.predict(X_test)
filename = "svm.sav"
# Feature importance
importance = svc.coef_[0]
feature_importance = {f"Feature_{i}": coef for i, coef in enumerate(importance)}
if "rf" == model:
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier(
n_estimators=200, # Increased number of trees for better performance
max_depth=10, # Restrict depth to prevent overfitting
min_samples_split=4, # Minimum samples required to split an internal node
min_samples_leaf=2, # Minimum samples per leaf node
random_state=42,
max_features="sqrt", # Use sqrt(number of features) at each split
).fit(X_train, y_train)
y_pred = rf.predict(X_test)
filename = "rf.sav"
importance = rf.feature_importances_
model = rf
class_report = classification_report(y_test, y_pred.flatten(), output_dict=True)
class_report = pd.DataFrame(class_report)
feature_importance = px.bar(x=importance, y=X_train.columns)
if model_path:
pickle.dump(model, open(model_path + f"/{filename}", "wb"))
feature_importance_dict = dict(zip(X_train.columns, importance))
return feature_importance, class_report, feature_importance_dict
else:
# TODO: add 1dcnn train
if model == "glacier":
# Split the lr-list string and convert each value to float
# experiment_arguments[8] = experiment_arguments[8].rstrip(';')
# lr_list = [float(x) for x in experiment_arguments[8].split()]
y_test, y_train, y_pred = gc_latentcf_search_1dcnn(
data,
int(experiment_arguments[1]),
int(experiment_arguments[2]),
model_path + f"/{experiment_arguments[3]}",
model_path,
autoencoder,
)
class_report = classification_report(y_test, y_pred, output_dict=True)
# Convert the classification report to a pandas DataFrame
class_report = pd.DataFrame(class_report).transpose()
elif model == "wildboar_knn" or model == "wildboar_rsf":
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=1
)
# n_samples, n_timestep = X_train.shape
# y_labels, counts = np.unique(y_train, return_counts=True)
# print(
# f"""
# The dataset contains {n_samples} samples with {n_timestep} time steps each.
# Of the samples, {counts[0]} is labeled as {y_labels[0]} and {counts[1]} labeled
# as {y_labels[1]}. Here, we plot the time series.
# """
# )
from wildboar.utils.plot import plot_time_domain
if model == "wildboar_knn":
from wildboar.distance import KNeighborsClassifier
from wildboar.explain.counterfactual import KNeighborsCounterfactual
filename = "wildboar_knn.sav"
classifier = KNeighborsClassifier(
n_neighbors=5, metric="dtw", metric_params={"r": 0.5}
)
explainer = KNeighborsCounterfactual(random_state=1, method="auto")
if model == "wildboar_rsf":
from wildboar.ensemble import ShapeletForestClassifier
from wildboar.explain.counterfactual import ShapeletForestCounterfactual
filename = "wildboar_rsf.sav"
classifier = ShapeletForestClassifier(
n_estimators=100,
metric="euclidean",
max_depth=5,
random_state=1,
)
explainer = ShapeletForestCounterfactual(random_state=1)
classifier.fit(X_train, y_train)
# Assuming you have X_test and y_test as test data
y_pred = classifier.predict(X_test)
# Generate the classification report
class_report = classification_report(y_test, y_pred, output_dict=True)
# Convert the classification report to a pandas DataFrame
class_report = pd.DataFrame(class_report).transpose()
X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test)
# save x_test, y_test for future use
if model_path:
x_test_df = pd.DataFrame(X_test)
x_test_df.columns = data.iloc[:, :-1].columns
x_test_df.to_csv(model_path + "/X_test.csv", index=None)
np.save(model_path + "/y_test.npy", y_test)
pickle.dump(classifier, open(model_path + f"/{filename}", "wb"))
np.save(model_path + "/X_cf.npy", X_cf)
np.save(model_path + "/y_pred.npy", y_pred)
np.save(model_path + "/cf_pred.npy", cf_pred)
return class_report
def testing(name, type):
data = pd.read_csv(name)
y_test = data["diagnosis"]
X_test = data.drop("diagnosis", axis=1)
if "lr" == type:
filename = "lr.sav"
clf = joblib.load(filename)
y_pred = clf.predict(X_test)
importance = clf.coef_[0]
model = clf
if "xgb" == type:
filename = "xgb.sav"
xgb = joblib.load(filename)
y_pred = xgb.predict(X_test)
filename = "xgb.sav"
importance = xgb.feature_importances_
model = xgb
if "dt" == type:
filename = "dt.sav"
dt = joblib.load(filename)
y_pred = dt.predict(X_test)
importance = dt.feature_importances_
model = dt
if "svm" == type:
filename = "svm.sav"
svc = joblib.load(filename)
y_pred = svc.predict(X_test)
importance = svc.coef_[0]
model = svc
if "rf" == type:
filename = "rf.sav"
rf = joblib.load(filename)
y_pred = rf.predict(X_test)
importance = rf.feature_importances_
model = rf
clas_report = classification_report(y_test, y_pred, output_dict=True)
clas_report = pd.DataFrame(clas_report).transpose()
clas_report = clas_report.sort_values(by=["f1-score"], ascending=False)
fig2 = px.bar(x=importance, y=X_test.columns)
pickle.dump(model, open(filename, "wb"))
con = {
"fig2": fig2.to_html(),
"clas_report": clas_report,
}
return con
# compute counterfactuals
def counterfactuals(
query,
model,
df,
class_label,
continuous_features,
num_counterfactuals=5,
features_to_vary=[],
):
# print("edw prin to counterfactuals")
if "id" in df.columns:
df = df.drop("id", axis=1)
if "id" in query.columns:
query = query.drop("id", axis=1)
query = query.drop(class_label, axis=1)
# data = df.drop(class_label, axis=1)
# continuous_features = df.drop(class_label, axis=1).columns.tolist()
# continuous_features = (
# df.drop(class_label, axis=1).select_dtypes(exclude=["object"]).columns.tolist()
# )
d = dice_ml.Data(
dataframe=df,
continuous_features=continuous_features,
outcome_name=class_label,
)
m = dice_ml.Model(model=model, backend="sklearn")
exp = dice_ml.Dice(d, m, method="genetic")
    # Only pass features_to_vary when the caller restricts the mutable features;
    # otherwise DiCE defaults to varying all features.
    extra_kwargs = {"features_to_vary": features_to_vary} if len(features_to_vary) > 0 else {}
    try:
        dice_exp = exp.generate_counterfactuals(
            query,
            total_CFs=num_counterfactuals,  # total number of counterfactual examples to generate
            desired_class="opposite",  # flip the prediction to the opposite class
            proximity_weight=0.5,  # control proximity
            diversity_weight=1.0,  # control diversity
            sparsity_weight=0.5,  # enforce minimal feature changes
            **extra_kwargs,
        )
    except Exception as e:
        print(e)
        dice_exp = None
if dice_exp:
return dice_exp._cf_examples_list
return dice_exp
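# Example usage (illustrative; assumes a fitted sklearn classifier `clf` and the original dataframe `df`):
#   cf_examples = counterfactuals(
#       query=df.iloc[[10]],              # single-row DataFrame to explain
#       model=clf,
#       df=df,
#       class_label="stroke",
#       continuous_features=get_continuous_features(df.drop("stroke", axis=1)),
#       num_counterfactuals=5,
#   )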
def get_dataframe(path):
df = pd.read_csv(path)
return df
def generatePCA(preprocess_df):
pca = PCA()
pca.fit(preprocess_df)
exp_var_cumul = np.cumsum(pca.explained_variance_ratio_)
pca = px.area(
x=range(1, exp_var_cumul.shape[0] + 1),
y=exp_var_cumul,
labels={"x": "# Components", "y": "Explained Variance"},
)
pca.update_layout(
autosize=True,
)
return pca
def generateTSNE(preprocess_df, dataset_type, class_label=None):
# tSNE
tsne = TSNE(n_components=2, random_state=39)
if dataset_type == "tabular":
projections = tsne.fit_transform(preprocess_df.drop(class_label, axis=1).values)
tsne_df = pd.DataFrame(
{
"0": projections[:, 0],
"1": projections[:, 1],
class_label: preprocess_df[class_label].astype(str),
}
)
# render_mode="svg" will prevent the scatter from getting to GL mode
# for sufficiently large input. It was observed that for datasets
# larger than 1000 entries, the scatter html object that would be
# generated would lack the <g class="points" ... > </g> element containing
# all the points of the scatter. Using that we could connect the click
# of a point with the actual point in the dataset. Thus it was vital
# that there exists such an element and also is accessible.
# By default, scatter disables it to free space in the client side
# and by using render_mode="svg" we avoid that behaviour.
# https://github.com/plotly/plotly_express/issues/145
tsne = px.scatter(
tsne_df,
x="0",
y="1",
color=class_label,
render_mode="svg",
)
tsne.update_layout(clickmode="event+select", autosize=True)
elif dataset_type == "timeseries":
preprocess_df_drop_class = preprocess_df.iloc[:, :-1]
projections = tsne.fit_transform(preprocess_df_drop_class)
tsne_df = pd.DataFrame(
{
"0": projections[:, 0],
"1": projections[:, 1],
"class": preprocess_df.iloc[:, -1].astype(str),
}
)
# render_mode="svg" will prevent the scatter from getting to GL mode
# for sufficiently large input. It was observed that for datasets
# larger than 1000 entries, the scatter html object that would be
# generated would lack the <g class="points" ... > </g> element containing
# all the points of the scatter. Using that we could connect the click
# of a point with the actual point in the dataset. Thus it was vital
# that there exists such an element and also is accessible.
# By default, scatter disables it to free space in the client side
# and by using render_mode="svg" we avoid that behaviour.
# https://github.com/plotly/plotly_express/issues/145
tsne = px.scatter(
tsne_df,
x="0",
y="1",
color="class",
render_mode="svg",
)
tsne.update_layout(clickmode="event+select", autosize=True)
return tsne, projections
def generateAugmentedTSNE(
df, cf_df, num_counterfactuals, point, tsne_path, class_label
):
"""
Given a tsne graph, add the traces of the computed counterfactuals for a given point and return the new graph
Parameters
----------
df: dataframe used to compute tsne
cf_df: counterfactuals dataframe
point: original point of for which counterfatuals were computed
tsne_path: path to the original tsne plot
Returns
-------
The tsne graph updated with new counterfactuals points. Cunterfactual points and the point itself are resized
"""
# make the tsne (the same tsne but with extra points
# the counterfactuals points descriped in
# counterfactuals.csv)
tsne_cf = TSNE(n_components=2, random_state=0)
# merge counterfactuals csv with tsne_data
df_merged = pd.concat([cf_df, df], ignore_index=True, axis=0)
projections = tsne_cf.fit_transform(df_merged.drop(class_label, axis=1).values)
    # cf_df holds the projection values and the class_label value of the
    # counterfactual points. `projections` is a numpy array containing the
    # (x, y) pair for each point in the t-SNE graph.
cf_df = pd.DataFrame(
{
"0": projections[:num_counterfactuals, 0],
"1": projections[:num_counterfactuals, 1],
class_label: cf_df[class_label].iloc[:num_counterfactuals].astype(str),
}
)
# cf_df = pd.concat([cf_df, point], ignore_index=True, axis=0)
# new = {'0':'Front hello', '1': 'hi'}
# cf_s.for_each_trace(lambda t: t.update(name = new[t.name]))
point_s = px.scatter(
point,
x="0",
y="1",
color=class_label,
color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
render_mode="svg",
)
point_s["data"][0]["name"] = "Original data"
cf_s = px.scatter(
cf_df,
x="0",
y="1",
color=class_label,
color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
render_mode="svg",
)
cf_s["data"][0]["name"] = "Counterfactual"
clicked_and_cf_s = go.Figure(data=cf_s.data + point_s.data)
clicked_and_cf_s.update_traces(
marker=dict(size=10, symbol="circle", line=dict(width=2))
)
tsne = joblib.load(tsne_path)
tsne = go.Figure(data=tsne.data + clicked_and_cf_s.data)
tsne.update_layout(clickmode="event+select", autosize=True)
# tsne.add_trace(cf_s.data[0])
return tsne
# tsne_cf = TSNE(n_components=2, random_state=0)
# projections = tsne_cf.fit_transform(
# df_merged.drop(["diagnosis"], axis=1).values
# )
# cf_df = pd.DataFrame(
# {
# "0": projections[:num_counterfactuals, 0],
# "1": projections[:num_counterfactuals, 1],
# "diagnosis": cf_df.diagnosis.iloc[:3],
# }
# )
# cf_df = pd.concat([cf_df, clicked_point_df], ignore_index=True, axis=0)
# cf_s = px.scatter(
# cf_df,
# x="0",
# y="1",
# color="diagnosis",
# color_continuous_scale=px.colors.sequential.Rainbow,
# )
# cf_s.update_traces(
# marker=dict(
# size=10,
# symbol="circle",
# )
# )
# tsne = joblib.load("tsne.sav")
# tsne.add_trace(cf_s.data[0])
# pickle.dump(tsne, open("tsne_cfs.sav", "wb"))
# tsne = tsne.to_html()
def format_error_context(exception, default_message):
"""
Helper function to format error context with file, line, and traceback details.
"""
tb = traceback.extract_tb(exception.__traceback__)[-1]
print(
"message",
f"{default_message}: {str(exception)}",
"\nfile",
tb.filename,
"\nline",
tb.lineno,
"\ntrace",
traceback.format_exc(),
)
return {
"message": f"{default_message}: {str(exception)}",
"file": tb.filename,
"line": tb.lineno,
"trace": traceback.format_exc(),
}
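# Example usage (illustrative; risky_operation() is a placeholder for any call that may raise):
#   try:
#       risky_operation()
#   except Exception as e:
#       context = format_error_context(e, "Failed to preprocess dataset")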
def get_ecg_entry(X_test, y_test, i, class_label):
# timeseries
# samples subplots
# Create subplots
fig = go.Figure()
y = X_test[y_test == class_label].iloc[i]
index = X_test[y_test == class_label].index[i]
if class_label == 0:
name = "Normal ECG"
else:
name = "Abnormal ECG"
# Adding the ECG trace
fig.add_trace(
go.Scatter(
y=y,
mode="lines",
line=dict(width=1),
)
)
# Updating the layout to reduce margins and make the plot more compact
fig.update_layout(
xaxis_title="Timestep",
yaxis_title=name,
hovermode="x",
margin=dict(l=10, r=10, t=30, b=10), # Reduced margins
)
# Adjust y-axis to start from zero, customize the grid, and add more space
fig.update_yaxes(
rangemode="tozero",
showgrid=True,
gridwidth=1, # Makes the gridlines slightly more pronounced
tickvals=[min(y), max(y)],
    )  # Add spacing between gridlines
return fig, int(index)
def ecg_plot_counterfactuals(i, X_test, y_test, y_pred, X_cf, cf_pred):
fig = go.Figure()
neg = 0
pos = 1
y_test = np.where(y_test == 1, pos, neg)
# print("y_test: ", y_test)
# print("y_pred: ", y_pred)
# print("cf_pred: ", cf_pred)
# Original time series
fig.add_trace(
go.Scatter(
y=X_test.iloc[i],
mode="lines",
name="Original (y_pred = %d, y_actual = %d)" % (y_pred[i], y_test[i]),
line=dict(width=0.5),
)
)
if len(X_cf[i].shape) > 1:
X_cf_flattened = X_cf[i].flatten()
# Counterfactual time series
fig.add_trace(
go.Scatter(
y=X_cf_flattened,
mode="lines",
name="Counterfactual (y = %d)" % cf_pred[i],
line=dict(width=1),
)
)
else:
fig.add_trace(
go.Scatter(
y=X_cf[i],
mode="lines",
name="Counterfactual (y = %d)" % cf_pred[i],
line=dict(width=1),
)
)
# Updating the layout to reduce margins and make the plot more compact
fig.update_layout(
xaxis_title="Timestep",
hovermode="x",
margin=dict(l=10, r=10, t=30, b=10), # Reduced margins
)
# Adjust y-axis to start from zero, customize the grid, and add more space
fig.update_yaxes(
rangemode="tozero",
showgrid=True,
gridwidth=1, # Makes the gridlines slightly more pronounced
tickvals=[min(X_test.iloc[i]), max(X_test.iloc[i])],
    )  # Add spacing between gridlines
# # Mean time series of the counterfactual class
# mean_cf_class = np.mean(X_test.loc[y_test == cf_pred[i]], axis=0)
# fig.add_trace(go.Scatter(
# y=mean_cf_class,
# mode='lines',
# name="Mean of X with y = %d" % cf_pred[i],
# line=dict(width=1, dash='dash')
# ))
fig.update_layout(
xaxis_title="Timepoints", yaxis_title="Values", legend=dict(x=0.01, y=0.99)
)
return fig
def get_info_of_dataframe(df):
# Creating a DataFrame to store the summary
summary_data = {
"Total Rows": [df.shape[0]],
"Total Columns": [df.shape[1]],
"Missing Values (Total)": [df.isnull().sum().sum()],
"Missing Values (Columns)": [df.isnull().any(axis=0).sum()],
"Categorical Columns": [(df.dtypes == "object").sum()],
"Numeric Columns": [df.select_dtypes(include=["number"]).shape[1]],
}
summary_df = pd.DataFrame(summary_data)
# Create a Plotly Table with enhanced styling
fig = go.Figure(
data=[
go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Value</b>"],
fill_color="#4CAF50",
align="left",
font=dict(color="white", size=14),
height=30,
),
cells=dict(
values=[summary_df.columns, summary_df.iloc[0].tolist()],
fill_color=[["#f9f9f9", "white"] * len(summary_df)],
align="left",
font=dict(color="black", size=12),
height=25,
),
)
]
)
fig.update_layout(
title_x=0.5, # Center title
title_y=0.95,
margin=dict(l=20, r=20, t=50, b=20),
width=600,
height=300, # Adjust height based on the content
)
# Convert Plotly figure to HTML
return fig.to_html()
def update_column_list_with_one_hot_columns(df_original, df_encoded, column_list, unique_threshold=10):
"""
Update a list of column names by replacing categorical columns with their one-hot encoded counterparts,
including numeric columns with limited unique values.
:param df_original: Original DataFrame before encoding.
:param df_encoded: DataFrame after one-hot encoding.
:param column_list: List of columns to update.
:param unique_threshold: Maximum unique values to treat numeric columns as categorical.
:return: Updated list of columns.
"""
updated_columns = []
for column in column_list:
# Check if the column exists in the original DataFrame
if column in df_original.columns:
# Check if the column is categorical or numeric with limited unique values
if (
pd.api.types.is_categorical_dtype(df_original[column]) # Explicit categorical
or df_original[column].dtype == "object" # Object type
                or df_original[column].nunique() <= unique_threshold  # Numeric with limited unique values
):
# Find one-hot encoded sub-columns
one_hot_columns = [
col for col in df_encoded.columns if col.lower().startswith(f"{column.lower()}_")
]
# Add the one-hot encoded columns to the list
if one_hot_columns:
updated_columns.extend(one_hot_columns)
else:
# If no one-hot columns are found, keep the original column
updated_columns.append(column)
else:
# If not categorical, keep the column as is
updated_columns.append(column)
else:
# Column not found in the original DataFrame, keep it unchanged
updated_columns.append(column)
return updated_columns
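# Example (illustrative): if the original frame has a categorical column "gender"
# that was one-hot encoded into "gender_Male" / "gender_Female", then
#   update_column_list_with_one_hot_columns(df_original, df_encoded, ["gender", "age"])
#   -> ["gender_Male", "gender_Female", "age"]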
# Function to extract continuous features
def get_continuous_features(df):
# Filter columns based on dtype and exclude binary columns
continuous_columns = df.select_dtypes(include=["float64", "int64"]).columns
# Exclude binary features (0 and 1 values)
continuous_columns = [
col
for col in continuous_columns
if df[col].nunique() > 2 # Exclude binary features
]
# Return only the continuous features
return list(continuous_columns)
# Function to extract categorical features
def get_categorical_features(df):
    # Select object/category dtype columns
    categorical_columns = df.select_dtypes(include=["object", "category"]).columns
    # Return only the categorical features
    return list(categorical_columns)
# Function to extract non-continuous features
def get_non_continuous_features(df):
# Select numeric columns
numeric_columns = df.select_dtypes(include=["float64", "int64"]).columns
# Identify binary columns (having only two unique values, like 0/1)
binary_columns = [col for col in numeric_columns if df[col].nunique() == 2]
# Select non-numeric columns
non_numeric_columns = df.select_dtypes(
exclude=["float64", "int64"]
).columns.tolist()
# Combine binary columns and non-numeric columns
non_continuous_columns = binary_columns + non_numeric_columns
# Return only the non-continuous features
return list(non_continuous_columns)
def find_counterfactuals(estimator, explainer, X):
y_pred = estimator.predict(X)
y_desired = np.empty_like(y_pred)
    # Store an array of the desired label for each sample.
    # We assume a binary classification task and that the desired
    # label is the inverse of the predicted label.
a, b = estimator.classes_
y_desired[y_pred == a] = b
y_desired[y_pred == b] = a
# Initialize the explainer, using the medoid approach.
explainer.fit(estimator)
# Explain each sample in X as the desired label in y_desired
X_cf = explainer.explain(X, y_desired)
return X_cf, y_pred, estimator.predict(X_cf)
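# Example usage (illustrative sketch of the wildboar-based flow used in training(),
# with the classifier and counterfactual explainer imported as in that function):
#   classifier = KNeighborsClassifier(n_neighbors=5, metric="dtw", metric_params={"r": 0.5})
#   explainer = KNeighborsCounterfactual(random_state=1, method="auto")
#   classifier.fit(X_train, y_train)
#   X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test)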
def is_column_categorical_like(
df, column_name, unique_threshold=10, ratio_threshold=0.05
):
"""
Determines if a numeric column has categorical characteristics.
Parameters:
df (DataFrame): The DataFrame containing the data.
column_name (str): The column name to check.
unique_threshold (int): Maximum number of unique values to consider as categorical.
ratio_threshold (float): Maximum ratio of unique values to total rows to consider as categorical.
Returns:
bool: True if the column is likely categorical, False otherwise.
"""
unique_values = df[column_name].nunique() # Number of unique values
total_values = len(df[column_name]) # Total number of rows
unique_ratio = unique_values / total_values # Ratio of unique values to total rows
# Check if the column is numeric
if pd.api.types.is_numeric_dtype(df[column_name]):
# Consider it categorical if it has fewer than `unique_threshold` unique values
# or if the unique values ratio is below `ratio_threshold`
if unique_values <= unique_threshold or unique_ratio <= ratio_threshold:
return True
return False
# Function to flatten the dictionary
def flatten_dict(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
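# Example (illustrative), e.g. for flattening a nested classification report:
#   flatten_dict({"accuracy": 0.9, "class 0": {"precision": 0.8, "recall": 0.7}})
#   -> {"accuracy": 0.9, "class 0_precision": 0.8, "class 0_recall": 0.7}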
def convert_to_camel_case(s):
# Split the string by hyphen
parts = s.split("-")
# Capitalize each part and join them together
camel_case = "".join(word.capitalize() for word in parts)
return camel_case
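# Example (illustrative):
#   convert_to_camel_case("italy-power-demand")  -> "ItalyPowerDemand"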
# def fetch_line_by_dataset(file_path, dataset, constraint):
def fetch_line_by_dataset(file_path, dataset):
"""
Fetches a line from the file based on the specified dataset name to retrieve basic information about the dataset.
:param file_path: Path to the input file.
:param dataset: The dataset name to search for.
    :return: The line matching the dataset, or None if not found.
"""
with open(file_path, "r") as file:
for line in file:
# Strip leading whitespace
stripped_line = line.strip()
# Skip lines that start with #
if stripped_line.startswith("#"):
continue
# Use regular expressions for exact match of the dataset
dataset_pattern = rf"--dataset\s+{re.escape(dataset)}\b"
if re.search(dataset_pattern, stripped_line):
return stripped_line
return None
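# Example usage (illustrative; the file path and line contents are placeholders):
#   line = fetch_line_by_dataset("base/glacier/experiments.txt", "two-lead-ecg")
#   # e.g. "python main.py --dataset two-lead-ecg --pos 1 --neg 2 --epochs 100"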
def extract_arguments_from_line(line):
"""
Extracts all words that come immediately after '--' arguments in the line.
:param line: A string containing the line to parse.
:return: A list of argument values found in the line.
"""
# Find all arguments and their values
# matches = re.findall(r"(--[\w-]+)((?:\s+[^-][^\s]*)*)", line)
matches = re.findall(r"(--[\w-]+)\s+([^\s]+)", line)
# Extract argument values
arguments = [value.strip() for _, value in matches if value.strip()]
return arguments
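# Example (illustrative):
#   extract_arguments_from_line("--dataset two-lead-ecg --pos 1 --neg 2")
#   -> ["two-lead-ecg", "1", "2"]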
def create_tuple_of_models_text_value(available_pre_trained_models):
available_pretrained_models_info = []
for model in available_pre_trained_models:
if "xgb" == model:
available_pretrained_models_text = "XGBoost"
elif "rf" == model:
available_pretrained_models_text = "Random Forest"
elif "lr" == model:
available_pretrained_models_text = "Logistic Regression"
elif "dt" == model:
available_pretrained_models_text = "Decision Tree"
elif "svm" == model:
available_pretrained_models_text = "Support Vector Machine"
elif "glacier" == model:
available_pretrained_models_text = "Glacier 1dCNN"
elif "wildboar_knn" == model:
available_pretrained_models_text = "Wildboar K-Nearest Neighbours"
elif "wildboar_rsf" == model:
available_pretrained_models_text = "Wildboar Random Shapelet Forest"
available_pretrained_models_info.append(
(model, available_pretrained_models_text)
)
return available_pretrained_models_info
def remove_dir_and_empty_parent(path):
"""
Remove the specified directory and, if its parent directory is empty, remove the parent as well.
"""
shutil.rmtree(path)
parent_dir = os.path.dirname(path)
# Remove parent directory if it is empty
if not os.listdir(parent_dir):
shutil.rmtree(parent_dir)