# EXTREMUM_web/base/handlers/ajaxCounterfactualsHandler.py

import concurrent.futures
import json
import math
import os
import pickle

import joblib
import numpy as np
import pandas as pd
from django.shortcuts import HttpResponse
from sklearn.preprocessing import LabelEncoder

import base.pipeline as pipeline
from dict_and_html import *
from .. import methods
from ..methods import PIPELINE_PATH
from ..glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals


def handler(action, request):
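    """AJAX dispatcher for the counterfactual views.

    Supported actions: "reset_graph", "pre_trained", "click_graph", "cf",
    "compute_cf", "counterfactual_select" and "class_label_selection".
    Each branch builds a `context` dict that is returned as JSON.
    """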
    status = 200
    if action == "reset_graph":
        model_name = request.session.get("model_name")
        # dataframe name
        excel_file_name = request.session.get("df_name")
        # restore the original t-SNE plot saved during training
        # folder path: pipelines/<dataset name>/trained_models/<model_name>/
        model_name_path = os.path.join(
            PIPELINE_PATH + f"{excel_file_name}" + "/trained_models/" + model_name
        )
        model_name_dir_path = os.path.join(PIPELINE_PATH + f"{excel_file_name}")
        tsne = joblib.load(model_name_dir_path + "/tsne.sav")
        context = {"fig": tsne.to_html()}
elif action == "pre_trained":
# load pre trained models
pre_trained_model_name = request.POST.get("pre_trained")
request.session["model_name"] = pre_trained_model_name
# dataframe name
df_name = request.session.get("df_name")
if df_name == "upload":
df_name = request.session.get("df_name_upload_base_name")
model_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/trained_models/" + pre_trained_model_name
)
model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}")
# get the type of the file
datasets_types_PipelineJSON_path = os.path.join(
PIPELINE_PATH + "/dataset_types_pipeline.json"
)
datasets_types_pipeline = pipeline.PipelineJSON(
datasets_types_PipelineJSON_path
)
dataset_type = datasets_types_pipeline.read_from_json([df_name])
if type(dataset_type) is list:
dataset_type = dataset_type[0]
if "url" in request.POST:
url = request.POST.get("url")
if url == "counterfactuals":
# only TSNE
tsne = joblib.load(model_name_path + "/tsne.sav")
# Assuming you already have your fig object created, you can update it like this:
# Improved and modern t-SNE visualization
tsne.update_layout(
# Modern Legend Design
legend=dict(
x=0.9,
y=0.95,
xanchor="right",
yanchor="top",
bgcolor="rgba(255,255,255,0.8)", # Light semi-transparent white background
bordercolor="rgba(0,0,0,0.1)", # Light border for contrast
borderwidth=1,
font=dict(size=12, color="#444"), # Subtle grey for legend text
),
# Tight Margins to Focus on the Plot
margin=dict(
l=10, r=10, t=30, b=10
), # Very slim margins for a modern look
# Axis Design: Minimalist and Clean
xaxis=dict(
title_text="", # No axis labels for a clean design
tickfont=dict(
size=10, color="#aaa"
), # Light grey for tick labels
showline=True,
linecolor="rgba(0,0,0,0.2)", # Subtle line color for axis lines
zeroline=False, # No zero line for a sleek look
showgrid=False, # Hide grid lines for a minimal appearance
ticks="outside", # Small ticks outside the axis
ticklen=3, # Short tick marks for subtlety
),
yaxis=dict(
title_text="", # No axis labels
tickfont=dict(size=10, color="#aaa"),
showline=True,
linecolor="rgba(0,0,0,0.2)",
zeroline=False,
showgrid=False,
ticks="outside",
ticklen=3,
),
# Sleek Background
plot_bgcolor="#fafafa", # Very light grey background for a smooth finish
paper_bgcolor="#ffffff", # Pure white paper background
# Modern Title with Elegant Style
title=dict(
text="t-SNE Visualization of Data",
font=dict(
size=16, color="#222", family="Helvetica, Arial, sans-serif"
), # Classy font style
x=0.5,
xanchor="center",
yanchor="top",
pad=dict(t=15), # Padding to separate the title from the plot
),
)
# Add hover effects for a smooth user experience
tsne.update_traces(
hoverinfo="text+name",
hoverlabel=dict(bgcolor="white", font_size=12, font_family="Arial"),
)
context = {
"tsne": tsne.to_html(),
}
            else:
                # load plots
                pca = joblib.load(model_name_path + "/pca.sav")
                classification_report = joblib.load(
                    model_name_path + "/classification_report.sav"
                )
                # pipeline path
                json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json")
                jsonFile = pipeline.PipelineJSON(json_path)
                # load the pipeline data for this classifier
                classifier_data = jsonFile.read_from_json(
                    ["classifier", pre_trained_model_name]
                )
                classifier_data_flattened = methods.flatten_dict(classifier_data)
                classifier_data_df = pd.DataFrame([classifier_data_flattened])
                if dataset_type == "tabular":
                    feature_importance = joblib.load(
                        model_name_path + "/feature_importance.sav"
                    )
                    context = {
                        "dataset_type": dataset_type,
                        "pca": pca.to_html(),
                        "class_report": classification_report.to_html(),
                        "feature_importance": feature_importance.to_html(),
                        "classifier_data": classifier_data_df.to_html(),
                    }
                elif dataset_type == "timeseries":
                    tsne = joblib.load(model_name_path + "/tsne.sav")
                    context = {
                        "dataset_type": dataset_type,
                        "pca": pca.to_html(),
                        "class_report": classification_report.to_html(),
                        "tsne": tsne.to_html(),
                        "classifier_data": classifier_data_df.to_html(),
                    }
elif action == "click_graph":
# get df used name
df_name = request.session.get("df_name")
if df_name == "upload":
df_name = request.session.get("df_name_upload_base_name")
# get model_name
model_name = request.POST.get("model_name")
# preprocessed_path
excel_file_name_preprocessed_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + "_preprocessed" + ".csv"
)
excel_file_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv"
)
model_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name
)
# pipeline path
json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json")
# load pipeline data
# jsonFile = open(json_path, "r")
# pipeline_data = PipelineJSON.load(jsonFile) # data becomes a dictionary
# class_label = pipeline_data["classifier"][model_name]["class_label"]
jsonFile = pipeline.PipelineJSON(json_path)
class_label = jsonFile.read_from_json(
["classifier", model_name, "class_label"]
)
df = pd.read_csv(excel_file_name_path)
# Load your saved feature importance from a .sav file
feature_importance_df = pd.read_csv(
model_name_path + "/feature_importance_df.csv"
)
# sorted_df = feature_importance_df.sort_values(by="importance", ascending=False)
# x and y coordinates of the clicked point in tsne
x_coord = request.POST["x"]
y_coord = request.POST["y"]
# tsne_projections
tsne_projections_path = os.path.join(
PIPELINE_PATH
+ f"{df_name}/"
+ f"trained_models/{model_name}"
+ "/tsne_projections.json",
)
# tsne projections of all points (saved during generation of tsne)
projections = pd.read_json(tsne_projections_path)
projections = projections.values.tolist()
# projections array is a list of pairs with the (x, y)
# [ [], [], [] ... ]
# coordinates for a point in tsne. These are actual absolute
# coordinates and not SVG.
# find the pair of the projection with x and y coordinates matching that of
# clicked point coordinates
for clicked_id, item in enumerate(projections):
if math.isclose(item[0], float(x_coord)) and math.isclose(
item[1], float(y_coord)
):
break
# save clicked point projections
request.session["clicked_point"] = item
# get clicked point row
row = df.iloc[[int(clicked_id)]]
request.session["cfrow_id"] = clicked_id
request.session["cfrow_og"] = row.to_html()
context = {
"row": row.to_html(index=False),
"feature_importance_dict": feature_importance_df.to_dict(orient="records"),
}
elif action == "cf":
# dataframe name
df_name = request.session.get("df_name")
if df_name == "upload":
df_name = request.session.get("df_name_upload_base_name")
# preprocessed_path
excel_file_name_preprocessed_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + "_preprocessed" + ".csv"
)
excel_file_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv"
)
# which model is being used during that session
model_name = request.POST.get("model_name")
# path of used model
model_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/"
)
model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}")
# read preprocessed data
if os.path.exists(excel_file_name_preprocessed_path):
df = pd.read_csv(excel_file_name_preprocessed_path)
else:
df = pd.read_csv(excel_file_name_path)
datasets_types_PipelineJSON_path = os.path.join(
PIPELINE_PATH + "/dataset_types_pipeline.json"
)
datasets_types_pipeline = pipeline.PipelineJSON(
datasets_types_PipelineJSON_path
)
dataset_type = datasets_types_pipeline.read_from_json([df_name])
if type(dataset_type) is list:
dataset_type = dataset_type[0]
df_id = request.session.get("cfrow_id")
if dataset_type == "tabular":
# get row
features_to_vary = json.loads(request.POST.get("features_to_vary"))
row = df.iloc[[int(df_id)]]
# not preprocessed
notpre_df = pd.read_csv(excel_file_name_path)
notpre_row = notpre_df.iloc[[int(df_id)]]
# if feature_to_vary has a categorical column then I cannot just
# pass that to dice since the trained model does not contain the
# categorical column but the one-hot-encoded sub-columns
features_to_vary = methods.update_column_list_with_one_hot_columns(
notpre_df, df, features_to_vary
)
# pipeline path
json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json")
# load pipeline data
jsonFile = pipeline.PipelineJSON(json_path)
class_label = jsonFile.read_from_json(
["classifier", model_name, "class_label"]
) # data becomes a dictionary
# number of counterfactuals
# (TBD) input field value as parameter
# in ajax
num_counterfactuals = 5
le = LabelEncoder()
notpre_df[class_label] = le.fit_transform(notpre_df[class_label])
continuous_features = methods.get_continuous_features(df)
non_continuous_features = methods.get_non_continuous_features(df)
# load used classifier
clf = joblib.load(model_name_path + model_name + ".sav")
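            # counterfactual generation can stall on hard instances, so it runs
            # in a worker thread and is abandoned after a 10-second timeout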
            counterfactuals = None
            try:
                # set up an executor to run the function in a separate thread
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # submit the function to the executor
                    future = executor.submit(
                        methods.counterfactuals,
                        row,
                        clf,
                        df,
                        class_label,
                        continuous_features,
                        num_counterfactuals,
                        features_to_vary,
                    )
                    # wait for the result with a timeout of 10 seconds
                    counterfactuals = future.result(timeout=10)
                    print("Counterfactuals computed successfully!")
            except concurrent.futures.TimeoutError:
                message = (
                    "It seems like it took more than expected. Refresh and try again..."
                )
                context = {"message": message}
            if counterfactuals:
                cf_df = counterfactuals[0].final_cfs_df
                counterfactuals[0].final_cfs_df.to_csv(
                    model_name_path + "counterfactuals.csv", index=False
                )
                # get coordinates of the clicked point (saved during the 'click' event)
                clicked_point = request.session.get("clicked_point")
                clicked_point_df = pd.DataFrame(
                    {
                        "0": clicked_point[0],
                        "1": clicked_point[1],
                        f"{class_label}": row[class_label].astype(str),
                    }
                )
                # t-SNE
                cf_df = pd.read_csv(model_name_path + "counterfactuals.csv")
                model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}")
                tsne_path_to_augment = model_name_path + "tsne.sav"
                tsne = methods.generateAugmentedTSNE(
                    df,
                    cf_df,
                    num_counterfactuals,
                    clicked_point_df,
                    tsne_path_to_augment,
                    class_label,
                )
                tsne.update_layout(
                    # modern legend design
                    legend=dict(
                        x=0.85,
                        y=0.95,
                        xanchor="right",
                        yanchor="top",
                        bgcolor="rgba(0,0,0,0.05)",  # transparent background for a sleek look
                        bordercolor="rgba(0,0,0,0.1)",  # soft border for separation
                        borderwidth=1,
                        font=dict(size=12, color="#333"),  # modern grey font color
                    ),
                    # tight margins for a focused plot area
                    margin=dict(l=20, r=20, t=40, b=40),
                    # axis titles and labels: minimalist design
                    xaxis=dict(
                        title_font=dict(size=14, color="#555"),  # medium grey axis title
                        tickfont=dict(size=11, color="#777"),  # light grey tick labels
                        showline=True,
                        linecolor="rgba(0,0,0,0.15)",  # subtle color for axis lines
                        zeroline=False,  # hide the zero line for a cleaner design
                        showgrid=False,  # no grid lines for a modern look
                    ),
                    yaxis=dict(
                        title_font=dict(size=14, color="#555"),
                        tickfont=dict(size=11, color="#777"),
                        showline=True,
                        linecolor="rgba(0,0,0,0.15)",
                        zeroline=False,
                        showgrid=False,
                    ),
                    # sleek background design
                    plot_bgcolor="white",  # crisp white background
                    paper_bgcolor="white",  # keep the entire background uniform
                    # title: modern font and centered
                    title=dict(
                        text="t-SNE Visualization of Data",
                        font=dict(
                            size=18, color="#333", family="Arial, sans-serif"
                        ),
                        x=0.5,
                        xanchor="center",
                        yanchor="top",
                        pad=dict(t=10),  # padding to give the title breathing space
                    ),
                )
                pickle.dump(tsne, open(model_name_path + "tsne_cfs.sav", "wb"))
                context = {
                    "dataset_type": dataset_type,
                    "model_name": model_name,
                    "tsne": tsne.to_html(),
                    "num_counterfactuals": num_counterfactuals,
                    "default_counterfactual": "1",
                    "clicked_point": notpre_row.to_html(),
                    "counterfactual": cf_df.iloc[[1]].to_html(),
                }
            else:
                context = {
                    "dataset_type": dataset_type,
                    "model_name": model_name,
                    "message": "Please try again with different features.",
                }
        elif dataset_type == "timeseries":
            model_name = request.POST["model_name"]
            model_name_path = os.path.join(
                PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/"
            )
            path = model_name_path
            if model_name == "glacier":
                constraint = request.POST["constraint"]
                path = os.path.join(
                    PIPELINE_PATH
                    + f"{df_name}/"
                    + "trained_models/"
                    + f"{model_name}/"
                    + f"{constraint}/"
                )
            X_test_path = os.path.join(model_name_path + "X_test.csv")
            y_test_path = os.path.join(model_name_path + "y_test.npy")
            y_pred_path = os.path.join(path + "y_pred.npy")
            X_cf_path = os.path.join(path + "X_cf.npy")
            cf_pred_path = os.path.join(path + "cf_pred.npy")
            X_test = pd.read_csv(X_test_path)
            y_test = np.load(y_test_path)
            y_pred = np.load(y_pred_path)
            X_cf = np.load(X_cf_path)
            cf_pred = np.load(cf_pred_path)
            if model_name != "glacier":
                scaler = joblib.load(model_name_path + "/min_max_scaler.sav")
                X_test = pd.DataFrame(scaler.inverse_transform(X_test))
                X_cf = scaler.inverse_transform(X_cf)
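            # plot the original ECG test sample alongside its counterfactual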
            fig = methods.ecg_plot_counterfactuals(
                int(df_id), X_test, y_test, y_pred, X_cf, cf_pred
            )
            context = {
                "df_name": df_name,
                "fig": fig.to_html(),
                "dataset_type": dataset_type,
            }
elif action == "compute_cf":
model_name = request.POST.get("model_name")
if model_name == "glacier":
constraint_type = request.POST.get("constraint")
w_value = request.POST.get("w_value")
df_name = request.session.get("df_name")
model_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/"
)
model_name_path_constraint = model_name_path + f"{constraint_type}/"
if not os.path.exists(model_name_path_constraint):
os.makedirs(model_name_path_constraint)
# https://github.com/wildboar-foundation/wildboar/blob/master/docs/guide/explain/counterfactuals.rst#id27
classifier = joblib.load(model_name_path + "/classifier.sav")
# pipeline path
json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json")
# load pipeline data
jsonFile = pipeline.PipelineJSON(json_path)
autoencoder = jsonFile.read_from_json(
["classifier", model_name, "autoencoder"]
)
experiment_dict = {"constraint": constraint_type, "w_value": w_value}
# if "experiments" in pipeline_data["classifier"][model_name]:
# # if there exists key with value "experiments"
# keys = pipeline_data["classifier"][model_name]["experiments"].keys()
# last_key_int = int(list(keys)[-1])
# last_key_int_incr_str = str(last_key_int + 1)
# else:
# last_key_int_incr_str = "0"
# experiment_key_dict = {"experiments": {last_key_int_incr_str: {}}}
# pipeline_data["classifier"][model_name].update(experiment_key_dict)
# outter_dict = {last_key_int_incr_str: experiment_dict}
# pipeline_data["classifier"][model_name]["experiments"].update(outter_dict)
            if jsonFile.key_exists("experiments"):
                keys = jsonFile.read_from_json(
                    ["classifier", model_name, "experiments"]
                ).keys()
                last_key_int = int(list(keys)[-1])
                last_key_int_incr_str = str(last_key_int + 1)
            else:
                last_key_int_incr_str = "0"
                experiment_key_dict = {"experiments": {last_key_int_incr_str: {}}}
                jsonFile.update_json(
                    ["classifier", model_name], experiment_key_dict
                )
            outer_dict = {last_key_int_incr_str: experiment_dict}
            jsonFile.update_json(
                ["classifier", model_name, "experiments"], outer_dict
            )
            if autoencoder == "Yes":
                autoencoder = joblib.load(model_name_path + "/autoencoder.sav")
            else:
                autoencoder = None
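            # run the Glacier counterfactual computation for the chosen
            # constraint and w value, writing results under the constraint folder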
            gc_compute_counterfactuals(
                model_name_path,
                model_name_path_constraint,
                constraint_type,
                [0.0001],
                float(w_value),
                0.5,
                classifier,
                autoencoder,
            )
            path = model_name_path_constraint
            context = {"experiment_dict": experiment_dict}
elif action == "counterfactual_select":
# if <select> element is used, and a specific counterfactual
# is inquired to be demonstrated:
df_name = request.session.get("df_name")
df_name = request.session.get("df_name")
if df_name == "upload":
df_name = request.session.get("df_name_upload_base_name")
model_name = request.session.get("model_name")
model_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name
)
excel_file_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv"
)
# pipeline path
json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json")
# load pipeline data
jsonFile = pipeline.PipelineJSON(json_path)
class_label = jsonFile.read_from_json(
["classifier", model_name, "class_label"]
)
# decode counterfactual to original values
preprocessing_list = jsonFile.read_from_json(
["classifier", model_name, "preprocessing"]
)
df = pd.read_csv(excel_file_name_path)
cf_df = pd.read_csv(model_name_path + "/counterfactuals.csv")
cf_id = request.POST["cf_id"]
row = cf_df.iloc[[int(cf_id)]]
if "id" in df.columns:
df = df.drop("id", axis=1)
dec_row = methods.decode_cf(
df, row, class_label, model_name_path, preprocessing_list
)
        fig = joblib.load(model_name_path + "/tsne_cfs.sav")
        # the t-SNE figure stores each class in a separate trace:
        # data[0] is class A
        # data[1] is class B
        # ...
        # data[n-2] is the counterfactuals
        # data[n-1] is the clicked point
        fig_data_array_length = len(fig.data)
        for i in range(fig_data_array_length - 2):
            fig.data[i].update(
                opacity=0.3,
            )
        # the last trace, data[n-1], contains the clicked point
        clicked_trace = fig.data[fig_data_array_length - 1]
        clicked_id = -1
        for clicked_id, item in enumerate(list(zip(clicked_trace.x, clicked_trace.y))):
            if math.isclose(
                item[0], request.session.get("clicked_point")[0]
            ) and math.isclose(item[1], request.session.get("clicked_point")[1]):
                break
        # data[n-2] contains the counterfactuals
        fig.data[fig_data_array_length - 2].update(
            selectedpoints=[int(cf_id)],
            unselected=dict(
                marker=dict(
                    opacity=0.3,
                )
            ),
        )
        fig.data[fig_data_array_length - 1].update(
            selectedpoints=[clicked_id],
            unselected=dict(
                marker=dict(
                    opacity=0.3,
                )
            ),
        )
        # order the columns ("id" was already dropped above)
        dec_row = dec_row[df.columns]
        clicked_point_row_id = request.session.get("cfrow_id")
        # keep only the features where the counterfactual differs
        dec_row = dec_row.reset_index(drop=True)
        df2 = df.iloc[[int(clicked_point_row_id)]].reset_index(drop=True)
        difference = dec_row.loc[
            :,
            [
                methods.compare_values(dec_row[col].iloc[0], df2[col].iloc[0])
                for col in dec_row.columns
            ],
        ]
        merged_df = pd.concat([df2[difference.columns], difference], ignore_index=True)
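        # merged_df stacks the clicked row (top) above the decoded
        # counterfactual (bottom), restricted to the columns that differ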
        context = {
            "row": merged_df.to_html(index=False),
            "fig": fig.to_html(),
        }
elif action == "class_label_selection":
df_name = request.session.get("df_name")
if df_name == "upload":
df_name = request.session["df_name_upload_base_name"]
datasets_types_PipelineJSON_path = os.path.join(
PIPELINE_PATH + "/dataset_types_pipeline.json"
)
dataset_type_json = pipeline.PipelineJSON(datasets_types_PipelineJSON_path)
dataset_type = dataset_type_json.read_from_json([df_name])
if isinstance(dataset_type, list):
dataset_type = dataset_type[0]
# preprocessed_path
excel_file_name_preprocessed_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + "_preprocessed" + ".csv"
)
excel_file_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv"
)
# which model is being used during that session
model_name = request.POST.get("model_name")
model_name_path = os.path.join(
PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name
)
X_test_path = os.path.join(
PIPELINE_PATH
+ f"{df_name}"
+ "/trained_models"
+ f"/{model_name}"
+ "/X_test.csv"
)
y_test_path = os.path.join(
PIPELINE_PATH
+ f"{df_name}"
+ "/trained_models"
+ f"/{model_name}"
+ "/y_test.npy"
)
X_test = pd.read_csv(X_test_path)
y_test = np.load(y_test_path)
if model_name != "glacier":
scaler = joblib.load(model_name_path + "/min_max_scaler.sav")
X_test = pd.DataFrame(scaler.inverse_transform(X_test))
if dataset_type == "timeseries":
class_label = request.POST.get("class_label")
cfrow_id = request.POST.get("cfrow_id")
class_label = (
int(class_label)
if class_label.isdigit()
else (
float(class_label)
if class_label.replace(".", "", 1).isdigit()
else class_label
)
)
fig, index = methods.get_ecg_entry(
X_test, y_test, int(cfrow_id), class_label
)
request.session["cfrow_id"] = index
request.session["class_label"] = class_label
context = {"fig": fig.to_html(), "dataset_type": dataset_type}
    return HttpResponse(json.dumps(context), status=status)