"""Dataset-related request handling for the pipeline app.

`handler` dispatches on an `action` string and returns a JSON `HttpResponse`
used by the dataset upload/selection views.
"""

import json
import os
import random
import shutil

from django.core.files.storage import FileSystemStorage
from django.shortcuts import HttpResponse

import base.pipeline as pipeline
from dict_and_html import *

from .. import methods
from ..methods import PIPELINE_PATH


def handler(action, request):
    status = 200
    context = {}  # default payload so every branch (and unknown actions) returns valid JSON

    if action == "upload_dataset":
        # Get the uploaded file and its declared type from the request
        uploaded_file = request.FILES["excel_file"]
        dataset_type = request.POST.get("dataset_type")

        # Remember that the current dataframe comes from an upload
        request.session["df_name"] = "upload"

        # Split the file name into base name and extension
        name = uploaded_file.name
        base_name, extension = os.path.splitext(name)
        request.session["df_name_upload_base_name"] = base_name
        request.session["df_name_upload_extension"] = extension

        df_name = base_name
        df_name_path = os.path.join(PIPELINE_PATH + f"{base_name}")
        if not os.path.exists(df_name_path):
            os.makedirs(df_name_path)

        # Save the file into the dataset's pipeline directory
        fs = FileSystemStorage(location=df_name_path)
        fs.save(uploaded_file.name, uploaded_file)
        request.session["excel_file_name"] = df_name_path

        excel_file_name_path = os.path.join(PIPELINE_PATH + f"{base_name}" + "/" + name)
        df = methods.get_dataframe(excel_file_name_path)

        # Update the dataset-types JSON file
        datasets_types_PipelineJSON_path = os.path.join(
            PIPELINE_PATH + "dataset_types_pipeline.json"
        )
        jsonFile = pipeline.PipelineJSON(datasets_types_PipelineJSON_path)
        jsonFile.append_to_json({df_name: [dataset_type, "uploaded"]})
        dataset_type = jsonFile.read_from_json([df_name])[0]
        uploaded_files = jsonFile.get_keys_with_value("uploaded")

        # If column names contain spaces, replace them and rewrite the file
        if df.columns.str.contains(" ").any():
            df.columns = df.columns.str.replace(" ", "_")
            os.remove(excel_file_name_path)
            df.to_csv(excel_file_name_path, index=None)
            df = methods.get_dataframe(excel_file_name_path)

        # Drop an "id" column if present
        if "id" in df.columns:
            df.drop(["id"], axis=1, inplace=True)
            df.to_csv(excel_file_name_path, index=False)

        context = {"dataset_type": dataset_type, "df_name": df_name}
        context.update({"uploaded_files": uploaded_files})

        if dataset_type == "timeseries":
            # Offer the unique values of the last column as candidate class labels
            target_labels = list(df.iloc[:, -1].unique())
            context.update({"target_labels": target_labels})

        request.session["context"] = context

    elif action == "delete_uploaded_file":
        dataset_name = request.POST.get("dataset_name")
        dataset_path = os.path.join(PIPELINE_PATH + f"/{dataset_name}")

        # Load the dataset-types pipeline file and remove the entry
        datasets_types_pipeline_path = os.path.join(
            PIPELINE_PATH + "/dataset_types_pipeline.json"
        )
        datasets_types_pipeline = pipeline.PipelineJSON(datasets_types_pipeline_path)
        datasets_types_pipeline.delete_key([dataset_name])

        request.FILES["excel_file"] = None
        request.session["df_name"] = None

        # Check whether any uploaded files remain
        uploaded_files = datasets_types_pipeline.get_keys_with_value("uploaded")
        if uploaded_files == []:
            uploaded_files = None

        try:
            shutil.rmtree(dataset_path)
        except Exception as error:
            print(error)

        context = {"uploaded_files": uploaded_files}

    elif action == "dataset" or action == "uploaded_datasets":
        # Dataset selected from a radio button; remember it in the session
        name = request.POST.get("df_name")
        request.session["df_name"] = name

        if action == "dataset" and name == "upload":
            request.session["upload"] = 1
            context = {"upload": 1}
        else:
            if name == "timeseries":
                name = request.session.get("df_name")

            excel_file_name_path = os.path.join(
                PIPELINE_PATH + f"{name}" + "/" + name + ".csv",
            )
            datasets_types_PipelineJSON_path = os.path.join(
                PIPELINE_PATH + "/dataset_types_pipeline.json"
            )
            datasets_types_PipelineJSON = pipeline.PipelineJSON(
                datasets_types_PipelineJSON_path
            )
            dataset_type = datasets_types_PipelineJSON.read_from_json([name])
            uploaded_files = datasets_types_PipelineJSON.get_keys_with_value("uploaded")

            if request.POST.get("df_name") == "upload" or action == "uploaded_datasets":
                if type(dataset_type) is list:
                    dataset_type = dataset_type[0]

            if request.POST.get("df_name") != "upload" or action == "uploaded_datasets":
                if os.path.exists(excel_file_name_path):
                    df = methods.get_dataframe(excel_file_name_path)
                    df.columns = df.columns.str.replace(" ", "_")
                    request.session["excel_file_name"] = excel_file_name_path

                    # Create the per-dataset pipeline.json if it does not exist yet
                    json_path = os.path.join(PIPELINE_PATH + f"{name}" + "/pipeline.json")
                    if not os.path.exists(json_path):
                        PipelineJSON = pipeline.PipelineJSON(json_path)
                        PipelineJSON.append_to_json({"name": name})

                    if dataset_type == "tabular":
                        if "id" in df.columns:
                            df.drop(["id"], axis=1, inplace=True)
                            df.to_csv(excel_file_name_path, index=False)

                        # Tabular dataset: pick default features and a label
                        features = df.columns
                        feature1 = df.columns[3]
                        feature2 = df.columns[2]
                        labels = list(
                            df.select_dtypes(include=["object", "category"]).columns
                        )
                        # Find binary columns (only two unique values, including numerics)
                        binary_columns = [
                            col for col in df.columns if df[col].nunique() == 2
                        ]
                        # Combine categorical and binary columns into one list
                        labels = list(set(labels + binary_columns))
                        label = random.choice(labels)

                        fig = methods.stats(
                            excel_file_name_path,
                            dataset_type,
                            feature1=feature1,
                            feature2=feature2,
                            label=label,
                        )

                        request.session["data_to_display"] = df[:10].to_html()
                        request.session["features"] = list(features)
                        request.session["feature1"] = feature1
                        request.session["feature2"] = feature2
                        request.session["labels"] = list(labels)
                        request.session["curlabel"] = label
                        request.session["fig"] = fig

                        context = {
                            "dataset_type": dataset_type,
                            "data_to_display": df[:10].to_html(),
                            "fig": fig,
                            "features": list(features),  # error if not a list
                            "feature1": feature1,
                            "feature2": feature2,
                            "labels": list(labels),
                            "curlabel": label,
                            "uploaded_files": list(uploaded_files),
                        }
                    elif dataset_type == "timeseries":
                        # Timeseries dataset: read the class labels from pipeline.json
                        json_path = os.path.join(PIPELINE_PATH, f"{name}" + "/pipeline.json")
                        jsonFile = pipeline.PipelineJSON(json_path)
                        pos = jsonFile.read_from_json(["pos"])
                        neg = jsonFile.read_from_json(["neg"])

                        fig, fig1 = methods.stats(
                            excel_file_name_path,
                            dataset_type,
                            int(pos),
                            int(neg),
                            None,
                            None,
                            None,
                            name=name,
                        )
                        request.session["fig"] = fig
                        request.session["fig1"] = fig1

                        context = {
                            "fig": fig,
                            "fig1": fig1,
                            "dataset_type": dataset_type,
                        }
                    else:
                        context = {"uploaded_files": list(uploaded_files)}
                else:
                    context = {}

            if (
                action == "uploaded_datasets"
                and "upload" in request.session
                and request.session["upload"] == 1
            ):
                request.session["upload"] = 1
                context.update({"upload": 1, "df_name": name})
                print(name)
            else:
                request.session["upload"] = 0

    elif action == "dataset_charts":
        df_name = request.POST.get("df_name")
        request.session["df_name"] = df_name
        context = {}

    elif action == "select_class_labels_for_uploaded_timeseries":
        name = request.session["df_name"]
        if name == "upload":
            name = request.session["df_name_upload_base_name"]

        # Store the positive/negative class labels in the dataset's pipeline.json
        pos = request.POST.get("positive_label")
        neg = request.POST.get("negative_label")
        json_path = os.path.join(PIPELINE_PATH, f"{name}" + "/pipeline.json")
        jsonFile = pipeline.PipelineJSON(json_path)
        jsonFile.append_to_json({"name": name})
        jsonFile.append_to_json({"pos": pos})
        jsonFile.append_to_json({"neg": neg})
        context = {}

    elif action == "timeseries-dataset":
        # Timeseries dataset selected from a radio button; remember it in the session
        name = request.POST.get("timeseries_dataset")
        request.session["df_name"] = name

        excel_file_name_path = os.path.join(
            PIPELINE_PATH + f"{name}" + "/" + name + ".csv",
        )
        datasets_types_PipelineJSON_path = os.path.join(
            PIPELINE_PATH + "/dataset_types_pipeline.json"
        )
        datasets_types_PipelineJSON = pipeline.PipelineJSON(
            datasets_types_PipelineJSON_path
        )

        if os.path.exists(excel_file_name_path):
            dataset_type = datasets_types_PipelineJSON.read_from_json([name])
            df = methods.get_dataframe(excel_file_name_path)
            df.columns = df.columns.str.replace(" ", "_")
            request.session["excel_file_name"] = excel_file_name_path

            # Check the per-dataset pipeline file for available pre-trained experiments
            json_path = os.path.join(PIPELINE_PATH, f"{name}" + "/pipeline.json")
            jsonFile = pipeline.PipelineJSON(json_path)
            preprocessing_info = {"name": name}

            dataset_camel = methods.convert_to_camel_case(name)
            if "Ecg" in dataset_camel:
                dataset_camel = dataset_camel.replace("Ecg", "ECG")

            experiment = methods.fetch_line_by_dataset(
                PIPELINE_PATH + "/glacier_experiments.txt",
                dataset_camel,
            )
            if experiment is not None:
                stripped_arguments = methods.extract_arguments_from_line(experiment)

                # Fetch the positive/negative labels from the experiment arguments
                # at the specified indices
                indices_to_keys = {1: "pos", 2: "neg"}
                inner_dict = {
                    key: stripped_arguments[index]
                    for index, key in indices_to_keys.items()
                }
                preprocessing_info.update(inner_dict)
                jsonFile.append_to_json(preprocessing_info)

                pos = inner_dict["pos"]
                neg = inner_dict["neg"]
                fig, fig1 = methods.stats(
                    excel_file_name_path, dataset_type, int(pos), int(neg), name=name
                )
                request.session["fig"] = fig
                request.session["fig1"] = fig1
                context = {"fig": fig, "fig1": fig1, "dataset_type": dataset_type}
        else:
            context = {}

    elif action == "stat":
        name = request.session.get("df_name")
        datasets_types_PipelineJSON_path = os.path.join(
            PIPELINE_PATH + "/dataset_types_pipeline.json"
        )
        jsonFile = pipeline.PipelineJSON(datasets_types_PipelineJSON_path)
        dataset_type = jsonFile.read_from_json([name])
        if type(dataset_type) is list:
            dataset_type = dataset_type[0]

        file_path = os.path.join(
            PIPELINE_PATH + f"{name}" + "/" + name + ".csv",
        )

        if dataset_type == "tabular":
            feature1 = request.POST.get("feature1")
            feature2 = request.POST.get("feature2")
            label = request.POST.get("label")
        else:
            feature1 = request.POST.get("feature1")
            feature2 = []
            label = []

        fig = methods.stats(
            file_path,
            dataset_type,
            None,
            None,
            feature1=feature1,
            feature2=feature2,
            label=label,
        )
        context = {"fig": fig}

    return HttpResponse(json.dumps(context), status=status)
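

# Usage sketch (an assumption, not taken from the original module): one way to
# expose `handler` through a Django view. The view name `datasets_view` and the
# idea of passing the action as a URL parameter are hypothetical; the only facts
# used from the code above are the (action, request) signature and the JSON
# HttpResponse return value.
from django.views.decorators.http import require_POST


@require_POST
def datasets_view(request, action):
    # Delegate to the action dispatcher above and relay its JSON response unchanged.
    return handler(action, request)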