import base.pipeline as pipeline import pickle, os import pandas as pd import json from sklearn.preprocessing import LabelEncoder import joblib from dict_and_html import * from .. import methods from ..methods import PIPELINE_PATH import math import numpy as np from .. glacier.src.glacier_compute_counterfactuals import gc_compute_counterfactuals import base.pipeline as pipeline import concurrent.futures import json from django.shortcuts import HttpResponse def handler(action, request): status = 200 if action == "reset_graph": model_name = request.session.get("model_name") # dataframe name excel_file_name = request.session.get("df_name") # save the plots for future use # folder path: pipelines//trained_models// model_name_path = os.path.join( PIPELINE_PATH + f"{excel_file_name}" + "/trained_models/" + model_name ) model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}") tsne = joblib.load(model_name_dir_path + "/tsne.sav") context = {"fig": tsne.to_html()} elif action == "pre_trained": # load pre trained models pre_trained_model_name = request.POST.get("pre_trained") request.session["model_name"] = pre_trained_model_name # dataframe name df_name = request.session.get("df_name") if df_name == "upload": df_name = request.session.get("df_name_upload_base_name") model_name_path = os.path.join( PIPELINE_PATH + f"{df_name}" + "/trained_models/" + pre_trained_model_name ) model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}") # get the type of the file datasets_types_PipelineJSON_path = os.path.join( PIPELINE_PATH + "/dataset_types_pipeline.json" ) datasets_types_pipeline = pipeline.PipelineJSON( datasets_types_PipelineJSON_path ) dataset_type = datasets_types_pipeline.read_from_json([df_name]) if type(dataset_type) is list: dataset_type = dataset_type[0] if "url" in request.POST: url = request.POST.get("url") if url == "counterfactuals": # only TSNE tsne = joblib.load(model_name_path + "/tsne.sav") # Assuming you already have your fig object created, you can update it like this: # Improved and modern t-SNE visualization tsne.update_layout( # Modern Legend Design legend=dict( x=0.9, y=0.95, xanchor="right", yanchor="top", bgcolor="rgba(255,255,255,0.8)", # Light semi-transparent white background bordercolor="rgba(0,0,0,0.1)", # Light border for contrast borderwidth=1, font=dict(size=12, color="#444"), # Subtle grey for legend text ), # Tight Margins to Focus on the Plot margin=dict( l=10, r=10, t=30, b=10 ), # Very slim margins for a modern look # Axis Design: Minimalist and Clean xaxis=dict( title_text="", # No axis labels for a clean design tickfont=dict( size=10, color="#aaa" ), # Light grey for tick labels showline=True, linecolor="rgba(0,0,0,0.2)", # Subtle line color for axis lines zeroline=False, # No zero line for a sleek look showgrid=False, # Hide grid lines for a minimal appearance ticks="outside", # Small ticks outside the axis ticklen=3, # Short tick marks for subtlety ), yaxis=dict( title_text="", # No axis labels tickfont=dict(size=10, color="#aaa"), showline=True, linecolor="rgba(0,0,0,0.2)", zeroline=False, showgrid=False, ticks="outside", ticklen=3, ), # Sleek Background plot_bgcolor="#fafafa", # Very light grey background for a smooth finish paper_bgcolor="#ffffff", # Pure white paper background # Modern Title with Elegant Style title=dict( text="t-SNE Visualization of Data", font=dict( size=16, color="#222", family="Helvetica, Arial, sans-serif" ), # Classy font style x=0.5, xanchor="center", yanchor="top", pad=dict(t=15), # Padding to separate the title from the plot ), ) # Add hover effects for a smooth user experience tsne.update_traces( hoverinfo="text+name", hoverlabel=dict(bgcolor="white", font_size=12, font_family="Arial"), ) context = { "tsne": tsne.to_html(), } else: # load plots pca = joblib.load(model_name_path + "/pca.sav") classification_report = joblib.load( model_name_path + "/classification_report.sav" ) # tsne = joblib.load(model_name_path + "/tsne.sav") # pipeline path json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json") jsonFile = pipeline.PipelineJSON(json_path) # load pipeline data # jsonFile = open(json_path, "r") # pipeline_data = json.load(jsonFile) # data becomes a dictionary # classifier_data = pipeline_data["classifier"][pre_trained_model_name] classifier_data = jsonFile.read_from_json( ["classifier", pre_trained_model_name] ) classifier_data_flattened = methods.flatten_dict(classifier_data) classifier_data_df = pd.DataFrame([classifier_data_flattened]) if dataset_type == "tabular": feature_importance = joblib.load( model_name_path + "/feature_importance.sav" ) context = { "dataset_type": dataset_type, "pca": pca.to_html(), "class_report": classification_report.to_html(), "feature_importance": feature_importance.to_html(), "classifier_data": classifier_data_df.to_html(), } elif dataset_type == "timeseries": tsne = joblib.load(model_name_path + "/tsne.sav") context = { "dataset_type": dataset_type, "pca": pca.to_html(), "class_report": classification_report.to_html(), "tsne": tsne.to_html(), "classifier_data": classifier_data_df.to_html(), } elif action == "click_graph": # get df used name df_name = request.session.get("df_name") if df_name == "upload": df_name = request.session.get("df_name_upload_base_name") # get model_name model_name = request.POST.get("model_name") # preprocessed_path excel_file_name_preprocessed_path = os.path.join( PIPELINE_PATH + f"{df_name}" + "/" + df_name + "_preprocessed" + ".csv" ) excel_file_name_path = os.path.join( PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv" ) model_name_path = os.path.join( PIPELINE_PATH + f"{df_name}" + "/trained_models/" + model_name ) # pipeline path json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json") # load pipeline data # jsonFile = open(json_path, "r") # pipeline_data = PipelineJSON.load(jsonFile) # data becomes a dictionary # class_label = pipeline_data["classifier"][model_name]["class_label"] jsonFile = pipeline.PipelineJSON(json_path) class_label = jsonFile.read_from_json( ["classifier", model_name, "class_label"] ) df = pd.read_csv(excel_file_name_path) # Load your saved feature importance from a .sav file feature_importance_df = pd.read_csv( model_name_path + "/feature_importance_df.csv" ) # sorted_df = feature_importance_df.sort_values(by="importance", ascending=False) # x and y coordinates of the clicked point in tsne x_coord = request.POST["x"] y_coord = request.POST["y"] # tsne_projections tsne_projections_path = os.path.join( PIPELINE_PATH + f"{df_name}/" + f"trained_models/{model_name}" + "/tsne_projections.json", ) # tsne projections of all points (saved during generation of tsne) projections = pd.read_json(tsne_projections_path) projections = projections.values.tolist() # projections array is a list of pairs with the (x, y) # [ [], [], [] ... ] # coordinates for a point in tsne. These are actual absolute # coordinates and not SVG. # find the pair of the projection with x and y coordinates matching that of # clicked point coordinates for clicked_id, item in enumerate(projections): if math.isclose(item[0], float(x_coord)) and math.isclose( item[1], float(y_coord) ): break # save clicked point projections request.session["clicked_point"] = item # get clicked point row row = df.iloc[[int(clicked_id)]] request.session["cfrow_id"] = clicked_id request.session["cfrow_og"] = row.to_html() context = { "row": row.to_html(index=False), "feature_importance_dict": feature_importance_df.to_dict(orient="records"), } elif action == "cf": # dataframe name df_name = request.session.get("df_name") if df_name == "upload": df_name = request.session.get("df_name_upload_base_name") # preprocessed_path excel_file_name_preprocessed_path = os.path.join( PIPELINE_PATH + f"{df_name}" + "/" + df_name + "_preprocessed" + ".csv" ) excel_file_name_path = os.path.join( PIPELINE_PATH + f"{df_name}" + "/" + df_name + ".csv" ) # which model is being used during that session model_name = request.POST.get("model_name") # path of used model model_name_path = os.path.join( PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/" ) model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}") # read preprocessed data if os.path.exists(excel_file_name_preprocessed_path): df = pd.read_csv(excel_file_name_preprocessed_path) else: df = pd.read_csv(excel_file_name_path) datasets_types_PipelineJSON_path = os.path.join( PIPELINE_PATH + "/dataset_types_pipeline.json" ) datasets_types_pipeline = pipeline.PipelineJSON( datasets_types_PipelineJSON_path ) dataset_type = datasets_types_pipeline.read_from_json([df_name]) if type(dataset_type) is list: dataset_type = dataset_type[0] df_id = request.session.get("cfrow_id") if dataset_type == "tabular": # get row features_to_vary = json.loads(request.POST.get("features_to_vary")) row = df.iloc[[int(df_id)]] # not preprocessed notpre_df = pd.read_csv(excel_file_name_path) notpre_row = notpre_df.iloc[[int(df_id)]] # if feature_to_vary has a categorical column then I cannot just # pass that to dice since the trained model does not contain the # categorical column but the one-hot-encoded sub-columns features_to_vary = methods.update_column_list_with_one_hot_columns( notpre_df, df, features_to_vary ) # pipeline path json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json") # load pipeline data jsonFile = pipeline.PipelineJSON(json_path) class_label = jsonFile.read_from_json( ["classifier", model_name, "class_label"] ) # data becomes a dictionary # number of counterfactuals # (TBD) input field value as parameter # in ajax num_counterfactuals = 5 le = LabelEncoder() notpre_df[class_label] = le.fit_transform(notpre_df[class_label]) continuous_features = methods.get_continuous_features(df) non_continuous_features = methods.get_non_continuous_features(df) # load used classifier clf = joblib.load(model_name_path + model_name + ".sav") try: # Set up the executor to run the function in a separate thread with concurrent.futures.ThreadPoolExecutor() as executor: # Submit the function to the executor future = executor.submit( methods.counterfactuals, row, clf, df, class_label, continuous_features, num_counterfactuals, features_to_vary, ) # Wait for the result with a timeout of 10 seconds counterfactuals = future.result(timeout=10) print("Counterfactuals computed successfully!") except concurrent.futures.TimeoutError: message = ( "It seems like it took more than expected. Refresh and try again..." ) context = {"message": message} if counterfactuals: cf_df = counterfactuals[0].final_cfs_df counterfactuals[0].final_cfs_df.to_csv( model_name_path + "counterfactuals.csv", index=False ) # get coordinates of the clicked point (saved during 'click' event) clicked_point = request.session.get("clicked_point") clicked_point_df = pd.DataFrame( { "0": clicked_point[0], "1": clicked_point[1], f"{class_label}": row[class_label].astype(str), } ) # tSNE cf_df = pd.read_csv(model_name_path + "counterfactuals.csv") model_name_dir_path = os.path.join(PIPELINE_PATH + f"{df_name}") tsne_path_to_augment = model_name_path + "tsne.sav" tsne = methods.generateAugmentedTSNE( df, cf_df, num_counterfactuals, clicked_point_df, tsne_path_to_augment, class_label, ) tsne.update_layout( # Modern Legend Design legend=dict( x=0.85, y=0.95, xanchor="right", yanchor="top", bgcolor="rgba(0,0,0,0.05)", # Transparent black background for a sleek look bordercolor="rgba(0,0,0,0.1)", # Soft border for separation borderwidth=1, font=dict( size=12, color="#333" ), # Modern grey font color for text ), # Tight Margins for a Focused Plot Area margin=dict( l=20, r=20, t=40, b=40 ), # Reduced margins for a cleaner look # Axis Titles and Labels: Minimalist Design xaxis=dict( title_font=dict( size=14, color="#555" ), # Medium grey color for axis title tickfont=dict( size=11, color="#777" ), # Light grey color for tick labels showline=True, linecolor="rgba(0,0,0,0.15)", # Subtle line color for axis lines zeroline=False, # Hide the zero line for a cleaner design showgrid=False, # No grid lines for a modern look ), yaxis=dict( title_font=dict(size=14, color="#555"), tickfont=dict(size=11, color="#777"), showline=True, linecolor="rgba(0,0,0,0.15)", zeroline=False, showgrid=False, ), # Sleek Background Design plot_bgcolor="white", # Crisp white background for a modern touch paper_bgcolor="white", # Ensure the entire background is uniform # Title: Modern Font and Centered title=dict( text="t-SNE Visualization of Data", font=dict( size=18, color="#333", family="Arial, sans-serif" ), # Modern font style x=0.5, xanchor="center", yanchor="top", pad=dict(t=10), # Padding to give the title breathing space ), ) pickle.dump(tsne, open(model_name_path + "tsne_cfs.sav", "wb")) context = { "dataset_type": dataset_type, "model_name": model_name, "tsne": tsne.to_html(), "num_counterfactuals": num_counterfactuals, "default_counterfactual": "1", "clicked_point": notpre_row.to_html(), "counterfactual": cf_df.iloc[[1]].to_html(), } else: context = { "dataset_type": dataset_type, "model_name": model_name, "message": "Please try again with different features.", } elif dataset_type == "timeseries": model_name = request.POST["model_name"] model_name_path = os.path.join( PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/" ) path = model_name_path if model_name == "glacier": constraint = request.POST["constraint"] path = os.path.join( PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/" + f"{constraint}/" ) X_test_path = os.path.join(model_name_path + "X_test.csv") y_test_path = os.path.join(model_name_path + "y_test.npy") y_pred_path = os.path.join(path + "y_pred.npy") X_cf_path = os.path.join(path + "X_cf.npy") cf_pred_path = os.path.join(path + "cf_pred.npy") X_test = pd.read_csv(X_test_path) y_test = np.load(y_test_path) y_pred = np.load(y_pred_path) X_cf = np.load(X_cf_path) cf_pred = np.load(cf_pred_path) if model_name != "glacier": scaler = joblib.load(model_name_path + "/min_max_scaler.sav") X_test = pd.DataFrame(scaler.inverse_transform(X_test)) X_cf = scaler.inverse_transform(X_cf) fig = methods.ecg_plot_counterfactuals( int(df_id), X_test, y_test, y_pred, X_cf, cf_pred ) context = { "df_name": df_name, "fig": fig.to_html(), "dataset_type": dataset_type, } elif action == "compute_cf": model_name = request.POST.get("model_name") if model_name == "glacier": constraint_type = request.POST.get("constraint") w_value = request.POST.get("w_value") df_name = request.session.get("df_name") model_name_path = os.path.join( PIPELINE_PATH + f"{df_name}/" + "trained_models/" + f"{model_name}/" ) model_name_path_constraint = model_name_path + f"{constraint_type}/" if not os.path.exists(model_name_path_constraint): os.makedirs(model_name_path_constraint) # https://github.com/wildboar-foundation/wildboar/blob/master/docs/guide/explain/counterfactuals.rst#id27 classifier = joblib.load(model_name_path + "/classifier.sav") # pipeline path json_path = os.path.join(PIPELINE_PATH, f"{df_name}" + "/pipeline.json") # load pipeline data jsonFile = pipeline.PipelineJSON(json_path) autoencoder = jsonFile.read_from_json( ["classifier", model_name, "autoencoder"] ) experiment_dict = {"constraint": constraint_type, "w_value": w_value} # if "experiments" in pipeline_data["classifier"][model_name]: # # if there exists key with value "experiments" # keys = pipeline_data["classifier"][model_name]["experiments"].keys() # last_key_int = int(list(keys)[-1]) # last_key_int_incr_str = str(last_key_int + 1) # else: # last_key_int_incr_str = "0" # experiment_key_dict = {"experiments": {last_key_int_incr_str: {}}} # pipeline_data["classifier"][model_name].update(experiment_key_dict) # outter_dict = {last_key_int_incr_str: experiment_dict} # pipeline_data["classifier"][model_name]["experiments"].update(outter_dict) if jsonFile.key_exists("experiments"): keys = jsonFile.read_from_json( ["classifier", model_name, "experiments"] ).keys() last_key_int = int(list(keys)[-1]) last_key_int_incr_str = str(last_key_int + 1) else: last_key_int_incr_str = "0" experiment_key_dict = {"experiments": {last_key_int_incr_str: {}}} jsonFile.update_json( ["classifier", model_name], experiment_key_dict ) outter_dict = {last_key_int_incr_str: experiment_dict} jsonFile.update_json( ["classifier", model_name, "experiments"], outter_dict ) if autoencoder == "Yes": autoencoder = joblib.load(model_name_path + "/autoencoder.sav") else: autoencoder = None gc_compute_counterfactuals( model_name_path, model_name_path_constraint, constraint_type, [0.0001], float(w_value), 0.5, classifier, autoencoder, ) path = model_name_path_constraint context = {"experiment_dict": experiment_dict} elif action == "counterfactual_select": # if