# EXTREMUM_web/base/methods.py
import dice_ml.data_interfaces
import dice_ml.data_interfaces.private_data_interface
import pandas as pd
import pickle, os
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from pandas.api.types import is_numeric_dtype
from sklearn.metrics import classification_report
import plotly.express as px
from django.conf import settings
import joblib
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import dice_ml
from dict_and_html import *
import plotly.graph_objects as go
import math
from imblearn.over_sampling import SMOTE
from scipy.stats import median_abs_deviation
from numpy.fft import fft, ifft, fftfreq
from sklearn.preprocessing import MinMaxScaler
from plotly.subplots import make_subplots
from sklearn.model_selection import train_test_split
from .glacier.src.gc_latentcf_search_1dcnn_function import gc_latentcf_search_1dcnn
from .glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals
import re
import json
PIPELINE_PATH = os.path.join(settings.BASE_DIR, "base/pipelines/")
def stats(
dataset_path,
dataset_type,
pos=None,
neg=None,
feature1=None,
feature2=None,
label=None,
name=None,
):
if dataset_type == "tabular":
df = pd.read_csv(dataset_path)
binary1 = df[feature1].isin([0, 1]).all()
binary2 = df[feature2].isin([0, 1]).all()
        if binary1 or binary2:
fig = px.histogram(df, x=feature1, y=feature2, color=label)
elif is_numeric_dtype(df[feature1]) or is_numeric_dtype(df[feature2]):
if not is_numeric_dtype(df[feature1]) or not is_numeric_dtype(df[feature2]):
                # exactly one of the two features is numeric, so use a histogram
fig = px.histogram(df, x=feature1, y=feature2, color=label)
else:
# they both are numeric so do scatter
if is_column_categorical_like(
df, feature1
) and not is_column_categorical_like(df, feature2):
                    # Add jitter to the categorical-like numeric column so points do not overlap
df[feature1] = df[feature1] + np.random.uniform(
-0.1, 0.1, size=df.shape[0]
)
# Create a scatter plot using Plotly
fig = px.scatter(
df,
x=feature1,
y=feature2,
color=df[label].astype(str),
)
elif is_column_categorical_like(
df, feature2
) and not is_column_categorical_like(df, feature1):
                    # Add jitter to the categorical-like numeric column so points do not overlap
                    df[feature2] = df[feature2] + np.random.uniform(
                        -0.1, 0.1, size=df.shape[0]
                    )
# Create a scatter plot using Plotly
fig = px.scatter(
df,
x=feature1,
y=feature2,
color=df[label].astype(str),
)
elif is_column_categorical_like(
df, feature2
) and is_column_categorical_like(df, feature1):
df_grouped = (
df.groupby([feature1, feature2, label])
.size()
.reset_index(name="Count")
)
# Create a bubble plot
fig = px.scatter(
df_grouped,
x=feature1,
y=feature2,
size="Count",
color=df_grouped[label].astype(str),
)
else:
fig = px.scatter(
df, x=feature1, y=feature2, color=df[label].astype(str)
)
else:
# they both are categorical
fig = px.bar(df, x=feature1, y=feature2, color=label, barmode="group")
fig.update_layout(clickmode="event+select", autosize=True)
    elif dataset_type == "timeseries":
        df = pd.read_csv(dataset_path)
        # plot sample series in a 2x2 grid of subplots
        # TODO: handle the case where the dataset has an id column
        # human-readable class names for known datasets; default to generic
        # labels so the subplot titles are always defined
        # TODO: hard coded, should be derived dynamically from the dataset
        negative_label = "Negative label"
        positive_label = "Positive label"
        if name == "two-lead-ecg":
            negative_label = "Signal 0"
            positive_label = "Signal 1"
        elif name == "gun-point":
            negative_label = "Gun"
            positive_label = "No gun"
        elif name == "italy-power-demand":
            negative_label = "October to March power demand"
            positive_label = "April to September power demand"
        elif name == "ecg-five-days":
            negative_label = "12/11/1990"
            positive_label = "17/11/1990"
        negative_label_value = neg
        positive_label_value = pos
num_timesteps = df.shape[1] - 1
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=(
negative_label,
negative_label,
positive_label,
positive_label,
),
)
        # assumes a univariate series; TODO: support multivariate datasets
        # first sample from the negative class
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == negative_label_value].iloc[0, :-1],
mode="lines",
name=negative_label,
),
row=1,
col=1,
)
        # second sample from the negative class
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == negative_label_value].iloc[1, :-1],
mode="lines",
name=negative_label,
),
row=1,
col=2,
)
        # first sample from the positive class
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == positive_label_value].iloc[0, :-1],
mode="lines",
name=positive_label,
),
row=2,
col=1,
)
        # second sample from the positive class
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == positive_label_value].iloc[1, :-1],
mode="lines",
name=positive_label,
),
row=2,
col=2,
)
# Update layout
fig.update_layout(
xaxis_title="Timesteps",
yaxis_title="ECG Value",
showlegend=False,
autosize=True,
)
        # confidence plot: per-timestep mean with a 95% confidence interval
        df = df.iloc[:, :-1]
        df_grouped = df.agg(["mean", "std", "count"]).transpose()
        # 1.96 is the z-score for a 95% confidence interval
        df_grouped["ci"] = 1.96 * df_grouped["std"] / np.sqrt(df_grouped["count"])
        df_grouped["ci_lower"] = df_grouped["mean"] - df_grouped["ci"]
        df_grouped["ci_upper"] = df_grouped["mean"] + df_grouped["ci"]
fig1 = go.Figure(
[
go.Scatter(
name="Avg",
x=df_grouped.index,
y=round(df_grouped["mean"], 2),
mode="lines",
line=dict(color="rgb(31, 119, 180)"),
),
go.Scatter(
name="95% CI Upper",
x=df_grouped.index,
y=round(df_grouped["ci_upper"], 2),
mode="lines",
marker=dict(color="#444"),
line=dict(width=0),
showlegend=False,
),
go.Scatter(
name="95% CI Lower",
x=df_grouped.index,
y=round(df_grouped["ci_lower"], 2),
marker=dict(color="#444"),
line=dict(width=0),
mode="lines",
fillcolor="rgba(68, 68, 68, 0.3)",
fill="tonexty",
showlegend=False,
),
]
)
        fig1.update_layout(
            title="Confidence plot",
            xaxis_title="Timestep",
            yaxis_title="Avg value",
            hovermode="x",
        )
fig1.update_yaxes(rangemode="tozero")
return fig.to_html(), fig1.to_html()
return fig.to_html()
def compare_values(val1, val2):
if isinstance(val1, float) and isinstance(val2, float):
return not math.isclose(float(val1), float(val2))
else:
return val1 != val2
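# Example (illustrative): compare_values returns True when the two values
# differ. Floats are compared with math.isclose, so accumulated rounding
# noise does not count as a difference:
#   compare_values(0.1 + 0.2, 0.3)  -> False (treated as equal)
#   compare_values("a", "b")        -> True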
def preprocess(data, value_list, name, dataset_type, path=None, class_label=None):
    if dataset_type == "tabular":
        has_id = "id" in data.columns
        if has_id:
            ids = data["id"]
            data = data.drop(["id"], axis=1)
        total_nan = data.isna().sum().sum()
        imputed_data = None
        if "imp" in value_list:
            data = imputations(data, class_label, path)
            imputed_data = data
        if "onehot" in value_list:
            data = onehot(data, path)
        if "std" in value_list:
            data = scaling(data, class_label, path)
        if has_id:
            data = pd.concat([ids.to_frame(), data], axis=1, ignore_index=False)
        # if values were imputed, replace the stored csv with the imputed data
        if total_nan > 0 and imputed_data is not None:
            os.remove(name)
            if has_id:
                imputed_data = pd.concat(
                    [ids.to_frame(), imputed_data], axis=1, ignore_index=False
                )
            imputed_data.to_csv(name, index=False)
elif dataset_type == "timeseries":
# timeseries
# save last columns values
data_class_col = data.iloc[:, -1]
# drop last column that contains class_labels
data = data.iloc[:, :-1]
if "imp" in value_list:
data = imputations_ts(data, path)
if "denoise" in value_list:
data = data.apply(denoise, args=(path,), axis=0)
if "std" in value_list:
data = scaling_ts(data, path)
data = pd.concat([data, data_class_col], axis=1)
# os.remove(name)
# data.to_csv(name, index=False)
return data
###--------------------------###
### TIMESERIES PREPROCESSING ###
def scaling_ts(data, path):
# Normalize the data using Min-Max scaling
scaler = MinMaxScaler()
data[data.columns] = scaler.fit_transform(data)
pickle.dump(scaler, open(path + "/min_max_scaler.sav", "wb"))
return data
def denoise(series, path):
    # Apply FFT
    fft_vals = fft(series)
    fft_freqs = fftfreq(len(fft_vals))
    # Zero out high-frequency components (low-pass filter)
    fft_vals[np.abs(fft_freqs) > 0.1] = 0
    # Inverse FFT to reconstruct the signal
    denoised_series = ifft(fft_vals).real
    return pd.Series(denoised_series, index=series.index)
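# Note: fftfreq expresses frequencies in cycles per sample, so the 0.1
# cutoff above keeps only the lowest-frequency part of the spectrum.
# A quick illustrative check (not part of the pipeline):
#   noisy = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)))
#   noisy += np.random.normal(0, 0.3, 200)
#   smooth = denoise(noisy, path=None)  # the sine survives, the noise goes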
def outlier_detection(series, path):
median = series.median()
mad = median_abs_deviation(series)
return np.abs(series - median) / mad > 3
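# outlier_detection flags values more than 3 scaled MADs from the median, a
# robust alternative to the z-score. Caveat (assumption, not handled here):
# if the series is constant, mad is 0 and the division yields inf/nan.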
def imputations_ts(data, path):
data[data.columns] = data[data.columns].fillna(data.mean())
return data
### TIMESERIES PREPROCESSING ###
###--------------------------###
###--------------------------###
### TABULAR PREPROCESSING ###
def onehot(data, path):
encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
categorical_columns = data.select_dtypes(include=["object"]).columns.tolist()
# Apply one-hot encoding to the categorical columns
one_hot_encoded = encoder.fit_transform(data[categorical_columns]).astype(float)
    one_hot_df = pd.DataFrame(
        one_hot_encoded,
        columns=encoder.get_feature_names_out(categorical_columns),
        index=data.index,  # align with the original row index
    )
pickle.dump(encoder, open(path + "/one_hot.sav", "wb"))
# Concatenate the one-hot encoded dataframe with the original dataframe
df_encoded = pd.concat([data, one_hot_df], axis=1)
# Drop the original categorical columns
data = df_encoded.drop(categorical_columns, axis=1)
return data
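# Illustrative example of the column naming produced above: a "gender"
# column with values {"Male", "Female"} becomes two float columns named
# "gender_Male" and "gender_Female" (sklearn's get_feature_names_out
# convention), which is what decode_cf relies on to invert the encoding.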
def imputations(data, class_label, path):
imp = SimpleImputer(missing_values=np.nan, strategy="mean")
y = data[class_label]
data = data.drop([class_label], axis=1)
numeric_cols = data.select_dtypes(include=["float64", "int64"]).columns
print("Numeric columns ", numeric_cols)
data[numeric_cols] = imp.fit_transform(data[numeric_cols])
    # ensure the imputed columns stay float (SimpleImputer returns an ndarray)
    data[numeric_cols] = data[numeric_cols].astype(float)
pickle.dump(imp, open(path + "/imp.sav", "wb"))
data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False)
return data
def scaling(data, class_label, path):
    # define standard scaler
    scaler = StandardScaler()
    # set aside the class column; it should not be scaled
    y = data[class_label]
    data = data.drop([class_label], axis=1)
    # transform the numeric columns
    cols = data.select_dtypes(np.number).columns
    # keep non-binary columns; binary (0/1) columns should not be scaled
    nonbinary_columns = [
        col for col in cols if not data[col].dropna().isin([0, 1]).all()
    ]
    data[nonbinary_columns] = scaler.fit_transform(data[nonbinary_columns])
    pickle.dump(scaler, open(path + "/standard_scaler.sav", "wb"))
    data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False)
    return data
### TABULAR PREPROCESSING ###
###--------------------------###
def decode_cf(df, row, class_label, path, preprocessing_list):
cf_row = row.copy()
# get actual numerical columns
df_numerical = df.select_dtypes(exclude=["object"]).columns.tolist()
nonbinary_numeric_columns = [
col for col in df_numerical if not df[col].dropna().isin([0, 1]).all()
]
# get actual categorical columns
df_categorical = (
df.drop([class_label], axis=1)
.select_dtypes(include=["object"])
.columns.tolist()
)
if "onehot" in preprocessing_list:
ohe = joblib.load(path + "/one_hot.sav")
# if there were categorical columns in the dataframe
if ohe.get_feature_names_out().size > 0:
one_hot_decoded = ohe.inverse_transform(cf_row[ohe.get_feature_names_out()])
cf_categorical_columns = ohe.get_feature_names_out()
# Drop the original categorical columns
cf_row = cf_row.drop(cf_categorical_columns, axis=1)
cf_row[df_categorical] = one_hot_decoded
if "std" in preprocessing_list:
scaler = joblib.load(path + "/standard_scaler.sav")
print(nonbinary_numeric_columns)
cf_row[nonbinary_numeric_columns] = scaler.inverse_transform(
cf_row[nonbinary_numeric_columns]
)
cf_row[class_label] = cf_row[class_label]
le = joblib.load(path + "/label_encoder.sav")
cf_row[class_label] = le.inverse_transform(cf_row[class_label])
return cf_row
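# Usage sketch (hypothetical names): given the original dataframe `df`, a
# preprocessed counterfactual row `cf`, and the artifacts saved by the
# preprocessing step under `path`, this recovers a human-readable row:
#   decoded = decode_cf(df, cf, "label", path, ["onehot", "std"])
# It inverts onehot() and scaling() using the encoders pickled above.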
def training(
data,
model,
test_size,
label,
dataset_type,
df_name,
model_path=None,
autoencoder="No",
experiment_arguments=None,
):
X = data
if dataset_type == "tabular":
if "id" in data.columns:
X = data.drop("id", axis=1)
y = X[label]
X = X.drop(label, axis=1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, stratify=y.values, random_state=42
)
        if df_name == "stroke":
            # the stroke dataset is imbalanced and needs oversampling;
            # resample only the training split so that no synthetic
            # samples leak into the test set
            ## TODO if class_label is multi class SMOTE needs to have sampling strategy Dict
            ## TODO check if df needs oversampling
            oversample = SMOTE(sampling_strategy=0.4, random_state=42)
            X_train, y_train = oversample.fit_resample(X_train, y_train)
if "lr" == model:
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(random_state=0).fit(X_train, y_train)
y_pred = clf.predict(X_test)
filename = "lr.sav"
importance = clf.coef_[0]
model = clf
if "xgb" == model:
from xgboost import XGBClassifier
# TODO enable_categorical whould be dynamically added
# when there are categorical variables in the dataset
xgb = XGBClassifier(
n_estimators=200, # Number of trees (boosting rounds)
learning_rate=0.1, # Step size shrinkage (eta)
max_depth=4, # Maximum tree depth
min_child_weight=1, # Minimum sum of weights in a child
subsample=0.8, # Fraction of samples used per tree
colsample_bytree=0.8, # Fraction of features used per tree
gamma=0, # Minimum loss reduction to make a split
reg_lambda=1, # L2 regularization term (ridge)
reg_alpha=0, # L1 regularization term (lasso)
objective="binary:logistic", # Binary classification objective
use_label_encoder=False, # Avoids unnecessary warnings for older versions
eval_metric="logloss", # Logarithmic loss evaluation metric
).fit(X_train, y_train)
y_pred = xgb.predict(X_test)
filename = "xgb.sav"
importance = xgb.feature_importances_
model = xgb
if "dt" == model:
from sklearn.tree import DecisionTreeClassifier
dt = DecisionTreeClassifier(max_depth=30, random_state=42)
dt.fit(X_train, y_train)
y_pred = dt.predict(X_test)
filename = "dt.sav"
importance = dt.feature_importances_
model = dt
if "svm" == model:
from sklearn import svm
svc = svm.SVC(kernel="linear", probability=True)
svc.fit(X_train, y_train)
y_pred = svc.predict(X_test)
filename = "svm.sav"
importance = svc.coef_[0]
model = svc
if "rf" == model:
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier()
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
filename = "rf.sav"
importance = rf.feature_importances_
model = rf
class_report = classification_report(y_test, y_pred.flatten(), output_dict=True)
class_report = pd.DataFrame(class_report)
feature_importance = px.bar(x=importance, y=X_train.columns)
if model_path:
pickle.dump(model, open(model_path + f"/{filename}", "wb"))
feature_importance_dict = dict(zip(X_train.columns, importance))
return feature_importance, class_report, feature_importance_dict
else:
# TODO: add 1dcnn train
if model == "glacier":
gc_latentcf_search_1dcnn(
data,
int(experiment_arguments[1]),
int(experiment_arguments[2]),
model_path + f"/{experiment_arguments[3]}",
model_path,
autoencoder,
)
elif model == "wildboar_knn" or model == "wildboar_rsf":
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=1
)
# n_samples, n_timestep = X_train.shape
# y_labels, counts = np.unique(y_train, return_counts=True)
# print(
# f"""
# The dataset contains {n_samples} samples with {n_timestep} time steps each.
# Of the samples, {counts[0]} is labeled as {y_labels[0]} and {counts[1]} labeled
# as {y_labels[1]}. Here, we plot the time series.
# """
# )
from wildboar.utils.plot import plot_time_domain
if model == "wildboar_knn":
from wildboar.distance import KNeighborsClassifier
from wildboar.explain.counterfactual import KNeighborsCounterfactual
filename = "wildboar_knn.sav"
classifier = KNeighborsClassifier(
n_neighbors=5, metric="dtw", metric_params={"r": 0.5}
)
explainer = KNeighborsCounterfactual(random_state=1, method="auto")
if model == "wildboar_rsf":
from wildboar.ensemble import ShapeletForestClassifier
from wildboar.explain.counterfactual import ShapeletForestCounterfactual
filename = "wildboar_rsf.sav"
classifier = ShapeletForestClassifier(
n_estimators=100,
metric="euclidean",
max_depth=5,
random_state=1,
)
explainer = ShapeletForestCounterfactual(random_state=1)
classifier.fit(X_train, y_train)
            # evaluate on the held-out test split
y_pred = classifier.predict(X_test)
# Generate the classification report
class_report = classification_report(y_test, y_pred, output_dict=True)
# Convert the classification report to a pandas DataFrame
class_report = pd.DataFrame(class_report).transpose()
X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test)
# save x_test, y_test for future use
if model_path:
x_test_df = pd.DataFrame(X_test)
x_test_df.columns = data.iloc[:, :-1].columns
                x_test_df.to_csv(model_path + "/X_test.csv", index=False)
np.save(model_path + "/y_test.npy", y_test)
pickle.dump(classifier, open(model_path + f"/{filename}", "wb"))
np.save(model_path + "/X_cf.npy", X_cf)
np.save(model_path + "/y_pred.npy", y_pred)
np.save(model_path + "/cf_pred.npy", cf_pred)
return class_report
def testing(name, type):
    data = pd.read_csv(name)
    # TODO: the class column is hard coded for the breast-cancer dataset
    y_test = data["diagnosis"]
    X_test = data.drop("diagnosis", axis=1)
    if type == "lr":
        clf = joblib.load("lr.sav")
        y_pred = clf.predict(X_test)
        importance = clf.coef_[0]
    elif type == "xgb":
        xgb = joblib.load("xgb.sav")
        y_pred = xgb.predict(X_test)
        importance = xgb.feature_importances_
    elif type == "dt":
        dt = joblib.load("dt.sav")
        y_pred = dt.predict(X_test)
        importance = dt.feature_importances_
    elif type == "svm":
        svc = joblib.load("svm.sav")
        y_pred = svc.predict(X_test)
        importance = svc.coef_[0]
    elif type == "rf":
        rf = joblib.load("rf.sav")
        y_pred = rf.predict(X_test)
        importance = rf.feature_importances_
    clas_report = classification_report(y_test, y_pred, output_dict=True)
    clas_report = pd.DataFrame(clas_report).transpose()
    clas_report = clas_report.sort_values(by=["f1-score"], ascending=False)
    fig2 = px.bar(x=importance, y=X_test.columns)
con = {
"fig2": fig2.to_html(),
"clas_report": clas_report,
}
return con
# compute counterfactuals
def counterfactuals(
    query,
    model,
    df,
    class_label,
    continuous_features,
    num_counterfactuals=5,
    features_to_vary=None,
):
    if "id" in df.columns:
        df = df.drop("id", axis=1)
    if "id" in query.columns:
        query = query.drop("id", axis=1)
    query = query.drop(class_label, axis=1)
    d = dice_ml.Data(
        dataframe=df,
        continuous_features=continuous_features,
        outcome_name=class_label,
    )
    m = dice_ml.Model(model=model, backend="sklearn")
    exp = dice_ml.Dice(d, m)
    kwargs = {}
    if features_to_vary:
        kwargs["features_to_vary"] = features_to_vary
    try:
        dice_exp = exp.generate_counterfactuals(
            query,
            total_CFs=num_counterfactuals,  # number of counterfactual examples to generate
            desired_class="opposite",  # flip the predicted class
            proximity_weight=0.5,  # Control proximity
            diversity_weight=1.0,  # Control diversity
            sparsity_weight=0.5,  # Enforce minimal feature changes
            random_seed=42,
            **kwargs,
        )
    except Exception as e:
        print(e)
        dice_exp = None
    if dice_exp:
        return dice_exp._cf_examples_list
    return dice_exp
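# Usage sketch (hypothetical names): `clf` is a fitted sklearn classifier,
# `df` the training dataframe with outcome column "label", and `query` a
# one-row dataframe to explain:
#   cf_examples = counterfactuals(
#       query, clf, df, "label", get_continuous_features(df)
#   )
# Each returned item is a dice_ml CounterfactualExamples object whose
# final_cfs_df attribute holds the generated counterfactual rows.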
def get_dataframe(path):
df = pd.read_csv(path)
return df
def generatePCA(preprocess_df):
    pca = PCA()
    pca.fit(preprocess_df)
    exp_var_cumul = np.cumsum(pca.explained_variance_ratio_)
    fig = px.area(
        x=range(1, exp_var_cumul.shape[0] + 1),
        y=exp_var_cumul,
        labels={"x": "# Components", "y": "Explained Variance"},
    )
    fig.update_layout(
        autosize=True,
    )
    return fig
def generateTSNE(preprocess_df, dataset_type, class_label=None):
# tSNE
tsne = TSNE(n_components=2, random_state=39)
if dataset_type == "tabular":
projections = tsne.fit_transform(preprocess_df.drop(class_label, axis=1).values)
tsne_df = pd.DataFrame(
{
"0": projections[:, 0],
"1": projections[:, 1],
class_label: preprocess_df[class_label].astype(str),
}
)
# render_mode="svg" will prevent the scatter from getting to GL mode
# for sufficiently large input. It was observed that for datasets
# larger than 1000 entries, the scatter html object that would be
# generated would lack the <g class="points" ... > </g> element containing
# all the points of the scatter. Using that we could connect the click
# of a point with the actual point in the dataset. Thus it was vital
# that there exists such an element and also is accessible.
# By default, scatter disables it to free space in the client side
# and by using render_mode="svg" we avoid that behaviour.
# https://github.com/plotly/plotly_express/issues/145
tsne = px.scatter(
tsne_df,
x="0",
y="1",
color=class_label,
render_mode="svg",
)
tsne.update_layout(clickmode="event+select", autosize=True)
elif dataset_type == "timeseries":
preprocess_df_drop_class = preprocess_df.iloc[:, :-1]
projections = tsne.fit_transform(preprocess_df_drop_class)
tsne_df = pd.DataFrame(
{
"0": projections[:, 0],
"1": projections[:, 1],
"class": preprocess_df.iloc[:, -1].astype(str),
}
)
# render_mode="svg" will prevent the scatter from getting to GL mode
# for sufficiently large input. It was observed that for datasets
# larger than 1000 entries, the scatter html object that would be
# generated would lack the <g class="points" ... > </g> element containing
# all the points of the scatter. Using that we could connect the click
# of a point with the actual point in the dataset. Thus it was vital
# that there exists such an element and also is accessible.
# By default, scatter disables it to free space in the client side
# and by using render_mode="svg" we avoid that behaviour.
# https://github.com/plotly/plotly_express/issues/145
tsne = px.scatter(
tsne_df,
x="0",
y="1",
color="class",
render_mode="svg",
)
tsne.update_layout(clickmode="event+select", autosize=True)
return tsne, projections
def generateAugmentedTSNE(
df, cf_df, num_counterfactuals, point, tsne_path, class_label
):
"""
Given a tsne graph, add the traces of the computed counterfactuals for a given point and return the new graph
Parameters
----------
df: dataframe used to compute tsne
cf_df: counterfactuals dataframe
point: original point of for which counterfatuals were computed
tsne_path: path to the original tsne plot
Returns
-------
The tsne graph updated with new counterfactuals points. Cunterfactual points and the point itself are resized
"""
    # recompute the tsne, this time with the counterfactual points
    # (described in counterfactuals.csv) added as extra points
    tsne_cf = TSNE(n_components=2, random_state=0)
    # merge counterfactuals csv with tsne_data
    df_merged = pd.concat([cf_df, df], ignore_index=True, axis=0)
    projections = tsne_cf.fit_transform(df_merged.drop(class_label, axis=1).values)
    # cf_df contains the projection values and the class_label value of the
    # counterfactual points; projections is an np array of (x, y) pairs,
    # one per tsne graph point
cf_df = pd.DataFrame(
{
"0": projections[:num_counterfactuals, 0],
"1": projections[:num_counterfactuals, 1],
class_label: cf_df[class_label].iloc[:num_counterfactuals].astype(str),
}
)
point_s = px.scatter(
point,
x="0",
y="1",
color=class_label,
color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
render_mode="svg",
)
point_s["data"][0]["name"] = "Original data"
cf_s = px.scatter(
cf_df,
x="0",
y="1",
color=class_label,
color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
render_mode="svg",
)
cf_s["data"][0]["name"] = "Counterfactual"
clicked_and_cf_s = go.Figure(data=cf_s.data + point_s.data)
clicked_and_cf_s.update_traces(
marker=dict(size=10, symbol="circle", line=dict(width=2))
)
tsne = joblib.load(tsne_path)
tsne = go.Figure(data=tsne.data + clicked_and_cf_s.data)
tsne.update_layout(clickmode="event+select", autosize=True)
return tsne
def get_ecg_entry(X_test, y_test, i, class_label):
    # plot a single timeseries sample
fig = go.Figure()
y = X_test[y_test == class_label].iloc[i]
index = X_test[y_test == class_label].index[i]
if class_label == 0:
name = "Normal ECG"
else:
name = "Abnormal ECG"
# Adding the ECG trace
fig.add_trace(
go.Scatter(
y=y,
mode="lines",
line=dict(width=1),
)
)
# Updating the layout to reduce margins and make the plot more compact
fig.update_layout(
xaxis_title="Timestep",
yaxis_title=name,
hovermode="x",
margin=dict(l=10, r=10, t=30, b=10), # Reduced margins
)
    # Adjust y-axis to start from zero and label only the min/max values
    fig.update_yaxes(
        rangemode="tozero",
        showgrid=True,
        gridwidth=1,  # Makes the gridlines slightly more pronounced
        tickvals=[min(y), max(y)],  # only label the extremes
    )
return fig, int(index)
def ecg_plot_counterfactuals(i, X_test, y_test, y_pred, X_cf, cf_pred):
fig = go.Figure()
    # normalise the test labels to 0/1
    y_test = np.where(y_test == 1, 1, 0)
# Original time series
fig.add_trace(
go.Scatter(
y=X_test.iloc[i],
mode="lines",
name="Original (y_pred = %d, y_actual = %d)" % (y_pred[i], y_test[i]),
line=dict(width=0.5),
)
)
if len(X_cf[i].shape) > 1:
X_cf_flattened = X_cf[i].flatten()
# Counterfactual time series
fig.add_trace(
go.Scatter(
y=X_cf_flattened,
mode="lines",
name="Counterfactual (y = %d)" % cf_pred[i],
line=dict(width=1),
)
)
else:
fig.add_trace(
go.Scatter(
y=X_cf[i],
mode="lines",
name="Counterfactual (y = %d)" % cf_pred[i],
line=dict(width=1),
)
)
# Updating the layout to reduce margins and make the plot more compact
fig.update_layout(
xaxis_title="Timestep",
hovermode="x",
margin=dict(l=10, r=10, t=30, b=10), # Reduced margins
)
    # Adjust y-axis to start from zero and label only the min/max values
    fig.update_yaxes(
        rangemode="tozero",
        showgrid=True,
        gridwidth=1,  # Makes the gridlines slightly more pronounced
        tickvals=[min(X_test.iloc[i]), max(X_test.iloc[i])],
    )
    fig.update_layout(yaxis_title="Value", legend=dict(x=0.01, y=0.99))
return fig
def get_info_of_dataframe(df):
# Creating a DataFrame to store the summary
summary_data = {
"Total Rows": [df.shape[0]],
"Total Columns": [df.shape[1]],
"Missing Values (Total)": [df.isnull().sum().sum()],
"Missing Values (Columns)": [df.isnull().any(axis=0).sum()],
"Categorical Columns": [(df.dtypes == "object").sum()],
"Numeric Columns": [df.select_dtypes(include=["number"]).shape[1]],
}
summary_df = pd.DataFrame(summary_data)
# Create a Plotly Table with enhanced styling
fig = go.Figure(
data=[
go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Value</b>"],
fill_color="#4CAF50",
align="left",
font=dict(color="white", size=14),
height=30,
),
cells=dict(
values=[summary_df.columns, summary_df.iloc[0].tolist()],
fill_color=[["#f9f9f9", "white"] * len(summary_df)],
align="left",
font=dict(color="black", size=12),
height=25,
),
)
]
)
fig.update_layout(
title_x=0.5, # Center title
title_y=0.95,
margin=dict(l=20, r=20, t=50, b=20),
width=600,
height=300, # Adjust height based on the content
)
# Convert Plotly figure to HTML
return fig.to_html()
def update_column_list_with_one_hot_columns(df_original, df_encoded, column_list):
updated_columns = []
for column in column_list:
# Check if the column is categorical in the original dataset
        if (
            isinstance(df_original[column].dtype, pd.CategoricalDtype)
            or df_original[column].dtype == "object"
        ):
# The column is categorical, so find the one-hot encoded sub-columns
one_hot_columns = [
col for col in df_encoded.columns if col.startswith(f"{column}_")
]
# Replace the original column name with the one-hot encoded sub-columns
if one_hot_columns:
updated_columns.extend(
one_hot_columns
) # Add the sub-columns to the updated list
else:
# If no one-hot encoded columns are found (for some reason), keep the original column
updated_columns.append(column)
else:
# If the column is not categorical, keep it as is
updated_columns.append(column)
return updated_columns
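# Illustrative example: with an original frame containing a categorical
# "gender" column and a numeric "age" column,
#   update_column_list_with_one_hot_columns(df, df_encoded, ["gender", "age"])
# returns ["gender_Male", "gender_Female", "age"], assuming those one-hot
# columns exist in df_encoded.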
# Function to extract continuous features
def get_continuous_features(df):
# Filter columns based on dtype and exclude binary columns
continuous_columns = df.select_dtypes(include=["float64", "int64"]).columns
# Exclude binary features (0 and 1 values)
continuous_columns = [
col
for col in continuous_columns
if df[col].nunique() > 2 # Exclude binary features
]
# Return only the continuous features
return list(continuous_columns)
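# Example (illustrative): a float "age" column with many distinct values is
# returned, while a 0/1 "hypertension" column is excluded by the
# nunique() > 2 check; the excluded columns are exactly the ones picked up
# by get_non_continuous_features below.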
# Function to extract categorical features
def get_categorical_features(df):
    # Select object and category dtype columns
    categorical_columns = df.select_dtypes(include=["object", "category"]).columns
    # Return only the categorical features
    return list(categorical_columns)
# Function to extract non-continuous features
def get_non_continuous_features(df):
# Select numeric columns
numeric_columns = df.select_dtypes(include=["float64", "int64"]).columns
# Identify binary columns (having only two unique values, like 0/1)
binary_columns = [col for col in numeric_columns if df[col].nunique() == 2]
# Select non-numeric columns
non_numeric_columns = df.select_dtypes(
exclude=["float64", "int64"]
).columns.tolist()
# Combine binary columns and non-numeric columns
non_continuous_columns = binary_columns + non_numeric_columns
# Return only the non-continuous features
return list(non_continuous_columns)
def find_counterfactuals(estimator, explainer, X):
y_pred = estimator.predict(X)
y_desired = np.empty_like(y_pred)
    # Store an array of the desired label for each sample.
    # We assume a binary classification task where the desired
    # label is the inverse of the predicted label.
a, b = estimator.classes_
y_desired[y_pred == a] = b
y_desired[y_pred == b] = a
# Initialize the explainer, using the medoid approach.
explainer.fit(estimator)
# Explain each sample in X as the desired label in y_desired
X_cf = explainer.explain(X, y_desired)
return X_cf, y_pred, estimator.predict(X_cf)
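# Usage sketch (assumes the wildboar estimators used in training() above):
#   classifier.fit(X_train, y_train)
#   explainer = KNeighborsCounterfactual(random_state=1, method="auto")
#   X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test)
# X_cf holds one counterfactual series per test sample, and cf_pred the
# label the classifier assigns to each counterfactual.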
def is_column_categorical_like(
df, column_name, unique_threshold=10, ratio_threshold=0.05
):
"""
Determines if a numeric column has categorical characteristics.
Parameters:
df (DataFrame): The DataFrame containing the data.
column_name (str): The column name to check.
unique_threshold (int): Maximum number of unique values to consider as categorical.
ratio_threshold (float): Maximum ratio of unique values to total rows to consider as categorical.
Returns:
bool: True if the column is likely categorical, False otherwise.
"""
unique_values = df[column_name].nunique() # Number of unique values
total_values = len(df[column_name]) # Total number of rows
unique_ratio = unique_values / total_values # Ratio of unique values to total rows
# Check if the column is numeric
if pd.api.types.is_numeric_dtype(df[column_name]):
# Consider it categorical if it has fewer than `unique_threshold` unique values
# or if the unique values ratio is below `ratio_threshold`
if unique_values <= unique_threshold or unique_ratio <= ratio_threshold:
return True
return False
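# Example (illustrative): an integer "rating" column taking only the values
# 1-5 in a 1000-row frame has 5 unique values (ratio 0.005), so it is
# treated as categorical; a float "income" column with hundreds of distinct
# values is not.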
# Function to flatten the dictionary
def flatten_dict(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
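# Example: flatten_dict({"a": {"b": 1}, "c": 2}) -> {"a_b": 1, "c": 2};
# nested keys are joined with `sep`.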
def convert_to_camel_case(s):
# Split the string by hyphen
parts = s.split("-")
# Capitalize each part and join them together
camel_case = "".join(word.capitalize() for word in parts)
return camel_case
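# Example: convert_to_camel_case("two-lead-ecg") -> "TwoLeadEcg"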
def fetch_line_by_dataset(file_path, dataset):
    """
    Fetches a line from the file based on the specified dataset name to retrieve basic information about the dataset.
    :param file_path: Path to the input file.
    :param dataset: The dataset name to search for.
    :return: The line matching the dataset, or None if not found.
    """
with open(file_path, "r") as file:
for line in file:
# Strip leading whitespace
stripped_line = line.strip()
# Skip lines that start with #
if stripped_line.startswith("#"):
continue
# Use regular expressions for exact match of the dataset
dataset_pattern = rf"--dataset\s+{re.escape(dataset)}\b"
if re.search(dataset_pattern, stripped_line):
return stripped_line
return None
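# The file is expected to contain one experiment invocation per line, e.g.
# (hypothetical): "--dataset two-lead-ecg --epochs 100 --lr 0.001". Comment
# lines starting with "#" are skipped, and the first line whose --dataset
# value exactly matches is returned.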
def extract_arguments_from_line(line):
"""
Extracts all words that come immediately after '--' arguments in the line.
:param line: A string containing the line to parse.
:return: A list of argument values found in the line.
"""
    # Find all "--argument value" pairs
    matches = re.findall(r"(--[\w-]+)\s+([^\s]+)", line)
# Extract argument values
arguments = [value.strip() for _, value in matches if value.strip()]
return arguments
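# Example: extract_arguments_from_line("--dataset two-lead-ecg --epochs 100")
# returns ["two-lead-ecg", "100"] - only the values, in order of appearance.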
def create_tuple_of_models_text_value(available_pre_trained_models):
    # map model keys to their human-readable names; fall back to the raw
    # key for unknown models instead of raising a NameError
    model_display_names = {
        "xgb": "XGBoost",
        "rf": "Random Forest",
        "lr": "Logistic Regression",
        "dt": "Decision Tree",
        "svm": "Support Vector Machine",
        "glacier": "Glacier 1dCNN",
        "wildboar_knn": "Wildboar K-Nearest Neighbours",
        "wildboar_rsf": "Wildboar Random Shapelet Forest",
    }
    return [
        (model, model_display_names.get(model, model))
        for model in available_pre_trained_models
    ]