import os
import json

from .methods import PIPELINE_PATH


class pipeline_json:
    """Thin wrapper around a JSON file that stores pipeline state on disk."""

    def __init__(self, path):
        self.json_path = path
        if not os.path.exists(self.json_path):
            # Create an empty JSON object on disk so later reads do not fail.
            with open(self.json_path, "w") as json_file:
                json.dump({}, json_file)
            self.data = {}  # data becomes a dictionary
        else:
            with open(self.json_path, "r") as json_file:
                self.data = json.load(json_file)  # data becomes a dictionary
            print("Pipeline.json already exists.")

    def append_pipeline_json(self, info):
        """
        Append a dictionary to the JSON file located at ``self.json_path``.

        :param info: Dictionary to merge into the JSON file
        :return: 1 on success, -1 on failure
        """
        try:
            with open(self.json_path, "r") as f:
                if os.path.getsize(self.json_path) > 0:
                    data = json.load(f)  # Load the JSON data from the file
                else:
                    data = {}

            data.update(info)

            with open(self.json_path, "w") as f:
                json.dump(data, f, indent=4)  # Write the updated JSON data back to the file

            self.data = data
            return 1
        except Exception as e:
            print("append_pipeline_json failed:", e)
            return -1

    def read_pipline_json(self, key_list):
        """
        Fetch the value stored under a nested key path in the JSON file.

        :param key_list: List of keys describing the path to look up
        :return: The value at that path, or None if the path is missing
        """
        with open(self.json_path, "r") as json_file:
            pipeline_json_data = json.load(json_file)  # data becomes a dictionary

        if not key_list:
            return None

        node = pipeline_json_data
        for key in key_list:
            try:
                node = node[key]  # Walk one level deeper for each key
            except Exception as e:
                print(e)
                return None
        return node

    def update_pipeline_json(self, key_list, info):
        """
        Merge ``info`` into the dictionary found under a nested key path and
        write the whole structure back to the JSON file.

        :param key_list: List of keys describing the path to update
        :param info: Dictionary to merge into the node at that path
        :return: None
        """
        with open(self.json_path, "r") as json_file:
            pipeline_json_data = json.load(json_file)  # data becomes a dictionary

        if not key_list:
            return None

        node = pipeline_json_data
        for key in key_list:
            try:
                node = node[key]  # Walk one level deeper for each key
            except Exception as e:
                print(e)
                return None

        node.update(info)
        with open(self.json_path, "w") as file:
            json.dump(pipeline_json_data, file, indent=4)

        self.data = pipeline_json_data

    def check_key_pipeline_json(self, key):
        """Return 1 if ``key`` appears anywhere in the cached data, else 0."""
        keys = get_all_keys(self.data)
        if key in keys:
            return 1
        return 0

    def get_keys_with_specific_value(self, value):
        """Return the top-level keys whose value contains ``value``."""
        list_of_keys = []
        for key, val in self.data.items():
            if value in val:
                list_of_keys.append(key)
        print(list_of_keys)
        return list_of_keys

# if "classifier" in pipeline_json.keys():
|
|
# temp_jason = {model_name: preprocessing_info}
|
|
# pipeline_json["classifier"].update(temp_jason)
|
|
# else:
|
|
# temp_jason = {
|
|
# "preprocessed_name": df_name + "_preprocessed.csv",
|
|
# "classifier": {model_name: preprocessing_info},
|
|
# }
|
|
# pipeline_json.update(temp_jason)
|
|
|
|
def get_all_keys(d):
    """Recursively collect every key in a (possibly nested) dictionary."""
    keys = []
    for key, value in d.items():
        keys.append(key)
        if isinstance(value, dict):  # If the value is a dictionary, recurse into it
            keys.extend(get_all_keys(value))
    return keys
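
# A minimal usage sketch (illustrative only): the file name "pipeline.json"
# and the nested keys/values below are assumptions, not part of this module.
#
#   pj = pipeline_json("pipeline.json")
#   pj.append_pipeline_json({"classifier": {"svm": {"status": "uploaded"}}})
#   pj.read_pipline_json(["classifier", "svm"])    # -> {"status": "uploaded"}
#   pj.update_pipeline_json(["classifier", "svm"], {"status": "trained"})
#   pj.check_key_pipeline_json("svm")              # -> 1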