import dice_ml.data_interfaces import dice_ml.data_interfaces.private_data_interface import pandas as pd import pickle, os from sklearn.impute import SimpleImputer from sklearn.preprocessing import StandardScaler from sklearn.preprocessing import OneHotEncoder import numpy as np from pandas.api.types import is_numeric_dtype from sklearn.metrics import classification_report import plotly.express as px from django.conf import settings import joblib from sklearn.decomposition import PCA from sklearn.manifold import TSNE import dice_ml from dict_and_html import * import plotly.graph_objects as go import math from imblearn.over_sampling import SMOTE from scipy.stats import median_abs_deviation from numpy.fft import * from sklearn.preprocessing import MinMaxScaler from plotly.subplots import make_subplots from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report from .glacier.src.gc_latentcf_search_1dcnn_function import gc_latentcf_search_1dcnn from .glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals import re import json PIPELINE_PATH = os.path.join(settings.BASE_DIR, "base/pipelines/") def stats( dataset_path, dataset_type, pos=None, neg=None, feature1=None, feature2=None, label=None, name=None, ): print(dataset_type) if dataset_type == "tabular": df = pd.read_csv(dataset_path) binary1 = df[feature1].isin([0, 1]).all() binary2 = df[feature2].isin([0, 1]).all() if binary1 == True or binary2 == True: fig = px.histogram(df, x=feature1, y=feature2, color=label) elif is_numeric_dtype(df[feature1]) or is_numeric_dtype(df[feature2]): if not is_numeric_dtype(df[feature1]) or not is_numeric_dtype(df[feature2]): # feature1 is not numeric but feature2 should be fig = px.histogram(df, x=feature1, y=feature2, color=label) else: # they both are numeric so do scatter if is_column_categorical_like( df, feature1 ) and not is_column_categorical_like(df, feature2): # Add jitter to the 'Categorical_Like_Numeric' column df[feature1] = df[feature1] + np.random.uniform( -0.1, 0.1, size=df.shape[0] ) # Create a scatter plot using Plotly fig = px.scatter( df, x=feature1, y=feature2, color=df[label].astype(str), ) elif is_column_categorical_like( df, feature2 ) and not is_column_categorical_like(df, feature1): print(df) df[feature2] = df[feature2] + np.random.uniform( -0.1, 0.1, size=df.shape[0] ) # Create a scatter plot using Plotly fig = px.scatter( df, x=feature1, y=feature2, color=df[label].astype(str), ) elif is_column_categorical_like( df, feature2 ) and is_column_categorical_like(df, feature1): df_grouped = ( df.groupby([feature1, feature2, label]) .size() .reset_index(name="Count") ) # Create a bubble plot fig = px.scatter( df_grouped, x=feature1, y=feature2, size="Count", color=df_grouped[label].astype(str), ) else: # print( # is_column_categorical_like(df, feature1), # is_column_categorical_like(df, feature2), # ) fig = px.scatter( df, x=feature1, y=feature2, color=df[label].astype(str) ) else: # they both are categorical fig = px.bar(df, x=feature1, y=feature2, color=label, barmode="group") fig.update_layout(clickmode="event+select", autosize=True) elif dataset_type == "timeseries": # timeseries df = pd.read_csv(dataset_path) # samples subplots # Create subplots # column numbers - target_column # TODO: case for when the dataset has # id column if name: if name == "two-lead-ecg": negative_label = "Signal 0" positive_label = "Signal 1" elif name == "gun-point": negative_label = "Gun" positive_label = "No gun" elif name == "italy-power-demand": negative_label = "October to March power demand" positive_label = "April to September power demand" elif name == "ecg-five-days": negative_label = "12/11/1990" positive_label = "17/11/1990" elif name == "ford-a": negative_label = "Negative label" positive_label = "Positive label" # hard coded need to be dynamic based on # dataset negative_label_value = neg positive_label_value = pos num_timesteps = df.shape[1] - 1 fig = make_subplots( rows=2, cols=2, subplot_titles=( negative_label, negative_label, positive_label, positive_label, ), ) # suppose univariative # TODO: multivariative target_labels = list(df.iloc[:, -1].unique()) positive = target_labels[1] negative = target_labels[0] # Add normal ECG trace 1 fig.add_trace( go.Scatter( x=list(range(num_timesteps)), y=df[df.iloc[:, -1] == negative_label_value].iloc[0, :-1], mode="lines", name=negative_label, ), row=1, col=1, ) # Add normal ECG trace 2 fig.add_trace( go.Scatter( x=list(range(num_timesteps)), y=df[df.iloc[:, -1] == negative_label_value].iloc[1, :-1], mode="lines", name=negative_label, ), row=1, col=2, ) # Add abnormal ECG trace 1 fig.add_trace( go.Scatter( x=list(range(num_timesteps)), y=df[df.iloc[:, -1] == positive_label_value].iloc[0, :-1], mode="lines", name=positive_label, ), row=2, col=1, ) # Add abnormal ECG trace 2 fig.add_trace( go.Scatter( x=list(range(num_timesteps)), y=df[df.iloc[:, -1] == positive_label_value].iloc[1, :-1], mode="lines", name=positive_label, ), row=2, col=2, ) # Update layout fig.update_layout( xaxis_title="Timesteps", yaxis_title="ECG Value", showlegend=False, autosize=True, ) # confidence plot df = df.iloc[:, :-1] df_grouped = df.agg(["mean", "std", "count"]).transpose() df_grouped["ci"] = 40 * df_grouped["std"] / np.sqrt(df_grouped["count"]) df_grouped["ci_lower"] = df_grouped["mean"] - df_grouped["ci"] df_grouped["ci_upper"] = df_grouped["mean"] + df_grouped["ci"] fig1 = go.Figure( [ go.Scatter( name="Avg", x=df_grouped.index, y=round(df_grouped["mean"], 2), mode="lines", line=dict(color="rgb(31, 119, 180)"), ), go.Scatter( name="95% CI Upper", x=df_grouped.index, y=round(df_grouped["ci_upper"], 2), mode="lines", marker=dict(color="#444"), line=dict(width=0), showlegend=False, ), go.Scatter( name="95% CI Lower", x=df_grouped.index, y=round(df_grouped["ci_lower"], 2), marker=dict(color="#444"), line=dict(width=0), mode="lines", fillcolor="rgba(68, 68, 68, 0.3)", fill="tonexty", showlegend=False, ), ] ) fig1.update_layout( title="Confidence plot for Two Lead ECG dataset", xaxis_title="Timestep", yaxis_title="Avg ECG value", hovermode="x", ) fig1.update_yaxes(rangemode="tozero") return fig.to_html(), fig1.to_html() # fig = px.line(df.iloc[int(feature1)]) return fig.to_html() def compare_values(val1, val2): if isinstance(val1, float) and isinstance(val2, float): return not math.isclose(float(val1), float(val2)) else: return val1 != val2 def preprocess(data, value_list, name, dataset_type, path=None, class_label=None): if dataset_type == "tabular": if "id" in data.columns: ids = data["id"] data = data.drop(["id"], axis=1) total_nan = data.isna().sum().sum() if "imp" in value_list: data = imputations(data, class_label, path) imputed_data = data if "onehot" in value_list: data = onehot(data, path) if "std" in value_list: data = scaling(data, class_label, path) if "id" in data.columns: data = pd.concat([ids.to_frame(), data], axis=1, ignore_index=False) if total_nan > 0: os.remove(name) imputed_data = pd.concat( [ids.to_frame(), imputed_data], axis=1, ignore_index=False ) imputed_data.to_csv(name, index=False) elif dataset_type == "timeseries": # timeseries # save last columns values data_class_col = data.iloc[:, -1] # drop last column that contains class_labels data = data.iloc[:, :-1] if "imp" in value_list: data = imputations_ts(data, path) if "denoise" in value_list: data = data.apply(denoise, args=(path,), axis=0) if "std" in value_list: data = scaling_ts(data, path) data = pd.concat([data, data_class_col], axis=1) # os.remove(name) # data.to_csv(name, index=False) return data ###--------------------------### ### TIMESERIES PREPROCESSING ### def scaling_ts(data, path): # Normalize the data using Min-Max scaling scaler = MinMaxScaler() data[data.columns] = scaler.fit_transform(data) pickle.dump(scaler, open(path + "/min_max_scaler.sav", "wb")) return data def denoise(series, path): # Apply FFT fft_vals = fft(series) fft_freqs = np.fft.fftfreq(len(fft_vals)) # Filter frequencies fft_vals[np.abs(fft_freqs) > 0.1] = 0 # Inverse FFT to reconstruct the signal denoised_series = ifft(fft_vals).real return pd.Series(denoised_series, index=series.index) def outlier_detection(series, path): median = series.median() mad = median_abs_deviation(series) return np.abs(series - median) / mad > 3 def imputations_ts(data, path): data[data.columns] = data[data.columns].fillna(data.mean()) return data ### TIMESERIES PREPROCESSING ### ###--------------------------### ###--------------------------### ### TABULAR PREPROCESSING ### def onehot(data, path): encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore") categorical_columns = data.select_dtypes(include=["object"]).columns.tolist() # Apply one-hot encoding to the categorical columns one_hot_encoded = encoder.fit_transform(data[categorical_columns]).astype(float) one_hot_df = pd.DataFrame( one_hot_encoded, columns=encoder.get_feature_names_out(categorical_columns) ) pickle.dump(encoder, open(path + "/one_hot.sav", "wb")) # Concatenate the one-hot encoded dataframe with the original dataframe df_encoded = pd.concat([data, one_hot_df], axis=1) # Drop the original categorical columns data = df_encoded.drop(categorical_columns, axis=1) return data def imputations(data, class_label, path): imp = SimpleImputer(missing_values=np.nan, strategy="mean") y = data[class_label] data = data.drop([class_label], axis=1) numeric_cols = data.select_dtypes(include=["float64", "int64"]).columns print("Numeric columns ", numeric_cols) data[numeric_cols] = imp.fit_transform(data[numeric_cols]) # Convert back to DataFrame and restore original data types data[numeric_cols] = data[numeric_cols].astype(float) pickle.dump(imp, open(path + "/imp.sav", "wb")) data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False) return data def scaling(data, class_label, path): scaler = StandardScaler() # should not scale binary classes # define standard scaler y = data[class_label] # if class column is numeric do not # apply preprocessing data = data.drop([class_label], axis=1) # transform data cols = data.select_dtypes(np.number).columns # keep non-binary columns nonbinary_columns = [ col for col in cols if not data[col].dropna().isin([0, 1]).all() ] data[nonbinary_columns] = scaler.fit_transform(data[nonbinary_columns]) pickle.dump(scaler, open(path + "/standard_scaler.sav", "wb")) data = pd.concat([y.to_frame(), data], axis=1, ignore_index=False) return data ### TABULAR PREPROCESSING ### ###--------------------------### def decode_cf(df, row, class_label, path, preprocessing_list): cf_row = row.copy() # get actual numerical columns df_numerical = df.select_dtypes(exclude=["object"]).columns.tolist() nonbinary_numeric_columns = [ col for col in df_numerical if not df[col].dropna().isin([0, 1]).all() ] # get actual categorical columns df_categorical = ( df.drop([class_label], axis=1) .select_dtypes(include=["object"]) .columns.tolist() ) if "onehot" in preprocessing_list: ohe = joblib.load(path + "/one_hot.sav") # if there were categorical columns in the dataframe if ohe.get_feature_names_out().size > 0: one_hot_decoded = ohe.inverse_transform(cf_row[ohe.get_feature_names_out()]) cf_categorical_columns = ohe.get_feature_names_out() # Drop the original categorical columns cf_row = cf_row.drop(cf_categorical_columns, axis=1) cf_row[df_categorical] = one_hot_decoded if "std" in preprocessing_list: scaler = joblib.load(path + "/standard_scaler.sav") print(nonbinary_numeric_columns) cf_row[nonbinary_numeric_columns] = scaler.inverse_transform( cf_row[nonbinary_numeric_columns] ) cf_row[class_label] = cf_row[class_label] le = joblib.load(path + "/label_encoder.sav") cf_row[class_label] = le.inverse_transform(cf_row[class_label]) return cf_row def training( data, model, test_size, label, dataset_type, df_name, model_path=None, autoencoder="No", experiment_arguments=None, ): X = data if dataset_type == "tabular": if "id" in data.columns: X = data.drop("id", axis=1) y = X[label] X = X.drop(label, axis=1) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=test_size, stratify=y.values, random_state=42 ) if df_name == "stroke": # needs Oversampling ## TODO if class_label is multi class SMOTE needs to have sampling strategy Dict ## TODO check if df needs oversampling oversample = SMOTE(sampling_strategy=0.4, random_state=42) X_sm, y_sm = oversample.fit_resample( X_train, y_train, ) X_train, X_test, y_train, y_test = train_test_split( X_sm, y_sm, test_size=test_size, stratify=y_sm.values, random_state=42 ) if "lr" == model: from sklearn.linear_model import LogisticRegression clf = LogisticRegression(random_state=0).fit(X_train, y_train) y_pred = clf.predict(X_test) filename = "lr.sav" importance = clf.coef_[0] model = clf if "xgb" == model: from xgboost import XGBClassifier # TODO enable_categorical whould be dynamically added # when there are categorical variables in the dataset xgb = XGBClassifier( n_estimators=200, # Number of trees (boosting rounds) learning_rate=0.1, # Step size shrinkage (eta) max_depth=4, # Maximum tree depth min_child_weight=1, # Minimum sum of weights in a child subsample=0.8, # Fraction of samples used per tree colsample_bytree=0.8, # Fraction of features used per tree gamma=0, # Minimum loss reduction to make a split reg_lambda=1, # L2 regularization term (ridge) reg_alpha=0, # L1 regularization term (lasso) objective="binary:logistic", # Binary classification objective use_label_encoder=False, # Avoids unnecessary warnings for older versions eval_metric="logloss", # Logarithmic loss evaluation metric ).fit(X_train, y_train) y_pred = xgb.predict(X_test) filename = "xgb.sav" importance = xgb.feature_importances_ model = xgb if "dt" == model: from sklearn.tree import DecisionTreeClassifier dt = DecisionTreeClassifier(max_depth=30, random_state=42) dt.fit(X_train, y_train) y_pred = dt.predict(X_test) filename = "dt.sav" importance = dt.feature_importances_ model = dt if "svm" == model: from sklearn import svm svc = svm.SVC(kernel="linear", probability=True) svc.fit(X_train, y_train) y_pred = svc.predict(X_test) filename = "svm.sav" importance = svc.coef_[0] model = svc if "rf" == model: from sklearn.ensemble import RandomForestClassifier rf = RandomForestClassifier() rf.fit(X_train, y_train) y_pred = rf.predict(X_test) filename = "rf.sav" importance = rf.feature_importances_ model = rf class_report = classification_report(y_test, y_pred.flatten(), output_dict=True) class_report = pd.DataFrame(class_report) feature_importance = px.bar(x=importance, y=X_train.columns) if model_path: pickle.dump(model, open(model_path + f"/{filename}", "wb")) feature_importance_dict = dict(zip(X_train.columns, importance)) return feature_importance, class_report, feature_importance_dict else: # TODO: add 1dcnn train if model == "glacier": # Split the lr-list string and convert each value to float # experiment_arguments[8] = experiment_arguments[8].rstrip(';') # lr_list = [float(x) for x in experiment_arguments[8].split()] gc_latentcf_search_1dcnn( data, int(experiment_arguments[1]), int(experiment_arguments[2]), model_path + f"/{experiment_arguments[3]}", model_path, autoencoder, ) elif model == "wildboar_knn" or model == "wildboar_rsf": X = data.iloc[:, :-1].values y = data.iloc[:, -1].values X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=test_size, random_state=1 ) # n_samples, n_timestep = X_train.shape # y_labels, counts = np.unique(y_train, return_counts=True) # print( # f""" # The dataset contains {n_samples} samples with {n_timestep} time steps each. # Of the samples, {counts[0]} is labeled as {y_labels[0]} and {counts[1]} labeled # as {y_labels[1]}. Here, we plot the time series. # """ # ) from wildboar.utils.plot import plot_time_domain if model == "wildboar_knn": from wildboar.distance import KNeighborsClassifier from wildboar.explain.counterfactual import KNeighborsCounterfactual filename = "wildboar_knn.sav" classifier = KNeighborsClassifier( n_neighbors=5, metric="dtw", metric_params={"r": 0.5} ) explainer = KNeighborsCounterfactual(random_state=1, method="auto") if model == "wildboar_rsf": from wildboar.ensemble import ShapeletForestClassifier from wildboar.explain.counterfactual import ShapeletForestCounterfactual filename = "wildboar_rsf.sav" classifier = ShapeletForestClassifier( n_estimators=100, metric="euclidean", max_depth=5, random_state=1, ) explainer = ShapeletForestCounterfactual(random_state=1) classifier.fit(X_train, y_train) # Assuming you have X_test and y_test as test data y_pred = classifier.predict(X_test) # Generate the classification report class_report = classification_report(y_test, y_pred, output_dict=True) # Convert the classification report to a pandas DataFrame class_report = pd.DataFrame(class_report).transpose() X_cf, y_pred, cf_pred = find_counterfactuals(classifier, explainer, X_test) # save x_test, y_test for future use if model_path: x_test_df = pd.DataFrame(X_test) x_test_df.columns = data.iloc[:, :-1].columns x_test_df.to_csv(model_path + "/X_test.csv", index=None) np.save(model_path + "/y_test.npy", y_test) pickle.dump(classifier, open(model_path + f"/{filename}", "wb")) np.save(model_path + "/X_cf.npy", X_cf) np.save(model_path + "/y_pred.npy", y_pred) np.save(model_path + "/cf_pred.npy", cf_pred) return class_report def testing(name, type): data = pd.read_csv(name) y_test = data["diagnosis"] X_test = data.drop("diagnosis", axis=1) if "lr" == type: filename = "lr.sav" clf = joblib.load(filename) y_pred = clf.predict(X_test) importance = clf.coef_[0] model = clf if "xgb" == type: filename = "xgb.sav" xgb = joblib.load(filename) y_pred = xgb.predict(X_test) filename = "xgb.sav" importance = xgb.feature_importances_ model = xgb if "dt" == type: filename = "dt.sav" dt = joblib.load(filename) y_pred = dt.predict(X_test) importance = dt.feature_importances_ model = dt if "svm" == type: filename = "svm.sav" svc = joblib.load(filename) y_pred = svc.predict(X_test) importance = svc.coef_[0] model = svc if "rf" == type: filename = "rf.sav" rf = joblib.load(filename) y_pred = rf.predict(X_test) importance = rf.feature_importances_ model = rf clas_report = classification_report(y_test, y_pred, output_dict=True) clas_report = pd.DataFrame(clas_report).transpose() clas_report = clas_report.sort_values(by=["f1-score"], ascending=False) fig2 = px.bar(x=importance, y=X_test.columns) pickle.dump(model, open(filename, "wb")) con = { "fig2": fig2.to_html(), "clas_report": clas_report, } return con # compute counterfactuals def counterfactuals( query, model, df, class_label, continuous_features, num_counterfactuals=5, features_to_vary=[], ): if "id" in df.columns: df = df.drop("id", axis=1) if "id" in query.columns: query = query.drop("id", axis=1) query = query.drop(class_label, axis=1) # data = df.drop(class_label, axis=1) # continuous_features = df.drop(class_label, axis=1).columns.tolist() # continuous_features = ( # df.drop(class_label, axis=1).select_dtypes(exclude=["object"]).columns.tolist() # ) print(df.dtypes) d = dice_ml.Data( dataframe=df, continuous_features=continuous_features, outcome_name=class_label, ) m = dice_ml.Model(model=model, backend="sklearn") exp = dice_ml.Dice(d, m) if len(features_to_vary) > 0: try: dice_exp = exp.generate_counterfactuals( query, total_CFs=num_counterfactuals, # Total number of Counterfactual Examples we want to print out. There can be multiple. desired_class="opposite", # We want to convert the quality to the opposite one. features_to_vary=features_to_vary, proximity_weight=0.5, # Control proximity diversity_weight=1.0, # Control diversity sparsity_weight=0.5, # Enforce minimal feature changes random_seed=42, ) except Exception as e: print(e) dice_exp = None else: try: dice_exp = exp.generate_counterfactuals( query, total_CFs=num_counterfactuals, # Total number of Counterfactual Examples we want to print out. There can be multiple. desired_class="opposite", # We want to convert the quality to the opposite one. proximity_weight=0.5, # Control proximity diversity_weight=1.0, # Control diversity sparsity_weight=0.5, # Enforce minimal feature changes random_seed=42, ) except Exception as e: print(e) dice_exp = None if dice_exp: return dice_exp._cf_examples_list return dice_exp def get_dataframe(path): df = pd.read_csv(path) return df def generatePCA(preprocess_df): pca = PCA() pca.fit(preprocess_df) exp_var_cumul = np.cumsum(pca.explained_variance_ratio_) pca = px.area( x=range(1, exp_var_cumul.shape[0] + 1), y=exp_var_cumul, labels={"x": "# Components", "y": "Explained Variance"}, ) pca.update_layout( autosize=True, ) return pca def generateTSNE(preprocess_df, dataset_type, class_label=None): # tSNE tsne = TSNE(n_components=2, random_state=39) if dataset_type == "tabular": projections = tsne.fit_transform(preprocess_df.drop(class_label, axis=1).values) tsne_df = pd.DataFrame( { "0": projections[:, 0], "1": projections[:, 1], class_label: preprocess_df[class_label].astype(str), } ) # render_mode="svg" will prevent the scatter from getting to GL mode # for sufficiently large input. It was observed that for datasets # larger than 1000 entries, the scatter html object that would be # generated would lack the element containing # all the points of the scatter. Using that we could connect the click # of a point with the actual point in the dataset. Thus it was vital # that there exists such an element and also is accessible. # By default, scatter disables it to free space in the client side # and by using render_mode="svg" we avoid that behaviour. # https://github.com/plotly/plotly_express/issues/145 tsne = px.scatter( tsne_df, x="0", y="1", color=class_label, render_mode="svg", ) tsne.update_layout(clickmode="event+select", autosize=True) elif dataset_type == "timeseries": preprocess_df_drop_class = preprocess_df.iloc[:, :-1] projections = tsne.fit_transform(preprocess_df_drop_class) tsne_df = pd.DataFrame( { "0": projections[:, 0], "1": projections[:, 1], "class": preprocess_df.iloc[:, -1].astype(str), } ) # render_mode="svg" will prevent the scatter from getting to GL mode # for sufficiently large input. It was observed that for datasets # larger than 1000 entries, the scatter html object that would be # generated would lack the element containing # all the points of the scatter. Using that we could connect the click # of a point with the actual point in the dataset. Thus it was vital # that there exists such an element and also is accessible. # By default, scatter disables it to free space in the client side # and by using render_mode="svg" we avoid that behaviour. # https://github.com/plotly/plotly_express/issues/145 tsne = px.scatter( tsne_df, x="0", y="1", color="class", render_mode="svg", ) tsne.update_layout(clickmode="event+select", autosize=True) return tsne, projections def generateAugmentedTSNE( df, cf_df, num_counterfactuals, point, tsne_path, class_label ): """ Given a tsne graph, add the traces of the computed counterfactuals for a given point and return the new graph Parameters ---------- df: dataframe used to compute tsne cf_df: counterfactuals dataframe point: original point of for which counterfatuals were computed tsne_path: path to the original tsne plot Returns ------- The tsne graph updated with new counterfactuals points. Cunterfactual points and the point itself are resized """ # make the tsne (the same tsne but with extra points # the counterfactuals points descriped in # counterfactuals.csv) tsne_cf = TSNE(n_components=2, random_state=0) # merge counterfactuals csv with tsne_data df_merged = pd.concat([cf_df, df], ignore_index=True, axis=0) projections = tsne_cf.fit_transform(df_merged.drop(class_label, axis=1).values) # cf_df containes the projection values and the class_label # value of the counterfactual points. Porjections is np array # containing pairs of x and y values for each tsne graph point # cf_df = pd.DataFrame( { "0": projections[:num_counterfactuals, 0], "1": projections[:num_counterfactuals, 1], class_label: cf_df[class_label].iloc[:num_counterfactuals].astype(str), } ) # cf_df = pd.concat([cf_df, point], ignore_index=True, axis=0) # new = {'0':'Front hello', '1': 'hi'} # cf_s.for_each_trace(lambda t: t.update(name = new[t.name])) point_s = px.scatter( point, x="0", y="1", color=class_label, color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"}, render_mode="svg", ) point_s["data"][0]["name"] = "Original data" cf_s = px.scatter( cf_df, x="0", y="1", color=class_label, color_discrete_map={"0": "rgb(239, 85, 59)", "1": "rgb(99, 110, 250)"}, render_mode="svg", ) cf_s["data"][0]["name"] = "Counterfactual" clicked_and_cf_s = go.Figure(data=cf_s.data + point_s.data) clicked_and_cf_s.update_traces( marker=dict(size=10, symbol="circle", line=dict(width=2)) ) tsne = joblib.load(tsne_path) tsne = go.Figure(data=tsne.data + clicked_and_cf_s.data) tsne.update_layout(clickmode="event+select", autosize=True) # tsne.add_trace(cf_s.data[0]) return tsne # tsne_cf = TSNE(n_components=2, random_state=0) # projections = tsne_cf.fit_transform( # df_merged.drop(["diagnosis"], axis=1).values # ) # cf_df = pd.DataFrame( # { # "0": projections[:num_counterfactuals, 0], # "1": projections[:num_counterfactuals, 1], # "diagnosis": cf_df.diagnosis.iloc[:3], # } # ) # cf_df = pd.concat([cf_df, clicked_point_df], ignore_index=True, axis=0) # cf_s = px.scatter( # cf_df, # x="0", # y="1", # color="diagnosis", # color_continuous_scale=px.colors.sequential.Rainbow, # ) # cf_s.update_traces( # marker=dict( # size=10, # symbol="circle", # ) # ) # tsne = joblib.load("tsne.sav") # tsne.add_trace(cf_s.data[0]) # pickle.dump(tsne, open("tsne_cfs.sav", "wb")) # tsne = tsne.to_html() def get_ecg_entry(X_test, y_test, i, class_label): # timeseries # samples subplots # Create subplots fig = go.Figure() y = X_test[y_test == class_label].iloc[i] index = X_test[y_test == class_label].index[i] if class_label == 0: name = "Normal ECG" else: name = "Abnormal ECG" # Adding the ECG trace fig.add_trace( go.Scatter( y=y, mode="lines", line=dict(width=1), ) ) # Updating the layout to reduce margins and make the plot more compact fig.update_layout( xaxis_title="Timestep", yaxis_title=name, hovermode="x", margin=dict(l=10, r=10, t=30, b=10), # Reduced margins ) # Adjust y-axis to start from zero, customize the grid, and add more space fig.update_yaxes( rangemode="tozero", showgrid=True, gridwidth=1, # Makes the gridlines slightly more pronounced tickvals=[min(y), max(y)],) # Addspacing between gridlights return fig, int(index) def ecg_plot_counterfactuals(i, X_test, y_test, y_pred, X_cf, cf_pred): fig = go.Figure() neg = 0 pos = 1 y_test = np.where(y_test == 1, pos, neg) # print("y_test: ", y_test) # print("y_pred: ", y_pred) # print("cf_pred: ", cf_pred) # Original time series fig.add_trace( go.Scatter( y=X_test.iloc[i], mode="lines", name="Original (y_pred = %d, y_actual = %d)" % (y_pred[i], y_test[i]), line=dict(width=0.5), ) ) if len(X_cf[i].shape) > 1: X_cf_flattened = X_cf[i].flatten() # Counterfactual time series fig.add_trace( go.Scatter( y=X_cf_flattened, mode="lines", name="Counterfactual (y = %d)" % cf_pred[i], line=dict(width=1), ) ) else: fig.add_trace( go.Scatter( y=X_cf[i], mode="lines", name="Counterfactual (y = %d)" % cf_pred[i], line=dict(width=1), ) ) # Updating the layout to reduce margins and make the plot more compact fig.update_layout( xaxis_title="Timestep", hovermode="x", margin=dict(l=10, r=10, t=30, b=10), # Reduced margins ) # Adjust y-axis to start from zero, customize the grid, and add more space fig.update_yaxes( rangemode="tozero", showgrid=True, gridwidth=1, # Makes the gridlines slightly more pronounced tickvals=[min(X_test.iloc[i]), max(X_test.iloc[i])],) # Addspacing between gridlights # # Mean time series of the counterfactual class # mean_cf_class = np.mean(X_test.loc[y_test == cf_pred[i]], axis=0) # fig.add_trace(go.Scatter( # y=mean_cf_class, # mode='lines', # name="Mean of X with y = %d" % cf_pred[i], # line=dict(width=1, dash='dash') # )) fig.update_layout( xaxis_title="Timepoints", yaxis_title="Values", legend=dict(x=0.01, y=0.99) ) return fig def get_info_of_dataframe(df): # Creating a DataFrame to store the summary summary_data = { "Total Rows": [df.shape[0]], "Total Columns": [df.shape[1]], "Missing Values (Total)": [df.isnull().sum().sum()], "Missing Values (Columns)": [df.isnull().any(axis=0).sum()], "Categorical Columns": [(df.dtypes == "object").sum()], "Numeric Columns": [df.select_dtypes(include=["number"]).shape[1]], } summary_df = pd.DataFrame(summary_data) # Create a Plotly Table with enhanced styling fig = go.Figure( data=[ go.Table( header=dict( values=["Metric", "Value"], fill_color="#4CAF50", align="left", font=dict(color="white", size=14), height=30, ), cells=dict( values=[summary_df.columns, summary_df.iloc[0].tolist()], fill_color=[["#f9f9f9", "white"] * len(summary_df)], align="left", font=dict(color="black", size=12), height=25, ), ) ] ) fig.update_layout( title_x=0.5, # Center title title_y=0.95, margin=dict(l=20, r=20, t=50, b=20), width=600, height=300, # Adjust height based on the content ) # Convert Plotly figure to HTML return fig.to_html() def update_column_list_with_one_hot_columns(df_original, df_encoded, column_list): updated_columns = [] for column in column_list: # Check if the column is categorical in the original dataset if ( pd.api.types.is_categorical_dtype(df_original[column]) or df_original[column].dtype == "object" ): # The column is categorical, so find the one-hot encoded sub-columns one_hot_columns = [ col for col in df_encoded.columns if col.startswith(f"{column}_") ] # Replace the original column name with the one-hot encoded sub-columns if one_hot_columns: updated_columns.extend( one_hot_columns ) # Add the sub-columns to the updated list else: # If no one-hot encoded columns are found (for some reason), keep the original column updated_columns.append(column) else: # If the column is not categorical, keep it as is updated_columns.append(column) return updated_columns # Function to extract continuous features def get_continuous_features(df): # Filter columns based on dtype and exclude binary columns continuous_columns = df.select_dtypes(include=["float64", "int64"]).columns # Exclude binary features (0 and 1 values) continuous_columns = [ col for col in continuous_columns if df[col].nunique() > 2 # Exclude binary features ] # Return only the continuous features return list(continuous_columns) # Function to extract continuous features def get_categorical_features(df): # Filter columns based on dtype and exclude binary columns categorical_columns = df.select_dtypes(include=["object", "category"]).columns # Return only the continuous features return list(categorical_columns) # Function to extract non-continuous features def get_non_continuous_features(df): # Select numeric columns numeric_columns = df.select_dtypes(include=["float64", "int64"]).columns # Identify binary columns (having only two unique values, like 0/1) binary_columns = [col for col in numeric_columns if df[col].nunique() == 2] # Select non-numeric columns non_numeric_columns = df.select_dtypes( exclude=["float64", "int64"] ).columns.tolist() # Combine binary columns and non-numeric columns non_continuous_columns = binary_columns + non_numeric_columns # Return only the non-continuous features return list(non_continuous_columns) def find_counterfactuals(estimator, explainer, X): y_pred = estimator.predict(X) y_desired = np.empty_like(y_pred) # Store an array of the desired label for each sample. # We assume a binary classification task and the the desired # label is the inverse of the predicted label. a, b = estimator.classes_ y_desired[y_pred == a] = b y_desired[y_pred == b] = a # Initialize the explainer, using the medoid approach. explainer.fit(estimator) # Explain each sample in X as the desired label in y_desired X_cf = explainer.explain(X, y_desired) return X_cf, y_pred, estimator.predict(X_cf) def is_column_categorical_like( df, column_name, unique_threshold=10, ratio_threshold=0.05 ): """ Determines if a numeric column has categorical characteristics. Parameters: df (DataFrame): The DataFrame containing the data. column_name (str): The column name to check. unique_threshold (int): Maximum number of unique values to consider as categorical. ratio_threshold (float): Maximum ratio of unique values to total rows to consider as categorical. Returns: bool: True if the column is likely categorical, False otherwise. """ unique_values = df[column_name].nunique() # Number of unique values total_values = len(df[column_name]) # Total number of rows unique_ratio = unique_values / total_values # Ratio of unique values to total rows # Check if the column is numeric if pd.api.types.is_numeric_dtype(df[column_name]): # Consider it categorical if it has fewer than `unique_threshold` unique values # or if the unique values ratio is below `ratio_threshold` if unique_values <= unique_threshold or unique_ratio <= ratio_threshold: return True return False # Function to flatten the dictionary def flatten_dict(d, parent_key="", sep="_"): items = [] for k, v in d.items(): new_key = parent_key + sep + k if parent_key else k if isinstance(v, dict): items.extend(flatten_dict(v, new_key, sep=sep).items()) else: items.append((new_key, v)) return dict(items) def convert_to_camel_case(s): # Split the string by hyphen parts = s.split("-") # Capitalize each part and join them together camel_case = "".join(word.capitalize() for word in parts) return camel_case # def fetch_line_by_dataset(file_path, dataset, constraint): def fetch_line_by_dataset(file_path, dataset): """ Fetches a line from the file based on the specified dataset name to retrieve basic information about the dataset. :param file_path: Path to the input file. :param dataset: The dataset name to search for. :return: The line matching the dataset and constraint, or None if not found. """ with open(file_path, "r") as file: for line in file: # Strip leading whitespace stripped_line = line.strip() # Skip lines that start with # if stripped_line.startswith("#"): continue # Use regular expressions for exact match of the dataset dataset_pattern = rf"--dataset\s+{re.escape(dataset)}\b" if re.search(dataset_pattern, stripped_line): return stripped_line return None def extract_arguments_from_line(line): """ Extracts all words that come immediately after '--' arguments in the line. :param line: A string containing the line to parse. :return: A list of argument values found in the line. """ # Find all arguments and their values # matches = re.findall(r"(--[\w-]+)((?:\s+[^-][^\s]*)*)", line) matches = re.findall(r"(--[\w-]+)\s+([^\s]+)", line) # Extract argument values arguments = [value.strip() for _, value in matches if value.strip()] return arguments def create_tuple_of_models_text_value(available_pre_trained_models): available_pretrained_models_info = [] for model in available_pre_trained_models: if "xgb" == model: available_pretrained_models_text = "XGBoost" elif "rf" == model: available_pretrained_models_text = "Random Forest" elif "lr" == model: available_pretrained_models_text = "Logistic Regression" elif "dt" == model: available_pretrained_models_text = "Decision Tree" elif "svm" == model: available_pretrained_models_text = "Support Vector Machine" elif "glacier" == model: available_pretrained_models_text = "Glacier 1dCNN" elif "wildboar_knn" == model: available_pretrained_models_text = "Wildboar K-Nearest Neighbours" elif "wildboar_rsf" == model: available_pretrained_models_text = "Wildboar Random Shapelet Forest" available_pretrained_models_info.append( (model, available_pretrained_models_text) ) return available_pretrained_models_info