import pandas as pd
import pickle, os
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from pandas.api.types import is_numeric_dtype
from sklearn.metrics import classification_report
import plotly.express as px
from django.conf import settings
import joblib
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import dice_ml
from dict_and_html import *
import plotly.graph_objects as go
import math
from imblearn.over_sampling import SMOTE
from scipy.stats import median_abs_deviation
from numpy.fft import fft, ifft
from sklearn.preprocessing import MinMaxScaler
from plotly.subplots import make_subplots
from sklearn.model_selection import train_test_split
from .glacier.src.gc_latentcf_search_1dcnn_function import gc_latentcf_search_1dcnn
from .glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals
import re
PIPELINE_PATH = os.path.join(settings.BASE_DIR, "base/pipelines/")
def stats(
dataset_path,
dataset_type,
pos=None,
neg=None,
feature1=None,
feature2=None,
label=None,
name=None,
):
print(dataset_type)
if dataset_type == "tabular":
df = pd.read_csv(dataset_path)
binary1 = df[feature1].isin([0, 1]).all()
binary2 = df[feature2].isin([0, 1]).all()
        if binary1 or binary2:
fig = px.histogram(df, x=feature1, y=feature2, color=label)
elif is_numeric_dtype(df[feature1]) or is_numeric_dtype(df[feature2]):
            if not is_numeric_dtype(df[feature1]) or not is_numeric_dtype(df[feature2]):
                # only one of the two features is numeric, so use a histogram
                fig = px.histogram(df, x=feature1, y=feature2, color=label)
else:
# they both are numeric so do scatter
if is_column_categorical_like(
df, feature1
) and not is_column_categorical_like(df, feature2):
# Add jitter to the 'Categorical_Like_Numeric' column
df[feature1] = df[feature1] + np.random.uniform(
-0.1, 0.1, size=df.shape[0]
)
# Create a scatter plot using Plotly
fig = px.scatter(
df,
x=feature1,
y=feature2,
color=df[label].astype(str),
)
elif is_column_categorical_like(
df, feature2
) and not is_column_categorical_like(df, feature1):
print(df)
df[feature2] = df[feature2] + np.random.uniform(
-0.1, 0.1, size=df.shape[0]
)
# Create a scatter plot using Plotly
fig = px.scatter(
df,
x=feature1,
y=feature2,
color=df[label].astype(str),
)
elif is_column_categorical_like(
df, feature2
) and is_column_categorical_like(df, feature1):
df_grouped = (
df.groupby([feature1, feature2, label])
.size()
.reset_index(name="Count")
)
# Create a bubble plot
fig = px.scatter(
df_grouped,
x=feature1,
y=feature2,
size="Count",
color=df_grouped[label].astype(str),
)
else:
# print(
# is_column_categorical_like(df, feature1),
# is_column_categorical_like(df, feature2),
# )
fig = px.scatter(
df, x=feature1, y=feature2, color=df[label].astype(str)
)
else:
# they both are categorical
fig = px.bar(df, x=feature1, y=feature2, color=label, barmode="group")
fig.update_layout(clickmode="event+select", autosize=True)
elif dataset_type == "timeseries":
        # timeseries: plot a few sample series as subplots
        # (the last column is assumed to hold the target labels)
        # TODO: handle datasets that contain an id column
        if name == "two-lead-ecg":
            negative_label = "Signal 0"
            positive_label = "Signal 1"
        elif name == "gun-point":
            negative_label = "Gun"
            positive_label = "No gun"
        elif name == "italy-power-demand":
            negative_label = "October to March power demand"
            positive_label = "April to September power demand"
        elif name == "ecg-five-days":
            negative_label = "12/11/1990"
            positive_label = "17/11/1990"
        elif name == "ford-a":
            negative_label = "Negative label"
            positive_label = "Positive label"
        else:
            # fall back to generic labels when the dataset name is missing
            # or not recognised
            # TODO: derive these labels dynamically from the dataset
            negative_label = "Negative label"
            positive_label = "Positive label"
        negative_label_value = neg
        positive_label_value = pos
num_timesteps = df.shape[1] - 1
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=(
negative_label,
negative_label,
positive_label,
positive_label,
),
)
        # assumes a univariate series
        # TODO: support multivariate series
target_labels = list(df.iloc[:, -1].unique())
positive = target_labels[1]
negative = target_labels[0]
# Add normal ECG trace 1
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == negative_label_value].iloc[0, :-1],
mode="lines",
name=negative_label,
),
row=1,
col=1,
)
# Add normal ECG trace 2
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == negative_label_value].iloc[1, :-1],
mode="lines",
name=negative_label,
),
row=1,
col=2,
)
# Add abnormal ECG trace 1
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == positive_label_value].iloc[0, :-1],
mode="lines",
name=positive_label,
),
row=2,
col=1,
)
# Add abnormal ECG trace 2
fig.add_trace(
go.Scatter(
x=list(range(num_timesteps)),
y=df[df.iloc[:, -1] == positive_label_value].iloc[1, :-1],
mode="lines",
name=positive_label,
),
row=2,
col=2,
)
# Update layout
        fig.update_layout(
            xaxis_title="Timesteps",
            yaxis_title="Value",
            showlegend=False,
            autosize=True,
        )
        # confidence plot: mean +/- 1.96 * std / sqrt(n) per timestep (95% CI)
        df = df.iloc[:, :-1]
        df_grouped = df.agg(["mean", "std", "count"]).transpose()
        df_grouped["ci"] = 1.96 * df_grouped["std"] / np.sqrt(df_grouped["count"])
        df_grouped["ci_lower"] = df_grouped["mean"] - df_grouped["ci"]
        df_grouped["ci_upper"] = df_grouped["mean"] + df_grouped["ci"]
fig1 = go.Figure(
[
go.Scatter(
name="Avg",
x=df_grouped.index,
y=round(df_grouped["mean"], 2),
mode="lines",
line=dict(color="rgb(31, 119, 180)"),
),
go.Scatter(
name="95% CI Upper",
x=df_grouped.index,
y=round(df_grouped["ci_upper"], 2),
mode="lines",
marker=dict(color="#444"),
line=dict(width=0),
showlegend=False,
),
go.Scatter(
name="95% CI Lower",
x=df_grouped.index,
y=round(df_grouped["ci_lower"], 2),
marker=dict(color="#444"),
line=dict(width=0),
mode="lines",
fillcolor="rgba(68, 68, 68, 0.3)",
fill="tonexty",
showlegend=False,
),
]
)
        fig1.update_layout(
            title=f"Confidence plot for the {name} dataset" if name else "Confidence plot",
            xaxis_title="Timestep",
            yaxis_title="Average value",
            hovermode="x",
        )
fig1.update_yaxes(rangemode="tozero")
return fig.to_html(), fig1.to_html()
# fig = px.line(df.iloc[int(feature1)])
return fig.to_html()
def compare_values(val1, val2):
if isinstance(val1, float) and isinstance(val2, float):
return not math.isclose(float(val1), float(val2))
else:
return val1 != val2
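# Illustrative note (added, not part of the original pipeline): compare_values
# treats two floats as equal when math.isclose is satisfied, so floating-point
# noise does not register as a change, e.g.
#   compare_values(0.1 + 0.2, 0.3)  -> False  (values considered equal)
#   compare_values(0.1, 0.2)        -> True   (values differ)
#   compare_values("yes", "no")     -> True   (non-floats fall back to !=)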
def preprocess(data, value_list, name, dataset_type, path=None, class_label=None):
if dataset_type == "tabular":
        has_id = "id" in data.columns
        if has_id:
            ids = data["id"]
            data = data.drop(["id"], axis=1)
        total_nan = data.isna().sum().sum()
        # keep a reference in case the csv needs to be rewritten below
        imputed_data = data
        if "imp" in value_list:
            data = imputations(data, class_label, path)
            imputed_data = data
if "onehot" in value_list:
data = onehot(data, path)
if "std" in value_list:
data = scaling(data, class_label, path)
        if has_id:
            # re-attach the id column that was dropped before preprocessing
data = pd.concat([ids.to_frame(), data], axis=1, ignore_index=False)
if total_nan > 0:
os.remove(name)
imputed_data = pd.concat(
[ids.to_frame(), imputed_data], axis=1, ignore_index=False
)
imputed_data.to_csv(name, index=False)
elif dataset_type == "timeseries":
# timeseries
# save last columns values
data_class_col = data.iloc[:, -1]
# drop last column that contains class_labels
data = data.iloc[:, :-1]
if "imp" in value_list:
data = imputations_ts(data, path)
if "denoise" in value_list:
data = data.apply(denoise, args=(path,), axis=0)
if "std" in value_list:
data = scaling_ts(data, path)
data = pd.concat([data, data_class_col], axis=1)
# os.remove(name)
# data.to_csv(name, index=False)
return data
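# Usage sketch for preprocess() (illustrative only; the file name, pipeline
# folder and class label below are assumptions, not taken from the code):
#   df = pd.read_csv("breast_cancer.csv")
#   processed = preprocess(
#       df,
#       value_list=["imp", "onehot", "std"],   # impute, one-hot encode, standardise
#       name="breast_cancer.csv",              # csv rewritten if NaNs were imputed
#       dataset_type="tabular",
#       path=PIPELINE_PATH + "breast_cancer",  # where fitted transformers are pickled
#       class_label="diagnosis",
#   )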
###--------------------------###
### TIMESERIES PREPROCESSING ###
def scaling_ts(data, path):
# Normalize the data using Min-Max scaling
scaler = MinMaxScaler()
data[data.columns] = scaler.fit_transform(data)
pickle.dump(scaler, open(path + "/min_max_scaler.sav", "wb"))
return data
def denoise(series, path):
# Apply FFT
fft_vals = fft(series)
fft_freqs = np.fft.fftfreq(len(fft_vals))
# Filter frequencies
fft_vals[np.abs(fft_freqs) > 0.1] = 0
# Inverse FFT to reconstruct the signal
denoised_series = ifft(fft_vals).real
return pd.Series(denoised_series, index=series.index)
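# Clarifying note (added): np.fft.fftfreq returns frequencies in cycles per
# sample (range -0.5..0.5), so zeroing components with |f| > 0.1 keeps only the
# slowest-varying part of the spectrum, i.e. denoise() acts as a simple FFT
# low-pass filter. Rough usage sketch with synthetic data (assumption):
#   noisy = pd.Series(np.sin(np.linspace(0, 4 * np.pi, 200)) + np.random.normal(0, 0.3, 200))
#   smooth = denoise(noisy, path=None)  # path is accepted but unused here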
def outlier_detection(series, path):
median = series.median()
mad = median_abs_deviation(series)
return np.abs(series - median) / mad > 3
def imputations_ts(data, path):
data[data.columns] = data[data.columns].fillna(data.mean())
return data
### TIMESERIES PREPROCESSING ###
###--------------------------###
###--------------------------###
### TABULAR PREPROCESSING ###
def onehot(data, path):
encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
categorical_columns = data.select_dtypes(include=["object"]).columns.tolist()
# Apply one-hot encoding to the categorical columns
one_hot_encoded = encoder.fit_transform(data[categorical_columns]).astype(float)
    # keep the original index so the concat below stays row-aligned
    one_hot_df = pd.DataFrame(
        one_hot_encoded,
        columns=encoder.get_feature_names_out(categorical_columns),
        index=data.index,
    )
pickle.dump(encoder, open(path + "/one_hot.sav", "wb"))
# Concatenate the one-hot encoded dataframe with the original dataframe
df_encoded = pd.concat([data, one_hot_df], axis=1)
# Drop the original categorical columns
data = df_encoded.drop(categorical_columns, axis=1)
return data
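# Illustrative example (assumed column names, not from the original datasets):
# a categorical column such as "gender" with values {"Male", "Female"} becomes
# float columns named by OneHotEncoder.get_feature_names_out, e.g.
# "gender_Female" and "gender_Male", while numeric columns pass through
# unchanged. The fitted encoder is pickled to <path>/one_hot.sav so that
# decode_cf() can later invert the encoding for counterfactual rows.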
def imputations(data, class_label, path):
imp = SimpleImputer(missing_values=np.nan, strategy="mean")
y = data[class_label]
data = data.drop([class_label], axis=1)
numeric_cols = data.select_dtypes(include=["float64", "int64"]).columns
print("Numeric columns ", numeric_cols)
data[numeric_cols] = imp.fit_transform(data[numeric_cols])
# Convert back to DataFrame and restore original data types
data[numeric_cols] = data[numeric_cols].astype(float)
pickle.dump(imp, open(path + "/imp.sav", "wb"))
data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False)
return data
def scaling(data, class_label, path):
scaler = StandardScaler()
    # binary (0/1) columns and the class column should not be scaled
    y = data[class_label]
    data = data.drop([class_label], axis=1)
# transform data
cols = data.select_dtypes(np.number).columns
# keep non-binary columns
nonbinary_columns = [
col for col in cols if not data[col].dropna().isin([0, 1]).all()
]
data[nonbinary_columns] = scaler.fit_transform(data[nonbinary_columns])
pickle.dump(scaler, open(path + "/standard_scaler.sav", "wb"))
data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False)
return data
### TABULAR PREPROCESSING ###
###--------------------------###
def decode_cf(df, row, class_label, path, preprocessing_list):
cf_row = row.copy()
# get actual numerical columns
df_numerical = df.select_dtypes(exclude=["object"]).columns.tolist()
nonbinary_numeric_columns = [
col for col in df_numerical if not df[col].dropna().isin([0, 1]).all()
]
# get actual categorical columns
df_categorical = (
df.drop([class_label], axis=1)
.select_dtypes(include=["object"])
.columns.tolist()
)
if "onehot" in preprocessing_list:
ohe = joblib.load(path + "/one_hot.sav")
# if there were categorical columns in the dataframe
if ohe.get_feature_names_out().size > 0:
one_hot_decoded = ohe.inverse_transform(cf_row[ohe.get_feature_names_out()])
cf_categorical_columns = ohe.get_feature_names_out()
# Drop the original categorical columns
cf_row = cf_row.drop(cf_categorical_columns, axis=1)
cf_row[df_categorical] = one_hot_decoded
if "std" in preprocessing_list:
scaler = joblib.load(path + "/standard_scaler.sav")
print(nonbinary_numeric_columns)
cf_row[nonbinary_numeric_columns] = scaler.inverse_transform(
cf_row[nonbinary_numeric_columns]
)
le = joblib.load(path + "/label_encoder.sav")
cf_row[class_label] = le.inverse_transform(cf_row[class_label])
return cf_row
def training(
data,
model,
test_size,
label,
dataset_type,
df_name,
model_path=None,
autoencoder="No",
experiment_arguments=None,
):
X = data
if dataset_type == "tabular":
if "id" in data.columns:
X = data.drop("id", axis=1)
y = X[label]
X = X.drop(label, axis=1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, stratify=y.values, random_state=42
)
        if df_name == "stroke":
            # needs oversampling
            ## TODO if class_label is multi class SMOTE needs to have sampling strategy Dict
            ## TODO check if df needs oversampling
            oversample = SMOTE(sampling_strategy=0.4, random_state=42)
            # oversample only the training split; the test split is kept as-is
            # so the model is not evaluated on synthetic samples
            X_train, y_train = oversample.fit_resample(X_train, y_train)
if "lr" == model:
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(random_state=0).fit(X_train, y_train)
y_pred = clf.predict(X_test)
filename = "lr.sav"
importance = clf.coef_[0]
model = clf
if "xgb" == model:
from xgboost import XGBClassifier
            # TODO enable_categorical should be set dynamically
            # when there are categorical variables in the dataset
xgb = XGBClassifier(
n_estimators=200, # Number of trees (boosting rounds)
learning_rate=0.1, # Step size shrinkage (eta)
max_depth=4, # Maximum tree depth
min_child_weight=1, # Minimum sum of weights in a child
subsample=0.8, # Fraction of samples used per tree
colsample_bytree=0.8, # Fraction of features used per tree
gamma=0, # Minimum loss reduction to make a split
reg_lambda=1, # L2 regularization term (ridge)
reg_alpha=0, # L1 regularization term (lasso)
objective="binary:logistic", # Binary classification objective
use_label_encoder=False, # Avoids unnecessary warnings for older versions
eval_metric="logloss", # Logarithmic loss evaluation metric
).fit(X_train, y_train)
y_pred = xgb.predict(X_test)
filename = "xgb.sav"
importance = xgb.feature_importances_
model = xgb
if "dt" == model:
from sklearn.tree import DecisionTreeClassifier
dt = DecisionTreeClassifier(max_depth=30, random_state=42)
dt.fit(X_train, y_train)
y_pred = dt.predict(X_test)
filename = "dt.sav"
importance = dt.feature_importances_
model = dt
if "svm" == model:
from sklearn import svm
svc = svm.SVC(kernel="linear", probability=True)
svc.fit(X_train, y_train)
y_pred = svc.predict(X_test)
filename = "svm.sav"
importance = svc.coef_[0]
model = svc
if "rf" == model:
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier()
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
filename = "rf.sav"
importance = rf.feature_importances_
model = rf
class_report = classification_report(y_test, y_pred.flatten(), output_dict=True)
class_report = pd.DataFrame(class_report)
feature_importance = px.bar(x=importance, y=X_train.columns)
if model_path:
pickle.dump(model, open(model_path + f"/{filename}", "wb"))
feature_importance_dict = dict(zip(X_train.columns, importance))
return feature_importance, class_report, feature_importance_dict
else:
# TODO: add 1dcnn train
if model == "glacier":
# Split the lr-list string and convert each value to float
# experiment_arguments[8] = experiment_arguments[8].rstrip(';')
# lr_list = [float(x) for x in experiment_arguments[8].split()]
gc_latentcf_search_1dcnn(
data,
int(experiment_arguments[1]),
int(experiment_arguments[2]),
model_path + f"/{experiment_arguments[3]}",
model_path,
autoencoder,
)
elif model == "wildboar_knn" or model == "wildboar_rsf":
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=1
)
# n_samples, n_timestep = X_train.shape
# y_labels, counts = np.unique(y_train, return_counts=True)
# print(
# f"""
# The dataset contains {n_samples} samples with {n_timestep} time steps each.
# Of the samples, {counts[0]} is labeled as {y_labels[0]} and {counts[1]} labeled
# as {y_labels[1]}. Here, we plot the time series.
# """
# )
from wildboar.utils.plot import plot_time_domain
if model == "wildboar_knn":
from wildboar.distance import KNeighborsClassifier
from wildboar.explain.counterfactual import KNeighborsCounterfactual
filename = "wildboar_knn.sav"
classifier = KNeighborsClassifier(
n_neighbors=5, metric="dtw", metric_params={"r": 0.5}
)
explainer = KNeighborsCounterfactual(random_state=1, method="auto")
if model == "wildboar_rsf":
from wildboar.ensemble import ShapeletForestClassifier
from wildboar.explain.counterfactual import ShapeletForestCounterfactual
filename = "wildboar_rsf.sav"
classifier = ShapeletForestClassifier(
n_estimators=100,
metric="euclidean",
max_depth=5,
random_state=1,
)
explainer = ShapeletForestCounterfactual(random_state=1)
classifier.fit(X_train, y_train)
# Assuming you have X_test and y_test as test data
y_pred = classifier.predict(X_test)
# Generate the classification report
class_report = classification_report(y_test, y_pred, output_dict=True)
# Convert the classification report to a pandas DataFrame
class_report = pd.DataFrame(class_report).transpose()
X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test)
# save x_test, y_test for future use
if model_path:
x_test_df = pd.DataFrame(X_test)
x_test_df.columns = data.iloc[:, :-1].columns
x_test_df.to_csv(model_path + "/X_test.csv", index=None)
np.save(model_path + "/y_test.npy", y_test)
pickle.dump(classifier, open(model_path + f"/{filename}", "wb"))
np.save(model_path + "/X_cf.npy", X_cf)
np.save(model_path + "/y_pred.npy", y_pred)
np.save(model_path + "/cf_pred.npy", cf_pred)
return class_report
def testing(name, type):
data = pd.read_csv(name)
y_test = data["diagnosis"]
X_test = data.drop("diagnosis", axis=1)
if "lr" == type:
filename = "lr.sav"
clf = joblib.load(filename)
y_pred = clf.predict(X_test)
importance = clf.coef_[0]
model = clf
if "xgb" == type:
filename = "xgb.sav"
xgb = joblib.load(filename)
y_pred = xgb.predict(X_test)
importance = xgb.feature_importances_
model = xgb
if "dt" == type:
filename = "dt.sav"
dt = joblib.load(filename)
y_pred = dt.predict(X_test)
importance = dt.feature_importances_
model = dt
if "svm" == type:
filename = "svm.sav"
svc = joblib.load(filename)
y_pred = svc.predict(X_test)
importance = svc.coef_[0]
model = svc
if "rf" == type:
filename = "rf.sav"
rf = joblib.load(filename)
y_pred = rf.predict(X_test)
importance = rf.feature_importances_
model = rf
clas_report = classification_report(y_test, y_pred, output_dict=True)
clas_report = pd.DataFrame(clas_report).transpose()
clas_report = clas_report.sort_values(by=["f1-score"], ascending=False)
fig2 = px.bar(x=importance, y=X_test.columns)
pickle.dump(model, open(filename, "wb"))
con = {
"fig2": fig2.to_html(),
"clas_report": clas_report,
}
return con
# compute counterfactuals
def counterfactuals(
query,
model,
df,
class_label,
continuous_features,
num_counterfactuals=5,
features_to_vary=[],
):
if "id" in df.columns:
df = df.drop("id", axis=1)
if "id" in query.columns:
query = query.drop("id", axis=1)
query = query.drop(class_label, axis=1)
# data = df.drop(class_label, axis=1)
# continuous_features = df.drop(class_label, axis=1).columns.tolist()
# continuous_features = (
# df.drop(class_label, axis=1).select_dtypes(exclude=["object"]).columns.tolist()
# )
print(df.dtypes)
d = dice_ml.Data(
dataframe=df,
continuous_features=continuous_features,
outcome_name=class_label,
)
m = dice_ml.Model(model=model, backend="sklearn")
exp = dice_ml.Dice(d, m)
if len(features_to_vary) > 0:
try:
dice_exp = exp.generate_counterfactuals(
query,
total_CFs=num_counterfactuals, # Total number of Counterfactual Examples we want to print out. There can be multiple.
desired_class="opposite", # We want to convert the quality to the opposite one.
features_to_vary=features_to_vary,
proximity_weight=0.5, # Control proximity
diversity_weight=1.0, # Control diversity
sparsity_weight=0.5, # Enforce minimal feature changes
random_seed=42,
)
except Exception as e:
print(e)
dice_exp = None
else:
try:
dice_exp = exp.generate_counterfactuals(
query,
total_CFs=num_counterfactuals, # Total number of Counterfactual Examples we want to print out. There can be multiple.
desired_class="opposite", # We want to convert the quality to the opposite one.
proximity_weight=0.5, # Control proximity
diversity_weight=1.0, # Control diversity
sparsity_weight=0.5, # Enforce minimal feature changes
random_seed=42,
)
except Exception as e:
print(e)
dice_exp = None
if dice_exp:
return dice_exp._cf_examples_list
return dice_exp
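# Usage sketch for counterfactuals() (illustrative; the model file and class
# label are assumptions):
#   model = joblib.load(model_path + "/rf.sav")
#   cf_examples = counterfactuals(
#       query=df.iloc[[0]],  # single-row DataFrame to explain
#       model=model,
#       df=df,               # full dataframe including the class label column
#       class_label="diagnosis",
#       continuous_features=get_continuous_features(df.drop("diagnosis", axis=1)),
#       num_counterfactuals=5,
#   )
#   # cf_examples is a list of dice_ml counterfactual example objects, or None
#   # if generation failed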
def get_dataframe(path):
df = pd.read_csv(path)
return df
def generatePCA(preprocess_df):
pca = PCA()
pca.fit(preprocess_df)
exp_var_cumul = np.cumsum(pca.explained_variance_ratio_)
pca = px.area(
x=range(1, exp_var_cumul.shape[0] + 1),
y=exp_var_cumul,
labels={"x": "# Components", "y": "Explained Variance"},
)
pca.update_layout(
autosize=True,
)
return pca
def generateTSNE(preprocess_df, dataset_type, class_label=None):
# tSNE
tsne = TSNE(n_components=2, random_state=39)
if dataset_type == "tabular":
projections = tsne.fit_transform(preprocess_df.drop(class_label, axis=1).values)
tsne_df = pd.DataFrame(
{
"0": projections[:, 0],
"1": projections[:, 1],
class_label: preprocess_df[class_label].astype(str),
}
)
# render_mode="svg" will prevent the scatter from getting to GL mode
# for sufficiently large input. It was observed that for datasets
# larger than 1000 entries, the scatter html object that would be
# generated would lack the <g class="points" ... > </g> element containing
# all the points of the scatter. Using that we could connect the click
# of a point with the actual point in the dataset. Thus it was vital
# that there exists such an element and also is accessible.
# By default, scatter disables it to free space in the client side
# and by using render_mode="svg" we avoid that behaviour.
# https://github.com/plotly/plotly_express/issues/145
tsne = px.scatter(
tsne_df,
x="0",
y="1",
color=class_label,
render_mode="svg",
)
tsne.update_layout(clickmode="event+select", autosize=True)
elif dataset_type == "timeseries":
preprocess_df_drop_class = preprocess_df.iloc[:, :-1]
projections = tsne.fit_transform(preprocess_df_drop_class)
tsne_df = pd.DataFrame(
{
"0": projections[:, 0],
"1": projections[:, 1],
"class": preprocess_df.iloc[:, -1].astype(str),
}
)
        # render_mode="svg" is used here for the same reason as in the
        # tabular branch above: it keeps the scatter out of WebGL mode so
        # the <g class="points"> element remains present and clickable.
        # https://github.com/plotly/plotly_express/issues/145
tsne = px.scatter(
tsne_df,
x="0",
y="1",
color="class",
render_mode="svg",
)
tsne.update_layout(clickmode="event+select", autosize=True)
return tsne, projections
def generateAugmentedTSNE(
df, cf_df, num_counterfactuals, point, tsne_path, class_label
):
"""
    Given a t-SNE graph, add the traces of the computed counterfactuals for a given point and return the new graph.
    Parameters
    ----------
    df: dataframe used to compute the t-SNE
    cf_df: counterfactuals dataframe
    num_counterfactuals: number of counterfactual rows in cf_df to plot
    point: original point for which counterfactuals were computed
    tsne_path: path to the original t-SNE plot
    class_label: name of the class column
    Returns
    -------
    The t-SNE graph updated with the new counterfactual points. The counterfactual points and the original point itself are resized.
"""
    # rebuild the t-SNE on the same data plus the extra counterfactual
    # points described in counterfactuals.csv
    tsne_cf = TSNE(n_components=2, random_state=0)
    # merge the counterfactuals csv with the t-SNE data
    df_merged = pd.concat([cf_df, df], ignore_index=True, axis=0)
    projections = tsne_cf.fit_transform(df_merged.drop(class_label, axis=1).values)
    # cf_df contains the projection values and the class_label value of the
    # counterfactual points. projections is a numpy array containing an
    # (x, y) pair for each point of the t-SNE graph.
cf_df = pd.DataFrame(
{
"0": projections[:num_counterfactuals, 0],
"1": projections[:num_counterfactuals, 1],
class_label: cf_df[class_label].iloc[:num_counterfactuals].astype(str),
}
)
# cf_df = pd.concat([cf_df, point], ignore_index=True, axis=0)
# new = {'0':'Front hello', '1': 'hi'}
# cf_s.for_each_trace(lambda t: t.update(name = new[t.name]))
point_s = px.scatter(
point,
x="0",
y="1",
color=class_label,
color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
render_mode="svg",
)
point_s["data"][0]["name"] = "Original data"
cf_s = px.scatter(
cf_df,
x="0",
y="1",
color=class_label,
color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
render_mode="svg",
)
cf_s["data"][0]["name"] = "Counterfactual"
clicked_and_cf_s = go.Figure(data=cf_s.data + point_s.data)
clicked_and_cf_s.update_traces(
marker=dict(size=10, symbol="circle", line=dict(width=2))
)
tsne = joblib.load(tsne_path)
tsne = go.Figure(data=tsne.data + clicked_and_cf_s.data)
tsne.update_layout(clickmode="event+select", autosize=True)
# tsne.add_trace(cf_s.data[0])
return tsne
# tsne_cf = TSNE(n_components=2, random_state=0)
# projections = tsne_cf.fit_transform(
# df_merged.drop(["diagnosis"], axis=1).values
# )
# cf_df = pd.DataFrame(
# {
# "0": projections[:num_counterfactuals, 0],
# "1": projections[:num_counterfactuals, 1],
# "diagnosis": cf_df.diagnosis.iloc[:3],
# }
# )
# cf_df = pd.concat([cf_df, clicked_point_df], ignore_index=True, axis=0)
# cf_s = px.scatter(
# cf_df,
# x="0",
# y="1",
# color="diagnosis",
# color_continuous_scale=px.colors.sequential.Rainbow,
# )
# cf_s.update_traces(
# marker=dict(
# size=10,
# symbol="circle",
# )
# )
# tsne = joblib.load("tsne.sav")
# tsne.add_trace(cf_s.data[0])
# pickle.dump(tsne, open("tsne_cfs.sav", "wb"))
# tsne = tsne.to_html()
def get_ecg_entry(X_test, y_test, i, class_label):
    # plot a single time series sample of the requested class
fig = go.Figure()
y = X_test[y_test == class_label].iloc[i]
index = X_test[y_test == class_label].index[i]
if class_label == 0:
name = "Normal ECG"
else:
name = "Abnormal ECG"
# Adding the ECG trace
fig.add_trace(
go.Scatter(
y=y,
mode="lines",
line=dict(width=1),
)
)
# Updating the layout to reduce margins and make the plot more compact
fig.update_layout(
xaxis_title="Timestep",
yaxis_title=name,
hovermode="x",
margin=dict(l=10, r=10, t=30, b=10), # Reduced margins
)
# Adjust y-axis to start from zero, customize the grid, and add more space
    fig.update_yaxes(
        rangemode="tozero",
        showgrid=True,
        gridwidth=1,  # makes the gridlines slightly more pronounced
        tickvals=[min(y), max(y)],  # add spacing between gridlines
    )
return fig, int(index)
def ecg_plot_counterfactuals(i, X_test, y_test, y_pred, X_cf, cf_pred):
fig = go.Figure()
neg = 0
pos = 1
y_test = np.where(y_test == 1, pos, neg)
# print("y_test: ", y_test)
# print("y_pred: ", y_pred)
# print("cf_pred: ", cf_pred)
# Original time series
fig.add_trace(
go.Scatter(
y=X_test.iloc[i],
mode="lines",
name="Original (y_pred = %d, y_actual = %d)" % (y_pred[i], y_test[i]),
line=dict(width=0.5),
)
)
if len(X_cf[i].shape) > 1:
X_cf_flattened = X_cf[i].flatten()
# Counterfactual time series
fig.add_trace(
go.Scatter(
y=X_cf_flattened,
mode="lines",
name="Counterfactual (y = %d)" % cf_pred[i],
line=dict(width=1),
)
)
else:
fig.add_trace(
go.Scatter(
y=X_cf[i],
mode="lines",
name="Counterfactual (y = %d)" % cf_pred[i],
line=dict(width=1),
)
)
# Updating the layout to reduce margins and make the plot more compact
fig.update_layout(
xaxis_title="Timestep",
hovermode="x",
margin=dict(l=10, r=10, t=30, b=10), # Reduced margins
)
# Adjust y-axis to start from zero, customize the grid, and add more space
    fig.update_yaxes(
        rangemode="tozero",
        showgrid=True,
        gridwidth=1,  # makes the gridlines slightly more pronounced
        tickvals=[min(X_test.iloc[i]), max(X_test.iloc[i])],  # add spacing between gridlines
    )
# # Mean time series of the counterfactual class
# mean_cf_class = np.mean(X_test.loc[y_test == cf_pred[i]], axis=0)
# fig.add_trace(go.Scatter(
# y=mean_cf_class,
# mode='lines',
# name="Mean of X with y = %d" % cf_pred[i],
# line=dict(width=1, dash='dash')
# ))
fig.update_layout(
xaxis_title="Timepoints", yaxis_title="Values", legend=dict(x=0.01, y=0.99)
)
return fig
def get_info_of_dataframe(df):
# Creating a DataFrame to store the summary
summary_data = {
"Total Rows": [df.shape[0]],
"Total Columns": [df.shape[1]],
"Missing Values (Total)": [df.isnull().sum().sum()],
"Missing Values (Columns)": [df.isnull().any(axis=0).sum()],
"Categorical Columns": [(df.dtypes == "object").sum()],
"Numeric Columns": [df.select_dtypes(include=["number"]).shape[1]],
}
summary_df = pd.DataFrame(summary_data)
# Create a Plotly Table with enhanced styling
fig = go.Figure(
data=[
go.Table(
header=dict(
values=["<b>Metric</b>", "<b>Value</b>"],
fill_color="#4CAF50",
align="left",
font=dict(color="white", size=14),
height=30,
),
cells=dict(
values=[summary_df.columns, summary_df.iloc[0].tolist()],
fill_color=[["#f9f9f9", "white"] * len(summary_df)],
align="left",
font=dict(color="black", size=12),
height=25,
),
)
]
)
fig.update_layout(
title_x=0.5, # Center title
title_y=0.95,
margin=dict(l=20, r=20, t=50, b=20),
width=600,
height=300, # Adjust height based on the content
)
# Convert Plotly figure to HTML
return fig.to_html()
def update_column_list_with_one_hot_columns(df_original, df_encoded, column_list):
updated_columns = []
for column in column_list:
# Check if the column is categorical in the original dataset
        if (
            isinstance(df_original[column].dtype, pd.CategoricalDtype)
            or df_original[column].dtype == "object"
        ):
# The column is categorical, so find the one-hot encoded sub-columns
one_hot_columns = [
col for col in df_encoded.columns if col.startswith(f"{column}_")
]
# Replace the original column name with the one-hot encoded sub-columns
if one_hot_columns:
updated_columns.extend(
one_hot_columns
) # Add the sub-columns to the updated list
else:
# If no one-hot encoded columns are found (for some reason), keep the original column
updated_columns.append(column)
else:
# If the column is not categorical, keep it as is
updated_columns.append(column)
return updated_columns
# Function to extract continuous features
def get_continuous_features(df):
# Filter columns based on dtype and exclude binary columns
continuous_columns = df.select_dtypes(include=["float64", "int64"]).columns
# Exclude binary features (0 and 1 values)
continuous_columns = [
col
for col in continuous_columns
if df[col].nunique() > 2 # Exclude binary features
]
# Return only the continuous features
return list(continuous_columns)
# Function to extract categorical features
def get_categorical_features(df):
    # Filter columns based on dtype
    categorical_columns = df.select_dtypes(include=["object", "category"]).columns
    # Return only the categorical features
    return list(categorical_columns)
# Function to extract non-continuous features
def get_non_continuous_features(df):
# Select numeric columns
numeric_columns = df.select_dtypes(include=["float64", "int64"]).columns
# Identify binary columns (having only two unique values, like 0/1)
binary_columns = [col for col in numeric_columns if df[col].nunique() == 2]
# Select non-numeric columns
non_numeric_columns = df.select_dtypes(
exclude=["float64", "int64"]
).columns.tolist()
# Combine binary columns and non-numeric columns
non_continuous_columns = binary_columns + non_numeric_columns
# Return only the non-continuous features
return list(non_continuous_columns)
def find_counterfactuals(estimator, explainer, X):
y_pred = estimator.predict(X)
y_desired = np.empty_like(y_pred)
    # Store an array of the desired label for each sample.
    # We assume a binary classification task where the desired
    # label is the inverse of the predicted label.
a, b = estimator.classes_
y_desired[y_pred == a] = b
y_desired[y_pred == b] = a
    # Fit the explainer to the trained estimator.
explainer.fit(estimator)
# Explain each sample in X as the desired label in y_desired
X_cf = explainer.explain(X, y_desired)
return X_cf, y_pred, estimator.predict(X_cf)
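# Worked example of the label flipping above (added, binary task assumed):
# if estimator.classes_ == array([0, 1]) and y_pred == array([0, 1, 0]), then
# y_desired == array([1, 0, 1]); explainer.explain(X, y_desired) then searches
# for a counterfactual of each sample that the estimator classifies as the
# flipped label, as used in training() for the wildboar models.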
def is_column_categorical_like(
df, column_name, unique_threshold=10, ratio_threshold=0.05
):
"""
Determines if a numeric column has categorical characteristics.
Parameters:
df (DataFrame): The DataFrame containing the data.
column_name (str): The column name to check.
unique_threshold (int): Maximum number of unique values to consider as categorical.
ratio_threshold (float): Maximum ratio of unique values to total rows to consider as categorical.
Returns:
bool: True if the column is likely categorical, False otherwise.
"""
unique_values = df[column_name].nunique() # Number of unique values
total_values = len(df[column_name]) # Total number of rows
unique_ratio = unique_values / total_values # Ratio of unique values to total rows
# Check if the column is numeric
if pd.api.types.is_numeric_dtype(df[column_name]):
# Consider it categorical if it has fewer than `unique_threshold` unique values
# or if the unique values ratio is below `ratio_threshold`
if unique_values <= unique_threshold or unique_ratio <= ratio_threshold:
return True
return False
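# Illustrative example (assumed data, not from the original datasets):
#   df = pd.DataFrame({"rooms": [1, 2, 3] * 100, "price": np.random.rand(300)})
#   is_column_categorical_like(df, "rooms")  # True: 3 unique values <= 10
#   is_column_categorical_like(df, "price")  # False: ~300 unique values, ratio ~1.0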
# Function to flatten the dictionary
def flatten_dict(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, dict):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
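# Illustrative example (assumed input): nested keys are joined with the
# separator, e.g.
#   flatten_dict({"model": {"name": "xgb", "params": {"max_depth": 4}}})
#   -> {"model_name": "xgb", "model_params_max_depth": 4}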
def convert_to_camel_case(s):
# Split the string by hyphen
parts = s.split("-")
# Capitalize each part and join them together
camel_case = "".join(word.capitalize() for word in parts)
return camel_case
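# Example (using a dataset slug that appears in stats() above):
#   convert_to_camel_case("two-lead-ecg")  -> "TwoLeadEcg"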
# def fetch_line_by_dataset(file_path, dataset, constraint):
def fetch_line_by_dataset(file_path, dataset):
"""
Fetches a line from the file based on the specified dataset name to retrieve basic information about the dataset.
:param file_path: Path to the input file.
:param dataset: The dataset name to search for.
    :return: The line matching the dataset, or None if not found.
"""
with open(file_path, "r") as file:
for line in file:
# Strip leading whitespace
stripped_line = line.strip()
# Skip lines that start with #
if stripped_line.startswith("#"):
continue
# Use regular expressions for exact match of the dataset
dataset_pattern = rf"--dataset\s+{re.escape(dataset)}\b"
if re.search(dataset_pattern, stripped_line):
return stripped_line
return None
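# Illustrative format of the experiment file this parser expects (an assumption
# inferred from the regex here and from extract_arguments_from_line below):
# non-comment lines of CLI-style flags such as
#   --dataset two-lead-ecg --pos 1 --neg 2 --epochs 100
# fetch_line_by_dataset(path, "two-lead-ecg") would return that whole line.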
def extract_arguments_from_line(line):
"""
Extracts all words that come immediately after '--' arguments in the line.
:param line: A string containing the line to parse.
:return: A list of argument values found in the line.
"""
# Find all arguments and their values
# matches = re.findall(r"(--[\w-]+)((?:\s+[^-][^\s]*)*)", line)
matches = re.findall(r"(--[\w-]+)\s+([^\s]+)", line)
# Extract argument values
arguments = [value.strip() for _, value in matches if value.strip()]
return arguments
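# Worked example (illustrative input, not an original experiment line):
#   extract_arguments_from_line("--dataset gun-point --epochs 100 --lr-list 0.001")
#   -> ["gun-point", "100", "0.001"]
# i.e. only the first whitespace-separated token after each "--flag" is kept.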
def create_tuple_of_models_text_value(available_pre_trained_models):
available_pretrained_models_info = []
for model in available_pre_trained_models:
if "xgb" == model:
available_pretrained_models_text = "XGBoost"
elif "rf" == model:
available_pretrained_models_text = "Random Forest"
elif "lr" == model:
available_pretrained_models_text = "Logistic Regression"
elif "dt" == model:
available_pretrained_models_text = "Decision Tree"
elif "svm" == model:
available_pretrained_models_text = "Support Vector Machine"
elif "glacier" == model:
available_pretrained_models_text = "Glacier 1dCNN"
elif "wildboar_knn" == model:
available_pretrained_models_text = "Wildboar K-Nearest Neighbours"
        elif "wildboar_rsf" == model:
            available_pretrained_models_text = "Wildboar Random Shapelet Forest"
        else:
            # fall back to the raw identifier for models without a display name
            available_pretrained_models_text = model
        available_pretrained_models_info.append(
            (model, available_pretrained_models_text)
        )
return available_pretrained_models_info
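# Example (illustrative): mapping short model identifiers to display names for
# the UI, e.g.
#   create_tuple_of_models_text_value(["xgb", "wildboar_rsf"])
#   -> [("xgb", "XGBoost"), ("wildboar_rsf", "Wildboar Random Shapelet Forest")]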