# EXTREMUM_web/base/pipeline.py
#
# 176 lines
# 5.6 KiB
# Python

import os
import json
class PipelineJSON:
    """
    Small helper around a JSON file that stores a (possibly nested) dictionary.

    The parsed content is cached in ``self.data``; mutating methods write the
    dictionary back to ``self.json_path``. Errors are reported with ``print``
    rather than raised, except where noted on the individual methods.
    """

    def __init__(self, path):
        """
        Initialize the PipelineJSON object and create an empty JSON file if it doesn't exist.
        :param path: Path to the JSON file
        """
        self.json_path = path
        if not os.path.exists(self.json_path):
            with open(self.json_path, "w", encoding="utf-8") as f:
                json.dump({}, f)  # Initialize with an empty dictionary
            self.data = {}
        else:
            self.data = self._load_json()

    def _load_json(self):
        """
        Load JSON data from the file.
        :return: Parsed JSON content; an empty dictionary if the file is empty,
                 cannot be decoded, or cannot be read
        """
        try:
            with open(self.json_path, "r", encoding="utf-8") as f:
                return json.load(f) if os.path.getsize(self.json_path) > 0 else {}
        except json.JSONDecodeError:
            print("Error decoding JSON. Returning an empty dictionary.")
            return {}
        except Exception as e:
            print(f"Error loading JSON: {e}")
            return {}

    def _save_json(self, data):
        """
        Save JSON data to the file.
        :param data: Dictionary to save to the JSON file
        :return: True if the file was written, False if an error occurred
        """
        try:
            # Serialize before opening the file: opening with "w" truncates it,
            # so a serialization failure must not happen after that point or
            # the previously stored content would be lost.
            payload = json.dumps(data, indent=4)
            with open(self.json_path, "w", encoding="utf-8") as f:
                f.write(payload)
            return True
        except Exception as e:
            print(f"Error saving JSON: {e}")
            return False

    def append_to_json(self, info):
        """
        Append a dictionary to the JSON file.

        The file is re-read first, so the merge is applied on top of what is
        currently on disk rather than on the cached ``self.data``.
        :param info: Dictionary to append to the JSON data
        :return: 1 if successful, -1 if an error occurs
        """
        try:
            data = self._load_json()
            data.update(info)
            if not self._save_json(data):
                # Nothing was persisted; keep the cache in sync with the file.
                return -1
            self.data = data
            return 1
        except Exception as e:
            print(f"Error appending to JSON: {e}")
            return -1

    def read_from_json(self, keys):
        """
        Fetch a specific value from the cached JSON data using a list of keys.
        :param keys: List of keys to traverse the JSON object
        :return: Value at the specified keys, or None if not found
        """
        try:
            tmp = self.data
            for key in keys:
                tmp = tmp[key]
            return tmp
        except KeyError:
            print(f"KeyError: One or more keys {keys} do not exist.")
            return None
        except Exception as e:
            print(f"Error reading from JSON: {e}")
            return None

    def update_json(self, keys, info):
        """
        Update the JSON data at a specific key with new information.
        :param keys: List of keys to locate the target value (must be a dictionary)
        :param info: Dictionary to update the target value
        """
        try:
            tmp = self.data
            for key in keys[:-1]:
                tmp = tmp[key]
            tmp[keys[-1]].update(info)
            self._save_json(self.data)
        except KeyError:
            print(f"KeyError: One or more keys {keys} do not exist.")
        except Exception as e:
            print(f"Error updating JSON: {e}")

    def key_exists(self, key):
        """
        Check whether a key occurs anywhere in the nested dictionaries.
        :param key: Key to search for
        :return: True if the key is found at any nesting level, False otherwise
        :raises ValueError: If ``self.data`` is not a dictionary
        """
        if not isinstance(self.data, dict):
            raise ValueError("self.data must be a dictionary.")
        stack = [self.data]
        while stack:
            current_dict = stack.pop()
            if key in current_dict:
                return True
            for value in current_dict.values():
                if isinstance(value, dict):
                    stack.append(value)
        return False

    def get_keys_with_value(self, value):
        """
        Get all top-level keys whose value contains a specific item.

        Containment is the ``in`` operator applied to each top-level value
        (dictionary keys, list elements, substrings).
        :param value: Value to search for
        :return: List of keys containing the specified value
        """
        matches = []
        for key, val in self.data.items():
            try:
                if value in val:
                    matches.append(key)
            except TypeError:
                # `in` is not defined for this value (e.g. int, None) or for
                # this combination of operands; it cannot contain `value`.
                continue
        return matches

    def delete_key(self, key_path):
        """
        Delete a key from the JSON file. Every parent dictionary along the path
        that is left empty is deleted as well.
        :param key_path: List of keys representing the path to the key to delete (e.g., ["classifier", "glacier"]).
        """
        try:
            key_to_delete = key_path[-1]

            # Navigate to the parent dictionary, remembering every hop so the
            # path can be pruned bottom-up afterwards.
            trail = []
            current = self.data
            for key in key_path[:-1]:
                trail.append((current, key))
                current = current.get(key, {})

            # Delete the target key
            if key_to_delete in current:
                del current[key_to_delete]
                print(f"Key '{key_to_delete}' has been deleted.")
            else:
                print(f"Key '{key_to_delete}' does not exist.")

            # Remove empty parents from the innermost outwards; removing one
            # may in turn leave its own parent empty. Stop at the first parent
            # that still has content (or is not a dictionary at all).
            for container, key in reversed(trail):
                child = container.get(key)
                if not isinstance(child, dict) or child:
                    break
                del container[key]
                print(f"Parent key '{key}' has been deleted because it is empty.")

            # Save updated JSON data
            self._save_json(self.data)
        except Exception as e:
            print(f"Error deleting key: {e}")
# if "classifier" in pipeline_json.keys():
# temp_jason = {model_name: preprocessing_info}
# pipeline_json["classifier"].update(temp_jason)
# else:
# temp_jason = {
# "preprocessed_name": df_name + "_preprocessed.csv",
# "classifier": {model_name: preprocessing_info},
# }
# pipeline_json.update(temp_jason)
def get_all_keys(d):
    """
    Collect every key of a nested dictionary.

    Keys are listed in depth-first order: each key is followed immediately by
    the keys of its own dictionary value (if any) before its siblings.
    Only values that are dictionaries are descended into.
    :param d: Dictionary to walk
    :return: List of all keys found at every nesting level
    """
    collected = []
    # Stack of partially consumed item iterators, one per nesting level.
    pending = [iter(d.items())]
    while pending:
        for name, child in pending[-1]:
            collected.append(name)
            if isinstance(child, dict):
                # Descend now; the parent iterator resumes once this level is done.
                pending.append(iter(child.items()))
                break
        else:
            # Current level exhausted; go back up to its parent.
            pending.pop()
    return collected