import dice_ml.data_interfaces
import dice_ml.data_interfaces.private_data_interface
import pandas as pd
import pickle, os
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from pandas.api.types import is_numeric_dtype
from sklearn.metrics import classification_report
import plotly.express as px
from django.conf import settings
import joblib
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import dice_ml
from dict_and_html import *
import plotly.graph_objects as go
import math
from imblearn.over_sampling import SMOTE
from scipy.stats import median_abs_deviation
from numpy.fft import *
from sklearn.preprocessing import MinMaxScaler
from plotly.subplots import make_subplots
from sklearn.model_selection import train_test_split
from .glacier.src.gc_latentcf_search_1dcnn_function import gc_latentcf_search_1dcnn
from .glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals
import re
import json

PIPELINE_PATH = os.path.join(settings.BASE_DIR, "base/pipelines/")

def stats(
    dataset_path,
    dataset_type,
    pos=None,
    neg=None,
    feature1=None,
    feature2=None,
    label=None,
    name=None,
):
    print(dataset_type)
    if dataset_type == "tabular":
        df = pd.read_csv(dataset_path)

        binary1 = df[feature1].isin([0, 1]).all()
        binary2 = df[feature2].isin([0, 1]).all()

        if binary1 or binary2:
            fig = px.histogram(df, x=feature1, y=feature2, color=label)
        elif is_numeric_dtype(df[feature1]) or is_numeric_dtype(df[feature2]):
            if not is_numeric_dtype(df[feature1]) or not is_numeric_dtype(df[feature2]):
                # Only one of the two features is numeric.
                fig = px.histogram(df, x=feature1, y=feature2, color=label)
            else:
                # Both features are numeric, so use a scatter plot.
                if is_column_categorical_like(
                    df, feature1
                ) and not is_column_categorical_like(df, feature2):
                    # Add jitter to the categorical-like numeric column.
                    df[feature1] = df[feature1] + np.random.uniform(
                        -0.1, 0.1, size=df.shape[0]
                    )
                    fig = px.scatter(
                        df, x=feature1, y=feature2, color=df[label].astype(str)
                    )
                elif is_column_categorical_like(
                    df, feature2
                ) and not is_column_categorical_like(df, feature1):
                    df[feature2] = df[feature2] + np.random.uniform(
                        -0.1, 0.1, size=df.shape[0]
                    )
                    fig = px.scatter(
                        df, x=feature1, y=feature2, color=df[label].astype(str)
                    )
                elif is_column_categorical_like(
                    df, feature2
                ) and is_column_categorical_like(df, feature1):
                    # Both are categorical-like: aggregate counts and draw a bubble plot.
                    df_grouped = (
                        df.groupby([feature1, feature2, label])
                        .size()
                        .reset_index(name="Count")
                    )
                    fig = px.scatter(
                        df_grouped,
                        x=feature1,
                        y=feature2,
                        size="Count",
                        color=df_grouped[label].astype(str),
                    )
                else:
                    fig = px.scatter(
                        df, x=feature1, y=feature2, color=df[label].astype(str)
                    )
        else:
            # Both features are categorical.
            fig = px.bar(df, x=feature1, y=feature2, color=label, barmode="group")

        fig.update_layout(clickmode="event+select", autosize=True)

    elif dataset_type == "timeseries":
        df = pd.read_csv(dataset_path)

        # Subplot titles for the known datasets, with generic fallbacks so the
        # figure can still be built for an unknown dataset name.
        # TODO: handle datasets that carry an id column.
        negative_label = "Negative label"
        positive_label = "Positive label"
        if name == "two-lead-ecg":
            negative_label = "Signal 0"
            positive_label = "Signal 1"
        elif name == "gun-point":
            negative_label = "Gun"
            positive_label = "No gun"
        elif name == "italy-power-demand":
            negative_label = "October to March power demand"
            positive_label = "April to September power demand"
        elif name == "ecg-five-days":
            negative_label = "12/11/1990"
            positive_label = "17/11/1990"
        elif name == "ford-a":
            negative_label = "Negative label"
            positive_label = "Positive label"

        # Hard coded for now; should be derived dynamically from the dataset.
        negative_label_value = neg
        positive_label_value = pos

        num_timesteps = df.shape[1] - 1
        fig = make_subplots(
            rows=2,
            cols=2,
            subplot_titles=(
                negative_label,
                negative_label,
                positive_label,
                positive_label,
            ),
        )

        # Assumes a univariate series.
        # TODO: multivariate support.
        target_labels = list(df.iloc[:, -1].unique())
        positive = target_labels[1]
        negative = target_labels[0]

        # Two negative-class samples on row 1 and two positive-class samples
        # on row 2, one sample per subplot.
        timesteps = list(range(num_timesteps))
        for row, (label_value, trace_name) in enumerate(
            [
                (negative_label_value, negative_label),
                (positive_label_value, positive_label),
            ],
            start=1,
        ):
            subset = df[df.iloc[:, -1] == label_value]
            for col in (1, 2):
                fig.add_trace(
                    go.Scatter(
                        x=timesteps,
                        y=subset.iloc[col - 1, :-1],
                        mode="lines",
                        name=trace_name,
                    ),
                    row=row,
                    col=col,
                )

        fig.update_layout(
            xaxis_title="Timesteps",
            yaxis_title="ECG Value",
            showlegend=False,
            autosize=True,
        )

        # Confidence plot over all samples (class column dropped).
        df = df.iloc[:, :-1]
        df_grouped = df.agg(["mean", "std", "count"]).transpose()

        df_grouped["ci"] = 40 * df_grouped["std"] / np.sqrt(df_grouped["count"])
        df_grouped["ci_lower"] = df_grouped["mean"] - df_grouped["ci"]
        df_grouped["ci_upper"] = df_grouped["mean"] + df_grouped["ci"]

        fig1 = go.Figure(
            [
                go.Scatter(
                    name="Avg",
                    x=df_grouped.index,
                    y=round(df_grouped["mean"], 2),
                    mode="lines",
                    line=dict(color="rgb(31, 119, 180)"),
                ),
                go.Scatter(
                    name="95% CI Upper",
                    x=df_grouped.index,
                    y=round(df_grouped["ci_upper"], 2),
                    mode="lines",
                    marker=dict(color="#444"),
                    line=dict(width=0),
                    showlegend=False,
                ),
                go.Scatter(
                    name="95% CI Lower",
                    x=df_grouped.index,
                    y=round(df_grouped["ci_lower"], 2),
                    marker=dict(color="#444"),
                    line=dict(width=0),
                    mode="lines",
                    fillcolor="rgba(68, 68, 68, 0.3)",
                    fill="tonexty",
                    showlegend=False,
                ),
            ]
        )
        fig1.update_layout(
            title="Confidence plot for Two Lead ECG dataset",
            xaxis_title="Timestep",
            yaxis_title="Avg ECG value",
            hovermode="x",
        )
        fig1.update_yaxes(rangemode="tozero")

        return fig.to_html(), fig1.to_html()
        # fig = px.line(df.iloc[int(feature1)])

    return fig.to_html()

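# Illustrative usage sketch (hedged): this helper is not called anywhere in the
# module, and the CSV path, feature names, and label below are hypothetical
# placeholders rather than files shipped with the project.
def _example_stats_usage():
    figure_html = stats(
        dataset_path="media/example_tabular.csv",  # hypothetical upload
        dataset_type="tabular",
        feature1="age",  # hypothetical numeric feature
        feature2="bmi",  # hypothetical numeric feature
        label="target",  # hypothetical class column
    )
    return figure_html
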
def compare_values(val1, val2):
    if isinstance(val1, float) and isinstance(val2, float):
        return not math.isclose(float(val1), float(val2))
    else:
        return val1 != val2

def preprocess(data, value_list, name, dataset_type, path=None, class_label=None):

    if dataset_type == "tabular":

        has_id = "id" in data.columns
        if has_id:
            ids = data["id"]
            data = data.drop(["id"], axis=1)

        total_nan = data.isna().sum().sum()

        imputed_data = None
        if "imp" in value_list:
            data = imputations(data, class_label, path)
            imputed_data = data

        if "onehot" in value_list:
            data = onehot(data, path)

        if "std" in value_list:
            data = scaling(data, class_label, path)

        if has_id:
            data = pd.concat([ids.to_frame(), data], axis=1, ignore_index=False)

        # If missing values were imputed, overwrite the stored dataset with the
        # imputed version so later steps work on complete data.
        if total_nan > 0 and imputed_data is not None:
            os.remove(name)
            if has_id:
                imputed_data = pd.concat(
                    [ids.to_frame(), imputed_data], axis=1, ignore_index=False
                )
            imputed_data.to_csv(name, index=False)
    elif dataset_type == "timeseries":
        # Save the last column (class labels) and drop it before preprocessing.
        data_class_col = data.iloc[:, -1]
        data = data.iloc[:, :-1]
        if "imp" in value_list:
            data = imputations_ts(data, path)
        if "denoise" in value_list:
            data = data.apply(denoise, args=(path,), axis=0)
        if "std" in value_list:
            data = scaling_ts(data, path)

        data = pd.concat([data, data_class_col], axis=1)

        # os.remove(name)
        # data.to_csv(name, index=False)
    return data

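# Illustrative usage sketch (hedged): shows the expected call shape only. The
# dataframe, stored CSV name, pipeline directory, and class column below are
# hypothetical; "imp"/"onehot"/"std" are the preprocessing flags this module
# already understands.
def _example_preprocess_usage(raw_df):
    processed = preprocess(
        data=raw_df,
        value_list=["imp", "onehot", "std"],
        name="media/example_tabular.csv",  # hypothetical stored copy of the dataset
        dataset_type="tabular",
        path=PIPELINE_PATH + "example",  # hypothetical pipeline directory
        class_label="target",  # hypothetical class column
    )
    return processed
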
###--------------------------###
### TIMESERIES PREPROCESSING ###
def scaling_ts(data, path):
    # Normalize the data using Min-Max scaling and persist the fitted scaler.
    scaler = MinMaxScaler()
    data[data.columns] = scaler.fit_transform(data)
    pickle.dump(scaler, open(path + "/min_max_scaler.sav", "wb"))
    return data


def denoise(series, path):
    # Apply FFT
    fft_vals = fft(series)
    fft_freqs = np.fft.fftfreq(len(fft_vals))

    # Filter out high frequencies
    fft_vals[np.abs(fft_freqs) > 0.1] = 0

    # Inverse FFT to reconstruct the signal
    denoised_series = ifft(fft_vals).real
    return pd.Series(denoised_series, index=series.index)


def outlier_detection(series, path):
    # Flag points further than 3 median absolute deviations from the median.
    median = series.median()
    mad = median_abs_deviation(series)
    return np.abs(series - median) / mad > 3


def imputations_ts(data, path):
    # Fill missing values with the per-column mean.
    data[data.columns] = data[data.columns].fillna(data.mean())
    return data


### TIMESERIES PREPROCESSING ###
###--------------------------###

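# Minimal sketch of the FFT-based denoising above on a synthetic noisy sine wave.
# Purely illustrative; nothing in the application calls this.
def _example_denoise_synthetic():
    t = np.linspace(0, 1, 256)
    noisy = pd.Series(np.sin(2 * np.pi * 3 * t) + 0.3 * np.random.randn(t.size))
    # denoise() ignores its path argument, so None is fine here.
    smoothed = noisy.to_frame("signal").apply(denoise, args=(None,), axis=0)
    return smoothed["signal"]
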
###--------------------------###
###   TABULAR PREPROCESSING  ###
def onehot(data, path):
    encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
    categorical_columns = data.select_dtypes(include=["object"]).columns.tolist()
    # Apply one-hot encoding to the categorical columns.
    one_hot_encoded = encoder.fit_transform(data[categorical_columns]).astype(float)
    one_hot_df = pd.DataFrame(
        one_hot_encoded,
        columns=encoder.get_feature_names_out(categorical_columns),
        index=data.index,
    )

    pickle.dump(encoder, open(path + "/one_hot.sav", "wb"))

    # Concatenate the one-hot encoded dataframe with the original dataframe,
    # then drop the original categorical columns.
    df_encoded = pd.concat([data, one_hot_df], axis=1)
    data = df_encoded.drop(categorical_columns, axis=1)
    return data


def imputations(data, class_label, path):
    imp = SimpleImputer(missing_values=np.nan, strategy="mean")

    y = data[class_label]
    data = data.drop([class_label], axis=1)

    numeric_cols = data.select_dtypes(include=["float64", "int64"]).columns
    print("Numeric columns ", numeric_cols)

    data[numeric_cols] = imp.fit_transform(data[numeric_cols])

    # Convert back to float and persist the fitted imputer.
    data[numeric_cols] = data[numeric_cols].astype(float)
    pickle.dump(imp, open(path + "/imp.sav", "wb"))

    data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False)
    return data


def scaling(data, class_label, path):
    # Define the standard scaler; binary columns and the class column must not
    # be scaled.
    scaler = StandardScaler()
    y = data[class_label]
    data = data.drop([class_label], axis=1)

    # Keep only non-binary numeric columns for scaling.
    cols = data.select_dtypes(np.number).columns
    nonbinary_columns = [
        col for col in cols if not data[col].dropna().isin([0, 1]).all()
    ]
    data[nonbinary_columns] = scaler.fit_transform(data[nonbinary_columns])
    pickle.dump(scaler, open(path + "/standard_scaler.sav", "wb"))
    data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False)
    return data


###   TABULAR PREPROCESSING  ###
###--------------------------###

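# Minimal sketch of the one-hot step above on a toy dataframe. The temporary
# directory stands in for the per-dataset pipeline path; purely illustrative.
def _example_onehot_toy():
    import tempfile

    toy = pd.DataFrame({"color": ["red", "blue", "red"], "size": [1.0, 2.0, 3.0]})
    with tempfile.TemporaryDirectory() as tmp:
        encoded = onehot(toy, tmp)  # writes one_hot.sav into tmp
    return encoded  # columns: size, color_blue, color_red
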
def decode_cf(df, row, class_label, path, preprocessing_list):

    cf_row = row.copy()
    # Actual numerical columns of the original (unencoded) dataframe.
    df_numerical = df.select_dtypes(exclude=["object"]).columns.tolist()

    nonbinary_numeric_columns = [
        col for col in df_numerical if not df[col].dropna().isin([0, 1]).all()
    ]

    # Actual categorical columns of the original dataframe.
    df_categorical = (
        df.drop([class_label], axis=1)
        .select_dtypes(include=["object"])
        .columns.tolist()
    )

    if "onehot" in preprocessing_list:
        ohe = joblib.load(path + "/one_hot.sav")
        # Only decode if there were categorical columns in the dataframe.
        if ohe.get_feature_names_out().size > 0:
            one_hot_decoded = ohe.inverse_transform(cf_row[ohe.get_feature_names_out()])
            cf_categorical_columns = ohe.get_feature_names_out()
            # Drop the one-hot columns and restore the original categorical ones.
            cf_row = cf_row.drop(cf_categorical_columns, axis=1)
            cf_row[df_categorical] = one_hot_decoded

    if "std" in preprocessing_list:
        scaler = joblib.load(path + "/standard_scaler.sav")
        print(nonbinary_numeric_columns)
        cf_row[nonbinary_numeric_columns] = scaler.inverse_transform(
            cf_row[nonbinary_numeric_columns]
        )

    le = joblib.load(path + "/label_encoder.sav")
    cf_row[class_label] = le.inverse_transform(cf_row[class_label])
    return cf_row

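# Illustrative sketch (hedged) of mapping a counterfactual row back to the
# original feature space. It assumes the pipeline directory already contains the
# artifacts saved above (one_hot.sav, standard_scaler.sav) plus a
# label_encoder.sav, and that `original_df` / `cf_row` come from the calling
# view; all names below are hypothetical.
def _example_decode_cf(original_df, cf_row):
    decoded = decode_cf(
        df=original_df,
        row=cf_row,
        class_label="target",  # hypothetical class column
        path=PIPELINE_PATH + "example",  # hypothetical pipeline directory
        preprocessing_list=["onehot", "std"],
    )
    return decoded
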
def training(
    data,
    model,
    test_size,
    label,
    dataset_type,
    df_name,
    model_path=None,
    autoencoder="No",
    experiment_arguments=None,
):
    X = data
    if dataset_type == "tabular":
        if "id" in data.columns:
            X = data.drop("id", axis=1)
        y = X[label]
        X = X.drop(label, axis=1)

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, stratify=y.values, random_state=42
        )

        if df_name == "stroke":
            # Needs oversampling.
            # TODO: if the class label is multi-class, SMOTE needs a sampling_strategy dict.
            # TODO: check whether the dataframe actually needs oversampling.
            oversample = SMOTE(sampling_strategy=0.4, random_state=42)
            X_sm, y_sm = oversample.fit_resample(X_train, y_train)

            X_train, X_test, y_train, y_test = train_test_split(
                X_sm, y_sm, test_size=test_size, stratify=y_sm.values, random_state=42
            )

        if model == "lr":
            from sklearn.linear_model import LogisticRegression

            clf = LogisticRegression(random_state=0).fit(X_train, y_train)
            y_pred = clf.predict(X_test)
            filename = "lr.sav"
            importance = clf.coef_[0]
            model = clf
        elif model == "xgb":
            from xgboost import XGBClassifier

            # TODO: enable_categorical should be set dynamically when the
            # dataset contains categorical variables.
            xgb = XGBClassifier(
                n_estimators=200,  # Number of trees (boosting rounds)
                learning_rate=0.1,  # Step size shrinkage (eta)
                max_depth=4,  # Maximum tree depth
                min_child_weight=1,  # Minimum sum of weights in a child
                subsample=0.8,  # Fraction of samples used per tree
                colsample_bytree=0.8,  # Fraction of features used per tree
                gamma=0,  # Minimum loss reduction to make a split
                reg_lambda=1,  # L2 regularization term (ridge)
                reg_alpha=0,  # L1 regularization term (lasso)
                objective="binary:logistic",  # Binary classification objective
                use_label_encoder=False,  # Avoids warnings on older versions
                eval_metric="logloss",  # Logarithmic loss evaluation metric
            ).fit(X_train, y_train)

            y_pred = xgb.predict(X_test)
            filename = "xgb.sav"
            importance = xgb.feature_importances_
            model = xgb
        elif model == "dt":
            from sklearn.tree import DecisionTreeClassifier

            dt = DecisionTreeClassifier(max_depth=30, random_state=42)
            dt.fit(X_train, y_train)
            y_pred = dt.predict(X_test)
            filename = "dt.sav"
            importance = dt.feature_importances_
            model = dt
        elif model == "svm":
            from sklearn import svm

            svc = svm.SVC(kernel="linear", probability=True)
            svc.fit(X_train, y_train)
            y_pred = svc.predict(X_test)
            filename = "svm.sav"
            importance = svc.coef_[0]
            model = svc
        elif model == "rf":
            from sklearn.ensemble import RandomForestClassifier

            rf = RandomForestClassifier()
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_test)
            filename = "rf.sav"
            importance = rf.feature_importances_
            model = rf

        class_report = classification_report(y_test, y_pred.flatten(), output_dict=True)
        class_report = pd.DataFrame(class_report)
        feature_importance = px.bar(x=importance, y=X_train.columns)

        if model_path:
            pickle.dump(model, open(model_path + f"/{filename}", "wb"))
        feature_importance_dict = dict(zip(X_train.columns, importance))

        return feature_importance, class_report, feature_importance_dict
    else:
        # TODO: add 1D-CNN training.
        if model == "glacier":
            # Split the lr-list string and convert each value to float
            # experiment_arguments[8] = experiment_arguments[8].rstrip(';')
            # lr_list = [float(x) for x in experiment_arguments[8].split()]
            gc_latentcf_search_1dcnn(
                data,
                int(experiment_arguments[1]),
                int(experiment_arguments[2]),
                model_path + f"/{experiment_arguments[3]}",
                model_path,
                autoencoder,
            )

        elif model == "wildboar_knn" or model == "wildboar_rsf":
            X = data.iloc[:, :-1].values
            y = data.iloc[:, -1].values

            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=1
            )

            # n_samples, n_timestep = X_train.shape
            # y_labels, counts = np.unique(y_train, return_counts=True)
            # print(
            #     f"""
            #     The dataset contains {n_samples} samples with {n_timestep} time steps each.
            #     Of the samples, {counts[0]} is labeled as {y_labels[0]} and {counts[1]} labeled
            #     as {y_labels[1]}. Here, we plot the time series.
            #     """
            # )

            from wildboar.utils.plot import plot_time_domain

            if model == "wildboar_knn":
                from wildboar.distance import KNeighborsClassifier
                from wildboar.explain.counterfactual import KNeighborsCounterfactual

                filename = "wildboar_knn.sav"
                classifier = KNeighborsClassifier(
                    n_neighbors=5, metric="dtw", metric_params={"r": 0.5}
                )
                explainer = KNeighborsCounterfactual(random_state=1, method="auto")

            if model == "wildboar_rsf":
                from wildboar.ensemble import ShapeletForestClassifier
                from wildboar.explain.counterfactual import ShapeletForestCounterfactual

                filename = "wildboar_rsf.sav"
                classifier = ShapeletForestClassifier(
                    n_estimators=100,
                    metric="euclidean",
                    max_depth=5,
                    random_state=1,
                )
                explainer = ShapeletForestCounterfactual(random_state=1)

            classifier.fit(X_train, y_train)
            y_pred = classifier.predict(X_test)
            # Generate the classification report as a pandas DataFrame.
            class_report = classification_report(y_test, y_pred, output_dict=True)
            class_report = pd.DataFrame(class_report).transpose()

            X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test)

            # Save the model, the test split, and the counterfactuals for later use.
            if model_path:
                x_test_df = pd.DataFrame(X_test)
                x_test_df.columns = data.iloc[:, :-1].columns
                x_test_df.to_csv(model_path + "/X_test.csv", index=None)
                np.save(model_path + "/y_test.npy", y_test)
                pickle.dump(classifier, open(model_path + f"/{filename}", "wb"))
                np.save(model_path + "/X_cf.npy", X_cf)
                np.save(model_path + "/y_pred.npy", y_pred)
                np.save(model_path + "/cf_pred.npy", cf_pred)

            return class_report

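# Illustrative usage sketch (hedged): trains a logistic regression on a tabular
# dataframe and saves lr.sav under the given path. The dataframe, label, and
# model directory names are hypothetical placeholders.
def _example_training_usage(df):
    feature_importance_fig, class_report, importance_dict = training(
        data=df,
        model="lr",
        test_size=0.2,
        label="target",  # hypothetical class column
        dataset_type="tabular",
        df_name="example",  # any name other than "stroke" skips SMOTE
        model_path=PIPELINE_PATH + "example",  # hypothetical model directory
    )
    return class_report
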
def testing(name, type):
    data = pd.read_csv(name)

    y_test = data["diagnosis"]
    X_test = data.drop("diagnosis", axis=1)

    if type == "lr":
        filename = "lr.sav"
        clf = joblib.load(filename)
        y_pred = clf.predict(X_test)
        importance = clf.coef_[0]
        model = clf
    elif type == "xgb":
        filename = "xgb.sav"
        xgb = joblib.load(filename)
        y_pred = xgb.predict(X_test)
        importance = xgb.feature_importances_
        model = xgb
    elif type == "dt":
        filename = "dt.sav"
        dt = joblib.load(filename)
        y_pred = dt.predict(X_test)
        importance = dt.feature_importances_
        model = dt
    elif type == "svm":
        filename = "svm.sav"
        svc = joblib.load(filename)
        y_pred = svc.predict(X_test)
        importance = svc.coef_[0]
        model = svc
    elif type == "rf":
        filename = "rf.sav"
        rf = joblib.load(filename)
        y_pred = rf.predict(X_test)
        importance = rf.feature_importances_
        model = rf

    clas_report = classification_report(y_test, y_pred, output_dict=True)
    clas_report = pd.DataFrame(clas_report).transpose()
    clas_report = clas_report.sort_values(by=["f1-score"], ascending=False)
    fig2 = px.bar(x=importance, y=X_test.columns)
    pickle.dump(model, open(filename, "wb"))
    con = {
        "fig2": fig2.to_html(),
        "clas_report": clas_report,
    }
    return con

# compute counterfactuals
def counterfactuals(
    query,
    model,
    df,
    class_label,
    continuous_features,
    num_counterfactuals=5,
    features_to_vary=[],
):
    if "id" in df.columns:
        df = df.drop("id", axis=1)
    if "id" in query.columns:
        query = query.drop("id", axis=1)

    query = query.drop(class_label, axis=1)
    # data = df.drop(class_label, axis=1)
    # continuous_features = df.drop(class_label, axis=1).columns.tolist()
    # continuous_features = (
    #     df.drop(class_label, axis=1).select_dtypes(exclude=["object"]).columns.tolist()
    # )

    print(df.dtypes)
    d = dice_ml.Data(
        dataframe=df,
        continuous_features=continuous_features,
        outcome_name=class_label,
    )
    m = dice_ml.Model(model=model, backend="sklearn")
    exp = dice_ml.Dice(d, m)

    cf_kwargs = dict(
        total_CFs=num_counterfactuals,  # Number of counterfactual examples to generate
        desired_class="opposite",  # Flip the prediction to the opposite class
        proximity_weight=0.5,  # Control proximity
        diversity_weight=1.0,  # Control diversity
        sparsity_weight=0.5,  # Enforce minimal feature changes
        random_seed=42,
    )
    if len(features_to_vary) > 0:
        cf_kwargs["features_to_vary"] = features_to_vary

    try:
        dice_exp = exp.generate_counterfactuals(query, **cf_kwargs)
    except Exception as e:
        print(e)
        dice_exp = None

    if dice_exp:
        return dice_exp._cf_examples_list
    return dice_exp

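# Illustrative usage sketch (hedged): generates DiCE counterfactuals for a single
# query row with a fitted sklearn classifier. `clf`, `df`, and the column names
# are assumed to come from the calling view and are hypothetical here.
def _example_counterfactuals_usage(clf, df):
    query = df.iloc[[0]]  # one row, kept as a DataFrame
    cf_examples = counterfactuals(
        query=query,
        model=clf,
        df=df,
        class_label="target",  # hypothetical class column
        continuous_features=get_continuous_features(df.drop("target", axis=1)),
        num_counterfactuals=3,
    )
    return cf_examples
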
def get_dataframe(path):
    df = pd.read_csv(path)
    return df

def generatePCA(preprocess_df):
    pca = PCA()
    pca.fit(preprocess_df)
    exp_var_cumul = np.cumsum(pca.explained_variance_ratio_)
    pca = px.area(
        x=range(1, exp_var_cumul.shape[0] + 1),
        y=exp_var_cumul,
        labels={"x": "# Components", "y": "Explained Variance"},
    )

    pca.update_layout(
        autosize=True,
    )
    return pca

def generateTSNE(preprocess_df, dataset_type, class_label=None):
    # t-SNE
    tsne = TSNE(n_components=2, random_state=39)

    # render_mode="svg" prevents the scatter from switching to WebGL mode for
    # sufficiently large input. For datasets larger than 1000 entries the
    # generated scatter HTML would otherwise lack the <g class="points"...></g>
    # element containing the scatter points, which we rely on to connect a
    # clicked point with the corresponding row in the dataset. By default the
    # scatter drops that element to save space on the client side, and
    # render_mode="svg" avoids that behaviour.
    # https://github.com/plotly/plotly_express/issues/145

    if dataset_type == "tabular":
        projections = tsne.fit_transform(preprocess_df.drop(class_label, axis=1).values)
        tsne_df = pd.DataFrame(
            {
                "0": projections[:, 0],
                "1": projections[:, 1],
                class_label: preprocess_df[class_label].astype(str),
            }
        )

        tsne = px.scatter(
            tsne_df,
            x="0",
            y="1",
            color=class_label,
            render_mode="svg",
        )
        tsne.update_layout(clickmode="event+select", autosize=True)
    elif dataset_type == "timeseries":
        preprocess_df_drop_class = preprocess_df.iloc[:, :-1]
        projections = tsne.fit_transform(preprocess_df_drop_class)
        tsne_df = pd.DataFrame(
            {
                "0": projections[:, 0],
                "1": projections[:, 1],
                "class": preprocess_df.iloc[:, -1].astype(str),
            }
        )

        tsne = px.scatter(
            tsne_df,
            x="0",
            y="1",
            color="class",
            render_mode="svg",
        )
        tsne.update_layout(clickmode="event+select", autosize=True)

    return tsne, projections

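# Illustrative usage sketch (hedged): builds the 2-D t-SNE scatter for an already
# preprocessed tabular dataframe. The class column name is a hypothetical
# placeholder.
def _example_generateTSNE_usage(preprocessed_df):
    tsne_fig, projections = generateTSNE(
        preprocessed_df, dataset_type="tabular", class_label="target"
    )
    return tsne_fig.to_html(), projections
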
def generateAugmentedTSNE(
    df, cf_df, num_counterfactuals, point, tsne_path, class_label
):
    """
    Given a t-SNE graph, add traces for the counterfactuals computed for a given point and return the new graph.

    Parameters
    ----------
    df: dataframe used to compute the t-SNE
    cf_df: counterfactuals dataframe
    point: original point for which counterfactuals were computed
    tsne_path: path to the original t-SNE plot

    Returns
    -------
    The t-SNE graph updated with the new counterfactual points. Counterfactual points and the point itself are resized.
    """

    # Recompute the t-SNE on the same data plus the extra counterfactual points
    # described in counterfactuals.csv.
    tsne_cf = TSNE(n_components=2, random_state=0)

    # Merge the counterfactuals csv with the t-SNE data.
    df_merged = pd.concat([cf_df, df], ignore_index=True, axis=0)
    projections = tsne_cf.fit_transform(df_merged.drop(class_label, axis=1).values)

    # cf_df contains the projection values and the class_label value of the
    # counterfactual points. projections is an np array of (x, y) pairs, one
    # per t-SNE graph point.
    cf_df = pd.DataFrame(
        {
            "0": projections[:num_counterfactuals, 0],
            "1": projections[:num_counterfactuals, 1],
            class_label: cf_df[class_label].iloc[:num_counterfactuals].astype(str),
        }
    )

    # cf_df = pd.concat([cf_df, point], ignore_index=True, axis=0)
    # new = {'0': 'Front hello', '1': 'hi'}
    # cf_s.for_each_trace(lambda t: t.update(name=new[t.name]))
    point_s = px.scatter(
        point,
        x="0",
        y="1",
        color=class_label,
        color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
        render_mode="svg",
    )
    point_s["data"][0]["name"] = "Original data"

    cf_s = px.scatter(
        cf_df,
        x="0",
        y="1",
        color=class_label,
        color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"},
        render_mode="svg",
    )
    cf_s["data"][0]["name"] = "Counterfactual"

    clicked_and_cf_s = go.Figure(data=cf_s.data + point_s.data)
    clicked_and_cf_s.update_traces(
        marker=dict(size=10, symbol="circle", line=dict(width=2))
    )

    tsne = joblib.load(tsne_path)
    tsne = go.Figure(data=tsne.data + clicked_and_cf_s.data)
    tsne.update_layout(clickmode="event+select", autosize=True)

    # tsne.add_trace(cf_s.data[0])
    return tsne

    # Previous approach, kept for reference:
    # tsne_cf = TSNE(n_components=2, random_state=0)
    # projections = tsne_cf.fit_transform(
    #     df_merged.drop(["diagnosis"], axis=1).values
    # )
    # cf_df = pd.DataFrame(
    #     {
    #         "0": projections[:num_counterfactuals, 0],
    #         "1": projections[:num_counterfactuals, 1],
    #         "diagnosis": cf_df.diagnosis.iloc[:3],
    #     }
    # )
    # cf_df = pd.concat([cf_df, clicked_point_df], ignore_index=True, axis=0)
    # cf_s = px.scatter(
    #     cf_df,
    #     x="0",
    #     y="1",
    #     color="diagnosis",
    #     color_continuous_scale=px.colors.sequential.Rainbow,
    # )
    # cf_s.update_traces(
    #     marker=dict(
    #         size=10,
    #         symbol="circle",
    #     )
    # )
    # tsne = joblib.load("tsne.sav")
    # tsne.add_trace(cf_s.data[0])
    # pickle.dump(tsne, open("tsne_cfs.sav", "wb"))
    # tsne = tsne.to_html()

def get_ecg_entry(X_test, y_test, i, class_label):
    # Plot a single timeseries sample of the requested class.
    fig = go.Figure()
    y = X_test[y_test == class_label].iloc[i]
    index = X_test[y_test == class_label].index[i]

    if class_label == 0:
        name = "Normal ECG"
    else:
        name = "Abnormal ECG"

    # Add the ECG trace.
    fig.add_trace(
        go.Scatter(
            y=y,
            mode="lines",
            line=dict(width=1),
        )
    )

    # Reduce margins to make the plot more compact.
    fig.update_layout(
        xaxis_title="Timestep",
        yaxis_title=name,
        hovermode="x",
        margin=dict(l=10, r=10, t=30, b=10),  # Reduced margins
    )

    # Start the y-axis from zero, show the grid, and add spacing between gridlines.
    fig.update_yaxes(
        rangemode="tozero",
        showgrid=True,
        gridwidth=1,  # Makes the gridlines slightly more pronounced
        tickvals=[min(y), max(y)],
    )
    return fig, int(index)

def ecg_plot_counterfactuals(i, X_test, y_test, y_pred, X_cf, cf_pred):
    fig = go.Figure()

    neg = 0
    pos = 1
    y_test = np.where(y_test == 1, pos, neg)
    # print("y_test: ", y_test)
    # print("y_pred: ", y_pred)
    # print("cf_pred: ", cf_pred)

    # Original time series
    fig.add_trace(
        go.Scatter(
            y=X_test.iloc[i],
            mode="lines",
            name="Original (y_pred = %d, y_actual = %d)" % (y_pred[i], y_test[i]),
            line=dict(width=0.5),
        )
    )

    # Counterfactual time series (flatten if it still has a channel dimension).
    if len(X_cf[i].shape) > 1:
        X_cf_flattened = X_cf[i].flatten()
        fig.add_trace(
            go.Scatter(
                y=X_cf_flattened,
                mode="lines",
                name="Counterfactual (y = %d)" % cf_pred[i],
                line=dict(width=1),
            )
        )
    else:
        fig.add_trace(
            go.Scatter(
                y=X_cf[i],
                mode="lines",
                name="Counterfactual (y = %d)" % cf_pred[i],
                line=dict(width=1),
            )
        )

    # Reduce margins to make the plot more compact.
    fig.update_layout(
        xaxis_title="Timestep",
        hovermode="x",
        margin=dict(l=10, r=10, t=30, b=10),  # Reduced margins
    )

    # Start the y-axis from zero, show the grid, and add spacing between gridlines.
    fig.update_yaxes(
        rangemode="tozero",
        showgrid=True,
        gridwidth=1,  # Makes the gridlines slightly more pronounced
        tickvals=[min(X_test.iloc[i]), max(X_test.iloc[i])],
    )

    # # Mean time series of the counterfactual class
    # mean_cf_class = np.mean(X_test.loc[y_test == cf_pred[i]], axis=0)
    # fig.add_trace(go.Scatter(
    #     y=mean_cf_class,
    #     mode='lines',
    #     name="Mean of X with y = %d" % cf_pred[i],
    #     line=dict(width=1, dash='dash')
    # ))

    fig.update_layout(
        xaxis_title="Timepoints", yaxis_title="Values", legend=dict(x=0.01, y=0.99)
    )

    return fig

def get_info_of_dataframe(df):
    # Creating a DataFrame to store the summary
    summary_data = {
        "Total Rows": [df.shape[0]],
        "Total Columns": [df.shape[1]],
        "Missing Values (Total)": [df.isnull().sum().sum()],
        "Missing Values (Columns)": [df.isnull().any(axis=0).sum()],
        "Categorical Columns": [(df.dtypes == "object").sum()],
        "Numeric Columns": [df.select_dtypes(include=["number"]).shape[1]],
    }

    summary_df = pd.DataFrame(summary_data)

    # Create a Plotly Table with enhanced styling
    fig = go.Figure(
        data=[
            go.Table(
                header=dict(
                    values=["<b>Metric</b>", "<b>Value</b>"],
                    fill_color="#4CAF50",
                    align="left",
                    font=dict(color="white", size=14),
                    height=30,
                ),
                cells=dict(
                    values=[summary_df.columns, summary_df.iloc[0].tolist()],
                    fill_color=[["#f9f9f9", "white"] * len(summary_df)],
                    align="left",
                    font=dict(color="black", size=12),
                    height=25,
                ),
            )
        ]
    )

    fig.update_layout(
        title_x=0.5,  # Center title
        title_y=0.95,
        margin=dict(l=20, r=20, t=50, b=20),
        width=600,
        height=300,  # Adjust height based on the content
    )

    # Convert Plotly figure to HTML
    return fig.to_html()

def update_column_list_with_one_hot_columns(df_original, df_encoded, column_list):
    updated_columns = []

    for column in column_list:
        # Check if the column is categorical in the original dataset
        if (
            pd.api.types.is_categorical_dtype(df_original[column])
            or df_original[column].dtype == "object"
        ):
            # The column is categorical, so find the one-hot encoded sub-columns
            one_hot_columns = [
                col for col in df_encoded.columns if col.startswith(f"{column}_")
            ]

            # Replace the original column name with the one-hot encoded sub-columns
            if one_hot_columns:
                updated_columns.extend(one_hot_columns)
            else:
                # If no one-hot encoded columns are found, keep the original column
                updated_columns.append(column)
        else:
            # If the column is not categorical, keep it as is
            updated_columns.append(column)

    return updated_columns

# Function to extract continuous features
def get_continuous_features(df):
    # Filter columns based on dtype and exclude binary columns
    continuous_columns = df.select_dtypes(include=["float64", "int64"]).columns

    # Exclude binary features (0 and 1 values)
    continuous_columns = [
        col
        for col in continuous_columns
        if df[col].nunique() > 2  # Exclude binary features
    ]

    # Return only the continuous features
    return list(continuous_columns)


# Function to extract categorical features
def get_categorical_features(df):
    # Select object and category dtype columns
    categorical_columns = df.select_dtypes(include=["object", "category"]).columns

    # Return only the categorical features
    return list(categorical_columns)

# Function to extract non-continuous features
def get_non_continuous_features(df):
    # Select numeric columns
    numeric_columns = df.select_dtypes(include=["float64", "int64"]).columns

    # Identify binary columns (having only two unique values, like 0/1)
    binary_columns = [col for col in numeric_columns if df[col].nunique() == 2]

    # Select non-numeric columns
    non_numeric_columns = df.select_dtypes(
        exclude=["float64", "int64"]
    ).columns.tolist()

    # Combine binary columns and non-numeric columns
    non_continuous_columns = binary_columns + non_numeric_columns

    # Return only the non-continuous features
    return list(non_continuous_columns)

def find_counterfactuals(estimator, explainer, X):
    y_pred = estimator.predict(X)
    y_desired = np.empty_like(y_pred)

    # Store an array of the desired label for each sample. We assume a binary
    # classification task where the desired label is the inverse of the
    # predicted label.
    a, b = estimator.classes_
    y_desired[y_pred == a] = b
    y_desired[y_pred == b] = a

    # Fit the explainer to the trained estimator.
    explainer.fit(estimator)

    # Explain each sample in X as the desired label in y_desired
    X_cf = explainer.explain(X, y_desired)
    return X_cf, y_pred, estimator.predict(X_cf)

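# Illustrative sketch (hedged) of find_counterfactuals with wildboar's k-NN
# counterfactual explainer, mirroring the timeseries branch of training() above.
# X_train, y_train, and X_test are assumed to be numpy arrays from the caller.
def _example_find_counterfactuals(X_train, y_train, X_test):
    from wildboar.distance import KNeighborsClassifier
    from wildboar.explain.counterfactual import KNeighborsCounterfactual

    clf = KNeighborsClassifier(n_neighbors=5, metric="dtw", metric_params={"r": 0.5})
    clf.fit(X_train, y_train)
    explainer = KNeighborsCounterfactual(random_state=1)
    return find_counterfactuals(clf, explainer, X_test)
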
def is_column_categorical_like(
    df, column_name, unique_threshold=10, ratio_threshold=0.05
):
    """
    Determines if a numeric column has categorical characteristics.

    Parameters:
    df (DataFrame): The DataFrame containing the data.
    column_name (str): The column name to check.
    unique_threshold (int): Maximum number of unique values to consider as categorical.
    ratio_threshold (float): Maximum ratio of unique values to total rows to consider as categorical.

    Returns:
    bool: True if the column is likely categorical, False otherwise.
    """
    unique_values = df[column_name].nunique()  # Number of unique values
    total_values = len(df[column_name])  # Total number of rows
    unique_ratio = unique_values / total_values  # Ratio of unique values to total rows

    # Check if the column is numeric
    if pd.api.types.is_numeric_dtype(df[column_name]):
        # Consider it categorical if it has fewer than `unique_threshold` unique
        # values or if the unique-value ratio is below `ratio_threshold`
        if unique_values <= unique_threshold or unique_ratio <= ratio_threshold:
            return True
    return False

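# Minimal sketch of the heuristic above: a column with few distinct integer
# codes is treated as categorical-like, a dense continuous column is not.
# Purely illustrative.
def _example_is_column_categorical_like():
    toy = pd.DataFrame(
        {"grade": [1, 2, 3, 1, 2, 3] * 10, "height": np.random.randn(60)}
    )
    return (
        is_column_categorical_like(toy, "grade"),  # True: only 3 unique values
        is_column_categorical_like(toy, "height"),  # False: ~60 unique values
    )
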
# Function to flatten the dictionary
def flatten_dict(d, parent_key="", sep="_"):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

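# Minimal sketch of flatten_dict: nested keys are joined with the separator.
# Purely illustrative.
def _example_flatten_dict():
    nested = {"model": {"name": "rf", "params": {"depth": 5}}, "score": 0.9}
    return flatten_dict(nested)
    # -> {"model_name": "rf", "model_params_depth": 5, "score": 0.9}
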
def convert_to_camel_case(s):
    # Split the string by hyphen
    parts = s.split("-")

    # Capitalize each part and join them together
    camel_case = "".join(word.capitalize() for word in parts)

    return camel_case


# def fetch_line_by_dataset(file_path, dataset, constraint):

def fetch_line_by_dataset(file_path, dataset):
    """
    Fetches a line from the file based on the specified dataset name to retrieve basic information about the dataset.

    :param file_path: Path to the input file.
    :param dataset: The dataset name to search for.
    :return: The line matching the dataset, or None if not found.
    """
    with open(file_path, "r") as file:
        for line in file:
            # Strip leading whitespace
            stripped_line = line.strip()
            # Skip lines that start with #
            if stripped_line.startswith("#"):
                continue
            # Use a regular expression for an exact match of the dataset name
            dataset_pattern = rf"--dataset\s+{re.escape(dataset)}\b"
            if re.search(dataset_pattern, stripped_line):
                return stripped_line
    return None

def extract_arguments_from_line(line):
    """
    Extracts all words that come immediately after '--' arguments in the line.

    :param line: A string containing the line to parse.
    :return: A list of argument values found in the line.
    """
    # Find all arguments and their values
    # matches = re.findall(r"(--[\w-]+)((?:\s+[^-][^\s]*)*)", line)
    matches = re.findall(r"(--[\w-]+)\s+([^\s]+)", line)

    # Extract argument values
    arguments = [value.strip() for _, value in matches if value.strip()]
    return arguments

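# Illustrative sketch (hedged) of how fetch_line_by_dataset and
# extract_arguments_from_line work together. The experiments file path and its
# contents are hypothetical; only the "--dataset <name>" convention is assumed.
def _example_fetch_and_extract():
    line = fetch_line_by_dataset("base/experiments.txt", "two-lead-ecg")  # hypothetical file
    if line is None:
        return []
    # e.g. "python run.py --dataset two-lead-ecg --epochs 100"
    # -> ["two-lead-ecg", "100"]
    return extract_arguments_from_line(line)
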
def create_tuple_of_models_text_value(available_pre_trained_models):
    available_pretrained_models_info = []
    for model in available_pre_trained_models:
        if model == "xgb":
            available_pretrained_models_text = "XGBoost"
        elif model == "rf":
            available_pretrained_models_text = "Random Forest"
        elif model == "lr":
            available_pretrained_models_text = "Logistic Regression"
        elif model == "dt":
            available_pretrained_models_text = "Decision Tree"
        elif model == "svm":
            available_pretrained_models_text = "Support Vector Machine"
        elif model == "glacier":
            available_pretrained_models_text = "Glacier 1dCNN"
        elif model == "wildboar_knn":
            available_pretrained_models_text = "Wildboar K-Nearest Neighbours"
        elif model == "wildboar_rsf":
            available_pretrained_models_text = "Wildboar Random Shapelet Forest"
        else:
            # Fall back to the raw identifier for unknown models instead of
            # reusing the previous iteration's text.
            available_pretrained_models_text = model

        available_pretrained_models_info.append(
            (model, available_pretrained_models_text)
        )

    return available_pretrained_models_info