768 lines
30 KiB
Python
768 lines
30 KiB
Python
import base.pipeline as pipeline
|
|
import pickle, os
|
|
import pandas as pd
|
|
import json
|
|
from sklearn.preprocessing import LabelEncoder
|
|
import joblib
|
|
from dict_and_html import *
|
|
from .. import methods
|
|
from ..methods import PIPELINE_PATH
|
|
import math
|
|
import numpy as np
|
|
from .. glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals
|
|
import base.pipeline as pipeline
|
|
import concurrent.futures
|
|
import json
|
|
from django.shortcuts import HttpResponse
|
|
|
|
def _resolve_df_name(request):
    """Return the dataset name for the current session.

    When the session's ``"df_name"`` is the literal ``"upload"``, the value
    stored under ``"df_name_upload_base_name"`` is returned instead.
    """
    df_name = request.session.get("df_name")
    if df_name == "upload":
        df_name = request.session.get("df_name_upload_base_name")
    return df_name


def _read_dataset_type(df_name):
    """Look up *df_name* in ``dataset_types_pipeline.json``.

    ``read_from_json`` may hand back a list; in that case only its first
    element is returned.
    """
    types_json = pipeline.PipelineJSON(PIPELINE_PATH + "/dataset_types_pipeline.json")
    dataset_type = types_json.read_from_json([df_name])
    if isinstance(dataset_type, list):
        dataset_type = dataset_type[0]
    return dataset_type


def _open_pipeline_json(df_name):
    """Open the ``pipeline.json`` that sits in the dataset's directory."""
    return pipeline.PipelineJSON(
        os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json")
    )


def _style_overview_tsne(fig):
    """Apply the layout/hover styling used for the plain t-SNE figure (in place)."""

    def axis():
        # Both axes share the same minimalist look: no title, no grid,
        # short outside ticks.
        return dict(
            title_text="",
            tickfont=dict(size=10, color="#aaa"),
            showline=True,
            linecolor="rgba(0,0,0,0.2)",
            zeroline=False,
            showgrid=False,
            ticks="outside",
            ticklen=3,
        )

    fig.update_layout(
        legend=dict(
            x=0.9,
            y=0.95,
            xanchor="right",
            yanchor="top",
            bgcolor="rgba(255,255,255,0.8)",
            bordercolor="rgba(0,0,0,0.1)",
            borderwidth=1,
            font=dict(size=12, color="#444"),
        ),
        margin=dict(l=10, r=10, t=30, b=10),
        xaxis=axis(),
        yaxis=axis(),
        plot_bgcolor="#fafafa",
        paper_bgcolor="#ffffff",
        title=dict(
            text="t-SNE Visualization of Data",
            font=dict(size=16, color="#222", family="Helvetica, Arial, sans-serif"),
            x=0.5,
            xanchor="center",
            yanchor="top",
            pad=dict(t=15),
        ),
    )
    fig.update_traces(
        hoverinfo="text+name",
        hoverlabel=dict(bgcolor="white", font_size=12, font_family="Arial"),
    )


def _style_counterfactual_tsne(fig):
    """Apply the layout styling used for the counterfactual-augmented t-SNE (in place)."""

    def axis():
        return dict(
            title_font=dict(size=14, color="#555"),
            tickfont=dict(size=11, color="#777"),
            showline=True,
            linecolor="rgba(0,0,0,0.15)",
            zeroline=False,
            showgrid=False,
        )

    fig.update_layout(
        legend=dict(
            x=0.85,
            y=0.95,
            xanchor="right",
            yanchor="top",
            bgcolor="rgba(0,0,0,0.05)",
            bordercolor="rgba(0,0,0,0.1)",
            borderwidth=1,
            font=dict(size=12, color="#333"),
        ),
        margin=dict(l=20, r=20, t=40, b=40),
        xaxis=axis(),
        yaxis=axis(),
        plot_bgcolor="white",
        paper_bgcolor="white",
        title=dict(
            text="t-SNE Visualization of Data",
            font=dict(size=18, color="#333", family="Arial, sans-serif"),
            x=0.5,
            xanchor="center",
            yanchor="top",
            pad=dict(t=10),
        ),
    )


