import os import json class PipelineJSON: def __init__(self, path): """ Initialize the PipelineJSON object and create an empty JSON file if it doesn't exist. :param path: Path to the JSON file """ self.json_path = path if not os.path.exists(self.json_path): with open(self.json_path, "w") as f: json.dump({}, f) # Initialize with an empty dictionary self.data = {} else: self.data = self._load_json() def _load_json(self): """ Load JSON data from the file. :return: Dictionary containing JSON data """ try: with open(self.json_path, "r") as f: return json.load(f) if os.path.getsize(self.json_path) > 0 else {} except json.JSONDecodeError: print("Error decoding JSON. Returning an empty dictionary.") return {} except Exception as e: print(f"Error loading JSON: {e}") return {} def _save_json(self, data): """ Save JSON data to the file. :param data: Dictionary to save to the JSON file """ try: with open(self.json_path, "w") as f: json.dump(data, f, indent=4) except Exception as e: print(f"Error saving JSON: {e}") def append_to_json(self, info): """ Append a dictionary to the JSON file. :param info: Dictionary to append to the JSON data :return: 1 if successful, -1 if an error occurs """ try: data = self._load_json() data.update(info) self._save_json(data) self.data = data return 1 except Exception as e: print(f"Error appending to JSON: {e}") return -1 def read_from_json(self, keys): """ Fetch a specific value from the JSON file using a list of keys. :param keys: List of keys to traverse the JSON object :return: Value at the specified keys, or None if not found """ try: tmp = self.data for key in keys: tmp = tmp[key] return tmp except KeyError: print(f"KeyError: One or more keys {keys} do not exist.") return None except Exception as e: print(f"Error reading from JSON: {e}") return None def update_json(self, keys, info): """ Update the JSON data at a specific key with new information. :param keys: List of keys to locate the target value :param info: Dictionary to update the target value """ try: tmp = self.data for key in keys[:-1]: tmp = tmp[key] tmp[keys[-1]].update(info) self._save_json(self.data) except KeyError: print(f"KeyError: One or more keys {keys} do not exist.") except Exception as e: print(f"Error updating JSON: {e}") def key_exists(self, key): if not isinstance(self.data, dict): raise ValueError("self.data must be a dictionary.") stack = [self.data] while stack: current_dict = stack.pop() if key in current_dict: return True for value in current_dict.values(): if isinstance(value, dict): stack.append(value) return False def get_keys_with_value(self, value): """ Get all keys that contain a specific value. :param value: Value to search for :return: List of keys containing the specified value """ return [key for key, val in self.data.items() if value in val] def delete_key(self, key_path): """ Delete a key from the JSON file. If the parent key becomes empty, delete it as well. :param key_path: List of keys representing the path to the key to delete (e.g., ["classifier", "glacier"]). """ try: # Navigate to the parent dictionary current = self.data for key in key_path[:-1]: current = current.get(key, {}) # Delete the target key key_to_delete = key_path[-1] if key_to_delete in current: del current[key_to_delete] print(f"Key '{key_to_delete}' has been deleted.") else: print(f"Key '{key_to_delete}' does not exist.") # Remove empty parent keys recursively parent = self.data for key in key_path[:-1]: if not parent[key]: # If the parent key becomes empty del parent[key] print(f"Parent key '{key}' has been deleted because it is empty.") break parent = parent[key] # Save updated JSON data self._save_json(self.data) except Exception as e: print(f"Error deleting key: {e}") def delete_and_reorder_keys(self, parent_path, delete_key): """ Delete a specific key from a nested JSON dictionary and reorder the remaining keys to maintain consecutive numbering. :param parent_path: List of keys to locate the parent dictionary (e.g., ["classifier", "glacier", "experiments"]). :param delete_key: The key to delete (e.g., "1"). """ try: # Navigate to the parent dictionary current = self.data for key in parent_path: if key in current: current = current[key] else: print(f"KeyError: '{key}' does not exist in the provided path.") return # Check if the delete_key exists in the target dictionary if delete_key not in current: print(f"Key '{delete_key}' does not exist.") return # Delete the specified key del current[delete_key] print(f"Key '{delete_key}' has been deleted.") # Reorder the keys to maintain consecutive numbering reordered_dict = {} sorted_keys = sorted(current.keys(), key=int) # Sort keys numerically for new_key, old_key in enumerate(sorted_keys): reordered_dict[str(new_key)] = current[old_key] # Update the parent dictionary with the reordered dictionary parent = self.data for key in parent_path[:-1]: parent = parent[key] parent[parent_path[-1]] = reordered_dict # Save the updated JSON self._save_json(self.data) print("Keys reordered successfully.") except KeyError as e: print(f"KeyError: {e}") except Exception as e: print(f"Error deleting and reordering keys: {e}") # if "classifier" in pipeline_json.keys(): # temp_jason = {model_name: preprocessing_info} # pipeline_json["classifier"].update(temp_jason) # else: # temp_jason = { # "preprocessed_name": df_name + "_preprocessed.csv", # "classifier": {model_name: preprocessing_info}, # } # pipeline_json.update(temp_jason) def get_all_keys(d): keys = [] for key, value in d.items(): keys.append(key) if isinstance(value, dict): # If value is a dictionary, call the function recursively keys.extend(get_all_keys(value)) return keys