def _handle_reset_graph(request):
    """Reload the t-SNE figure saved in the dataset directory."""
    # The previous code referenced an undefined ``df_name`` here and always
    # raised NameError; the name is now read from the session like in the
    # other actions.
    df_name = _resolve_df_name(request)
    tsne = joblib.load(PIPELINE_PATH + f"{df_name}" + "/tsne.sav")
    return {"fig": tsne.to_html()}


def _handle_pre_trained(request):
    """Load the saved plots of a pre-trained model.

    Stores the chosen model name in ``request.session["model_name"]``.
    With ``url == "counterfactuals"`` only the (restyled) t-SNE figure is
    returned; otherwise PCA, classification report, classifier settings and
    a dataset-type specific plot are returned.
    """
    model_name = request.POST.get("pre_trained")
    request.session["model_name"] = model_name

    df_name = _resolve_df_name(request)
    model_dir = PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name
    dataset_type = _read_dataset_type(df_name)

    # Any other (or missing) "url" value gets the full overview below, so the
    # response is always defined.
    if request.POST.get("url") == "counterfactuals":
        tsne = joblib.load(model_dir + "/tsne.sav")
        _style_overview_tsne(tsne)
        return {"tsne": tsne.to_html()}

    pca = joblib.load(model_dir + "/pca.sav")
    classification_report = joblib.load(model_dir + "/classification_report.sav")

    classifier_data = _open_pipeline_json(df_name).read_from_json(
        ["classifier", model_name]
    )
    classifier_data_df = pd.DataFrame([methods.flatten_dict(classifier_data)])

    if dataset_type == "tabular":
        feature_importance = joblib.load(model_dir + "/feature_importance.sav")
        return {
            "dataset_type": dataset_type,
            "pca": pca.to_html(),
            "class_report": classification_report.to_html(),
            "feature_importance": feature_importance.to_html(),
            "classifier_data": classifier_data_df.to_html(),
        }
    if dataset_type == "timeseries":
        tsne = joblib.load(model_dir + "/tsne.sav")
        return {
            "dataset_type": dataset_type,
            "pca": pca.to_html(),
            "class_report": classification_report.to_html(),
            "tsne": tsne.to_html(),
            "classifier_data": classifier_data_df.to_html(),
        }
    return {}


def _handle_click_graph(request):
    """Map a clicked t-SNE point back to its dataset row.

    Writes ``clicked_point``, ``cfrow_id`` and ``cfrow_og`` to the session.

    Raises:
        ValueError: if the saved projections file contains no points.
    """
    df_name = _resolve_df_name(request)
    model_name = request.POST.get("model_name")
    model_dir = PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name

    df = pd.read_csv(PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv")
    feature_importance_df = pd.read_csv(model_dir + "/feature_importance_df.csv")

    # Coordinates of the clicked point, as posted by the client.
    x_coord = float(request.POST["x"])
    y_coord = float(request.POST["y"])

    # Projections of all points, saved when the t-SNE plot was generated:
    # a list of [x, y] pairs in plot (not SVG) coordinates.
    projections = pd.read_json(
        PIPELINE_PATH
        + f"{df_name}/"
        + f"trained_models/{model_name}"
        + "/tsne_projections.json"
    ).values.tolist()
    if not projections:
        raise ValueError("tsne_projections.json contains no points")

    # NOTE(review): when no projection matches, the last point is used. This
    # mirrors the earlier loop, which simply ran to its end without a match
    # check -- confirm whether that fallback is intended.
    clicked_id = len(projections) - 1
    for index, point in enumerate(projections):
        if math.isclose(point[0], x_coord) and math.isclose(point[1], y_coord):
            clicked_id = index
            break
    item = projections[clicked_id]

    request.session["clicked_point"] = item
    row = df.iloc[[int(clicked_id)]]
    request.session["cfrow_id"] = clicked_id
    request.session["cfrow_og"] = row.to_html()
    return {
        "row": row.to_html(index=False),
        "feature_importance_dict": feature_importance_df.to_dict(orient="records"),
    }


def _cf_tabular(request, df, df_name, raw_csv_path, model_name, model_dir,
                dataset_type, df_id):
    """Compute counterfactuals for the selected row of a tabular dataset.

    Saves ``counterfactuals.csv`` and ``tsne_cfs.sav`` into *model_dir*
    (which ends with a slash) and returns the augmented t-SNE figure.
    """
    features_to_vary = json.loads(request.POST.get("features_to_vary"))
    row = df.iloc[[int(df_id)]]

    # Same row, taken from the data that was not preprocessed.
    notpre_df = pd.read_csv(raw_csv_path)
    notpre_row = notpre_df.iloc[[int(df_id)]]

    # A categorical column cannot be passed on as-is: the trained model only
    # knows its one-hot-encoded sub-columns.
    features_to_vary = methods.update_column_list_with_one_hot_columns(
        notpre_df, df, features_to_vary
    )

    class_label = _open_pipeline_json(df_name).read_from_json(
        ["classifier", model_name, "class_label"]
    )

    # TODO: take the number of counterfactuals from the request.
    num_counterfactuals = 5
    continuous_features = methods.get_continuous_features(df)
    clf = joblib.load(model_dir + model_name + ".sav")

    # The executor is managed by hand: leaving a ``with`` block would call
    # shutdown(wait=True) and block until the worker finishes, which defeats
    # the timeout below.
    executor = concurrent.futures.ThreadPoolExecutor()
    future = executor.submit(
        methods.counterfactuals,
        row,
        clf,
        df,
        class_label,
        continuous_features,
        num_counterfactuals,
        features_to_vary,
    )
    try:
        counterfactuals = future.result(timeout=10)
        print("Counterfactuals computed successfully!")
    except concurrent.futures.TimeoutError:
        # Previously execution continued and failed on the unbound
        # ``counterfactuals``; report the timeout to the client instead.
        message = "It seems like it took more than expected. Refresh and try again..."
        return {"message": message}
    finally:
        executor.shutdown(wait=False)

    if not counterfactuals:
        return {
            "dataset_type": dataset_type,
            "model_name": model_name,
            "message": "Please try again with different features.",
        }

    counterfactuals[0].final_cfs_df.to_csv(
        model_dir + "counterfactuals.csv", index=False
    )

    # Coordinates of the clicked point (saved during the 'click' event).
    clicked_point = request.session.get("clicked_point")
    clicked_point_df = pd.DataFrame(
        {
            "0": clicked_point[0],
            "1": clicked_point[1],
            f"{class_label}": row[class_label].astype(str),
        }
    )

    cf_df = pd.read_csv(model_dir + "counterfactuals.csv")
    tsne = methods.generateAugmentedTSNE(
        df,
        cf_df,
        num_counterfactuals,
        clicked_point_df,
        model_dir + "tsne.sav",
        class_label,
    )
    _style_counterfactual_tsne(tsne)

    with open(model_dir + "tsne_cfs.sav", "wb") as tsne_file:
        pickle.dump(tsne, tsne_file)

    return {
        "dataset_type": dataset_type,
        "model_name": model_name,
        "tsne": tsne.to_html(),
        "num_counterfactuals": num_counterfactuals,
        "default_counterfactual": "1",
        "clicked_point": notpre_row.to_html(),
        "counterfactual": cf_df.iloc[[1]].to_html(),
    }


def _cf_timeseries(request, df_name, dataset_type, df_id):
    """Plot the stored counterfactual for the selected time-series sample."""
    model_name = request.POST["model_name"]
    model_dir = PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/"

    # For "glacier" the predictions and counterfactuals are read from a
    # per-constraint sub-directory.
    results_dir = model_dir
    if model_name == "glacier":
        constraint = request.POST["constraint"]
        results_dir = (
            PIPELINE_PATH
            + f"{df_name}/"
            + "trained_models/"
            + f"{model_name}/"
            + f"{constraint}/"
        )

    X_test = pd.read_csv(model_dir + "X_test.csv")
    y_test = np.load(model_dir + "y_test.npy")
    y_pred = np.load(results_dir + "y_pred.npy")
    X_cf = np.load(results_dir + "X_cf.npy")
    cf_pred = np.load(results_dir + "cf_pred.npy")

    if model_name != "glacier":
        scaler = joblib.load(model_dir + "/min_max_scaler.sav")
        X_test = pd.DataFrame(scaler.inverse_transform(X_test))
        X_cf = scaler.inverse_transform(X_cf)

    fig = methods.ecg_plot_counterfactuals(
        int(df_id), X_test, y_test, y_pred, X_cf, cf_pred
    )
    return {
        "df_name": df_name,
        "fig": fig.to_html(),
        "dataset_type": dataset_type,
    }


def _handle_cf(request):
    """Produce counterfactual output for the row stored in ``session["cfrow_id"]``."""
    df_name = _resolve_df_name(request)
    preprocessed_csv_path = (
        PIPELINE_PATH + f"{df_name}" + "/" + df_name + "_preprocessed" + ".csv"
    )
    raw_csv_path = PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv"

    model_name = request.POST.get("model_name")
    model_dir = PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/"

    # Prefer the preprocessed data when it exists.
    if os.path.exists(preprocessed_csv_path):
        df = pd.read_csv(preprocessed_csv_path)
    else:
        df = pd.read_csv(raw_csv_path)

    dataset_type = _read_dataset_type(df_name)
    df_id = request.session.get("cfrow_id")

    if dataset_type == "tabular":
        return _cf_tabular(
            request, df, df_name, raw_csv_path, model_name, model_dir,
            dataset_type, df_id,
        )
    if dataset_type == "timeseries":
        return _cf_timeseries(request, df_name, dataset_type, df_id)
    return {}


def _handle_compute_cf(request):
    """Run the glacier counterfactual computation and record the experiment.

    Only acts when the posted ``model_name`` is ``"glacier"``; any other
    model yields an empty context.
    """
    model_name = request.POST.get("model_name")
    if model_name != "glacier":
        return {}

    constraint_type = request.POST.get("constraint")
    w_value = request.POST.get("w_value")
    # NOTE(review): unlike the other actions, the "upload" dataset name is not
    # resolved here -- confirm whether that is intentional.
    df_name = request.session.get("df_name")

    model_dir = PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/"
    constraint_dir = model_dir + f"{constraint_type}/"
    os.makedirs(constraint_dir, exist_ok=True)

    # https://github.com/wildboar-foundation/wildboar/blob/master/docs/guide/explain/counterfactuals.rst#id27
    classifier = joblib.load(model_dir + "/classifier.sav")

    json_file = _open_pipeline_json(df_name)
    autoencoder = json_file.read_from_json(["classifier", model_name, "autoencoder"])

    experiment_dict = {"constraint": constraint_type, "w_value": w_value}

    # Experiments are stored under consecutive integer keys ("0", "1", ...).
    if json_file.key_exists("experiments"):
        existing_keys = json_file.read_from_json(
            ["classifier", model_name, "experiments"]
        ).keys()
        experiment_key = str(int(list(existing_keys)[-1]) + 1)
    else:
        experiment_key = "0"
        json_file.update_json(
            ["classifier", model_name], {"experiments": {experiment_key: {}}}
        )
    json_file.update_json(
        ["classifier", model_name, "experiments"], {experiment_key: experiment_dict}
    )

    if autoencoder == "Yes":
        autoencoder = joblib.load(model_dir + "/autoencoder.sav")
    else:
        autoencoder = None

    gc_compute_counterfactuals(
        model_dir,
        constraint_dir,
        constraint_type,
        [0.0001],
        float(w_value),
        0.5,
        classifier,
        autoencoder,
    )
    return {"experiment_dict": experiment_dict}


def _handle_counterfactual_select(request):
    """Show one stored counterfactual next to the originally clicked row.

    Returns the columns in which the decoded counterfactual differs from the
    original row, plus the t-SNE figure with the chosen counterfactual and
    the clicked point highlighted.
    """
    df_name = _resolve_df_name(request)
    model_name = request.session.get("model_name")
    model_dir = PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name

    json_file = _open_pipeline_json(df_name)
    class_label = json_file.read_from_json(["classifier", model_name, "class_label"])
    # Needed to decode the counterfactual back to original values.
    preprocessing_list = json_file.read_from_json(
        ["classifier", model_name, "preprocessing"]
    )

    df = pd.read_csv(PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv")
    cf_df = pd.read_csv(model_dir + "/counterfactuals.csv")
    cf_id = request.POST["cf_id"]
    row = cf_df.iloc[[int(cf_id)]]

    if "id" in df.columns:
        df = df.drop("id", axis=1)

    dec_row = methods.decode_cf(df, row, class_label, model_dir, preprocessing_list)

    fig = joblib.load(model_dir + "/tsne_cfs.sav")

    # The figure keeps one trace per class, followed by the counterfactuals
    # (data[n-2]) and the clicked point (data[n-1]).
    trace_count = len(fig.data)
    for i in range(trace_count - 2):
        fig.data[i].update(opacity=0.3)

    # Locate the clicked point inside the last trace.
    clicked_trace = fig.data[trace_count - 1]
    clicked_point = request.session.get("clicked_point")
    clicked_id = -1
    for clicked_id, (x, y) in enumerate(zip(clicked_trace.x, clicked_trace.y)):
        if math.isclose(x, clicked_point[0]) and math.isclose(y, clicked_point[1]):
            break

    dimmed = dict(marker=dict(opacity=0.3))
    fig.data[trace_count - 2].update(
        selectedpoints=[int(cf_id)], unselected=dict(dimmed)
    )
    fig.data[trace_count - 1].update(
        selectedpoints=[clicked_id], unselected=dict(dimmed)
    )

    # Align column order with the original data, then keep only the columns
    # flagged by ``methods.compare_values``.
    dec_row = dec_row[df.columns].reset_index(drop=True)
    clicked_point_row_id = request.session.get("cfrow_id")
    original_row = df.iloc[[int(clicked_point_row_id)]].reset_index(drop=True)
    difference = dec_row.loc[
        :,
        [
            methods.compare_values(dec_row[col].iloc[0], original_row[col].iloc[0])
            for col in dec_row.columns
        ],
    ]
    merged_df = pd.concat(
        [original_row[difference.columns], difference], ignore_index=True
    )

    return {
        "row": merged_df.to_html(index=False),
        "fig": fig.to_html(),
    }


def _handle_class_label_selection(request):
    """Pick a time-series test sample for the requested class label.

    Writes ``cfrow_id`` and ``class_label`` to the session. Datasets that are
    not of type ``"timeseries"`` yield an empty context.
    """
    df_name = _resolve_df_name(request)
    dataset_type = _read_dataset_type(df_name)
    if dataset_type != "timeseries":
        return {}

    model_name = request.POST.get("model_name")
    model_dir = PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name

    X_test = pd.read_csv(
        PIPELINE_PATH + f"{df_name}" + "/trained_models" + f"/{model_name}" + "/X_test.csv"
    )
    y_test = np.load(
        PIPELINE_PATH + f"{df_name}" + "/trained_models" + f"/{model_name}" + "/y_test.npy"
    )

    if model_name != "glacier":
        scaler = joblib.load(model_dir + "/min_max_scaler.sav")
        X_test = pd.DataFrame(scaler.inverse_transform(X_test))

    class_label = request.POST.get("class_label")
    cfrow_id = request.POST.get("cfrow_id")

    # The label arrives as text; turn it into int or float when it looks
    # like a non-negative number, otherwise keep the string.
    if class_label.isdigit():
        class_label = int(class_label)
    elif class_label.replace(".", "", 1).isdigit():
        class_label = float(class_label)

    fig, index = methods.get_ecg_entry(X_test, y_test, int(cfrow_id), class_label)
    request.session["cfrow_id"] = index
    request.session["class_label"] = class_label
    return {"fig": fig.to_html(), "dataset_type": dataset_type}


# Maps the ``action`` string to the function that builds its response context.
_ACTION_HANDLERS = {
    "reset_graph": _handle_reset_graph,
    "pre_trained": _handle_pre_trained,
    "click_graph": _handle_click_graph,
    "cf": _handle_cf,
    "compute_cf": _handle_compute_cf,
    "counterfactual_select": _handle_counterfactual_select,
    "class_label_selection": _handle_class_label_selection,
}


def handler(action, request):
    """Dispatch an AJAX *action* and return its context as a JSON response.

    Args:
        action: one of ``"reset_graph"``, ``"pre_trained"``, ``"click_graph"``,
            ``"cf"``, ``"compute_cf"``, ``"counterfactual_select"`` or
            ``"class_label_selection"``.
        request: the Django request; both ``request.POST`` and
            ``request.session`` are read, and some actions write to the
            session.

    Returns:
        ``HttpResponse`` whose body is the JSON-encoded context dict. An
        unknown action, or an action that has nothing to report, produces an
        empty JSON object instead of failing on an undefined context.

    NOTE(review): model and constraint names taken from ``request.POST`` are
    concatenated into filesystem paths that are then loaded with
    joblib/pickle, without validation. Consider restricting them to known
    values.
    """
    status = 200
    action_handler = _ACTION_HANDLERS.get(action)
    context = action_handler(request) if action_handler is not None else {}
    return HttpResponse(json.dumps(context), status=status)