#!/opt/proxmanager/env/bin/python3
"""Perform administrative actions on a Proxmox VE cluster.

Supports rolling cluster reboots, single-node reboots with guest
evacuation and rebalancing, and restarting failed node services.
"""
import argparse
import time
from configparser import ConfigParser
from pathlib import Path

from proxmoxer import ProxmoxAPI
from proxmoxer.tools import Tasks


def running(vm):
    return vm['status'] == 'running'


class Log:
    def __init__(self, verbosity):
        self._verbosity = verbosity
        self._continue = False

    def log(self, level, message, lf=True):
        if self._verbosity < level:
            return
        # Indent by level, unless this call continues a line started
        # by a previous lf=False call.
        pad = '' if self._continue else ' ' * level
        self._continue = not lf
        kwargs = {'flush': True}
        if not lf:
            kwargs['end'] = ''
        print(f'{pad}{message}', **kwargs)

    def info(self, message, **kwargs):
        self.log(0, message, **kwargs)

    def moreinfo(self, message, **kwargs):
        self.log(1, message, **kwargs)

    def debug(self, message, **kwargs):
        self.log(2, message, **kwargs)


class Ceph:
    def __init__(self, proxmanager, log):
        self.manager = proxmanager
        self.log = log
        self.noout_path = 'cluster/ceph/flags/noout'

    def get_noout(self):
        return self.manager.get(self.noout_path)

    def is_healthy(self):
        status = self.manager.get('cluster/ceph/status')
        if status['health']['status'] == 'HEALTH_OK':
            return True
        warnings = {check: value
                    for check, value in status['health']['checks'].items()
                    if check != 'OSDMAP_FLAGS'}
        if warnings:
            self.log.debug('Ceph reports the following warnings:')
            for check, value in warnings.items():
                message = value['summary']['message']
                self.log.debug(f'{check}: {message}')
            return False
        flags = [flag['name']
                 for flag in self.manager.get('cluster/ceph/flags')
                 if flag['name'] != 'noout' and flag['value'] == 1]
        if flags:
            self.log.debug('The following Ceph flags are set:')
            for flag in flags:
                self.log.debug(flag)
            return False
        return True

    def set_noout(self):
        if self.get_noout():
            return
        if not self.is_healthy():
            raise Exception("Ceph reports unhealthy state, "
                            "won't set noout under these conditions.")
        self.manager.put(self.noout_path, value=1)

    def unset_noout(self):
        if not self.get_noout():
            return
        if not self.is_healthy():
            waited = 0
            self.log.moreinfo('Ceph reports unhealthy state, '
                              'waiting to see if it resolves itself.')
            while True:
                time.sleep(self.manager.poll_interval)
                waited += self.manager.poll_interval
                if self.is_healthy():
                    self.log.moreinfo('Ceph health has recovered '
                                      f'after {waited}s.')
                    break
                if waited >= self.manager.ceph_resolve_wait:
                    raise Exception('Ceph health still degraded '
                                    f'after {waited}s. Giving up.')
        self.manager.put(self.noout_path, value=0)
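
# ProxManager reads credentials and tuning knobs from a config.ini
# next to this script. The sketch below is illustrative (the values
# and the cluster name are made up), but the section and key names
# match the lookups in __init__ and init_connection:
#
#   [connection]
#   max workers = 5
#   poll interval = 10
#   node reboot timeout = 400
#   ceph resolve timeout = 200
#   http_timeout = 300
#
#   [auth.mycluster]
#   user name = automation@pve
#   token name = proxmanager
#   token value = 00000000-0000-0000-0000-000000000000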

class ProxManager:
    def __init__(self, cluster, config, log):
        if f'auth.{cluster}' not in config:
            raise Exception('No known authentication methods '
                            f'for cluster {cluster}')
        self.conf = config
        self.log = log
        self.cluster = cluster
        self.connected_node = None
        self.all_nodes = None
        self.max_workers = self.conf.getint('connection', 'max workers',
                                            fallback=5)
        self.poll_interval = self.conf.getint('connection', 'poll interval',
                                              fallback=10)
        self.reboot_wait = self.conf.getint('connection',
                                            'node reboot timeout',
                                            fallback=400)
        self.ceph_resolve_wait = self.conf.getint('connection',
                                                  'ceph resolve timeout',
                                                  fallback=200)
        self.init_connection(cluster)
        self.ceph = Ceph(self, log)

    def init_connection(self, target):
        self.log.info(f'Establishing API connection to {target}.')
        self.papi = ProxmoxAPI(
            target,
            user=self.conf.get(f'auth.{self.cluster}', 'user name'),
            token_name=self.conf.get(f'auth.{self.cluster}', 'token name'),
            token_value=self.conf.get(f'auth.{self.cluster}', 'token value'),
            timeout=self.conf.getint('connection', 'http_timeout',
                                     fallback=300),
            verify_ssl=False)
        nodes = [item for item in self.get('cluster/status')
                 if item['type'] == 'node']
        for node in nodes:
            if node['local'] == 1:
                self.connected_node = node['name']
                self.log.moreinfo(f'Connected to {self.connected_node}.')
                break
        else:
            raise Exception("Can't find connected node! "
                            "I can't work like this!")
        if not self.all_nodes:
            self.all_nodes = [item['name'] for item in nodes]

    def make_request(self, verb, path, **kwargs):
        resource = self.papi(path)
        result = getattr(resource, verb)(**kwargs)
        if isinstance(result, str) and result.startswith('UPID:'):
            # The API returned a task ID; block until the task finishes.
            return Tasks.blocking_status(self.papi, result)
        return result

    def get(self, path, **kwargs):
        return self.make_request('get', path, **kwargs)

    def post(self, path, **kwargs):
        return self.make_request('post', path, **kwargs)

    def put(self, path, **kwargs):
        return self.make_request('put', path, **kwargs)
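
    # Every call above funnels through make_request, so any
    # asynchronous operation that hands back a task ID (a 'UPID:...'
    # string) is waited on before the caller proceeds. The methods
    # below can therefore treat migrations and reboots as synchronous
    # steps.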

    def do_migration(self, origin, target, vmids):
        if not vmids:
            return
        self.log.info(f'Doing migrations from {origin} to {target}...')
        vmids_str = ','.join(map(str, vmids))
        self.log.moreinfo(f'Migrating vmids: {vmids_str}')
        self.post(f'nodes/{origin}/migrateall', target=target,
                  maxworkers=self.max_workers, vms=vmids_str)
        self.log.info('done!')

    def get_running_vms(self, node):
        return [vm for vm in self.get(f'nodes/{node}/qemu') if running(vm)]

    def collect_cluster_status(self):
        self.log.debug('Collecting cluster status information.')
        available_resources = {}
        for node in self.all_nodes:
            status = self.get(f'nodes/{node}/status')
            cpus_free = status['cpuinfo']['cpus']
            memory_total = status['memory']['total']
            # Subtract ~150MB as a safety buffer of reserved memory,
            # based on the observed difference between free memory as
            # reported by pvesh and by free.
            memory_free = memory_total - 150_000_000
            vms = self.get_running_vms(node)
            for vm in vms:
                cpus_free -= vm['cpus']
                memory_free -= vm['maxmem']
            self.log.debug(f'{node}: {cpus_free} unused cores, '
                           f'{memory_free} free memory')
            available_resources[node] = {'memory': memory_free,
                                         'cpu': cpus_free}
        return available_resources

    def find_best_node(self, nodeinfo, vm):
        # Check for sufficient free memory and cores, and choose the
        # node with the most available memory from the eligible ones.
        required_cores = vm['cpus']
        required_memory = vm['maxmem']
        name = vm['name']
        vmid = vm['vmid']
        self.log.debug('Looking for best migration target '
                       f'for guest {name}({vmid}).')
        best_fit = None
        for node, stats in nodeinfo.items():
            cpu = stats['cpu']
            memory = stats['memory']
            if cpu < required_cores:
                self.log.debug(f'{node} unsuitable: {cpu} free cores '
                               f'but {required_cores} needed')
                continue
            if memory < required_memory:
                self.log.debug(f'{node} unsuitable: {memory} free memory '
                               f'but {required_memory} needed')
                continue
            if not best_fit:
                best_fit = node
                self.log.debug(f'{node} selected by default')
                continue
            if memory > nodeinfo[best_fit]['memory']:
                self.log.debug(f'{node} is now best fit')
                best_fit = node
        if not best_fit:
            raise Exception('No node with sufficient capacity available!')
        self.log.debug(f'{best_fit} chosen for migration.')
        return best_fit

    def migrate_vms(self, origin, target):
        self.log.info(f'Initiating migrations from {origin} to {target}.')
        migrations = []
        vms = self.get_running_vms(origin)
        for vm in vms:
            name = vm['name']
            vmid = vm['vmid']
            self.log.debug(f'Queueing guest {name}({vmid}) '
                           f'for migration to {target}.')
            migrations.append(vmid)
        self.do_migration(origin, target, migrations)

    def distribute_vms(self, origin):
        self.log.info(f'Distributing guests away from {origin}.')
        target_nodes = [item for item in self.all_nodes if item != origin]
        available_resources = self.collect_cluster_status()
        del available_resources[origin]
        migrations = {item: [] for item in target_nodes}
        vms = self.get_running_vms(origin)
        for vm in vms:
            name = vm['name']
            vmid = vm['vmid']
            cores = vm['cpus']
            memory = vm['maxmem']
            target = self.find_best_node(available_resources, vm)
            self.log.debug(f'Queueing guest {name}({vmid}) '
                           f'for migration to {target}.')
            migrations[target].append(vmid)
            available_resources[target]['memory'] -= memory
            available_resources[target]['cpu'] -= cores
        for target, vmids in migrations.items():
            self.do_migration(origin, target, vmids)

    def rebalance_vms(self, target):
        self.log.info(f'Pulling guests into {target} to balance load.')
        available_resources = self.collect_cluster_status()
        all_memory = [item['memory'] for item in available_resources.values()]
        all_cores = [item['cpu'] for item in available_resources.values()]
        avg_free_memory = sum(all_memory) / len(all_memory)
        avg_free_memory_up = avg_free_memory * 1.1
        avg_free_memory_down = avg_free_memory * 0.9
        avg_free_cores = sum(all_cores) / len(all_cores)
        self.log.debug('Average free memory across cluster: '
                       f'{avg_free_memory}')
        self.log.debug('Average free cores across cluster: '
                       f'{avg_free_cores}')
        target_free_memory = available_resources[target]['memory']
        target_free_cores = available_resources[target]['cpu']
        # Collect a list of all running vms not residing on the target
        # node and take note of each vm's host node.
        all_vms = []
        for node in self.all_nodes:
            if node == target:
                continue
            vms = self.get_running_vms(node)
            for vm in vms:
                vm['node'] = node
                all_vms.append(vm)
        # Iterate over the vms in order of highest memory allocation
        # first. Break the loop once target gets within 10% of the
        # cluster's average free memory or fewer free cores than the
        # cluster average.
        all_vms = sorted(all_vms, key=lambda vm: vm['maxmem'], reverse=True)
        migrations = {item: [] for item in self.all_nodes if item != target}
        for vm in all_vms:
            vmid = vm['vmid']
            node = vm['node']
            name = vm['name']
            memory = vm['maxmem']
            if target_free_memory - memory < avg_free_memory_down:
                # Skip vms that would push the target's free memory
                # more than 10% below the cluster average.
                continue
            self.log.debug(f'Queueing guest {name}({vmid}) '
                           f'for migration from {node}.')
            migrations[node].append(vmid)
            target_free_memory -= vm['maxmem']
            target_free_cores -= vm['cpus']
            if target_free_memory < avg_free_memory_up:
                # Stop once the target's free memory has come within
                # 10% of the cluster average.
                break
            if target_free_cores < avg_free_cores:
                break
        # Do the migrations
        for origin, vmids in migrations.items():
            self.do_migration(origin, target, vmids)
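
    # Node lifecycle operations. migrate_vms_away is the entry point:
    # it either evacuates everything to one named target node or
    # spreads guests across the cluster with the 'distribute'
    # strategy, which places each guest on the eligible node with the
    # most free memory.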
    def migrate_vms_away(self, node=None, target=None, strategy=None):
        self.log.info(f'Initiating migration of guests from {node}.')
        if target and strategy:
            raise Exception('target and strategy are mutually exclusive')
        if target:
            self.migrate_vms(node, target)
            return
        if strategy == 'distribute':
            self.distribute_vms(node)
            return
        raise Exception('migration command not understood')

    def fix_node_services(self, target):
        self.log.info(f'Checking for failed services on {target}.')
        services = self.get(f'nodes/{target}/services')
        failed = [service for service in services
                  if service['state'] == 'failed']
        if not failed:
            self.log.info('No failed services found.')
            return
        for service in failed:
            name = service['name']
            self.log.info(f'Found failed service {name}, restarting.')
            self.post(f'nodes/{target}/services/{name}/start')

    def fix_cluster_services(self):
        for node in self.all_nodes:
            self.fix_node_services(node)

    def reboot_node(self, target):
        self.log.info(f'Initiating reboot of node {target}.')
        if target == self.connected_node:
            # Switch the connection to another node so we don't lose
            # it. Negative indexing wraps around to the last node.
            self.log.debug(f'API connected to {target}, switching nodes.')
            self.init_connection(
                self.all_nodes[self.all_nodes.index(target) - 1])
        self.log.moreinfo('Setting noout flag on Ceph')
        self.ceph.set_noout()
        self.log.moreinfo('Issuing reboot command.')
        self.post(f'nodes/{target}/status', command='reboot')
        # Give the node some time to actually go down
        time.sleep(10)
        # Wait for node to come back online before moving on
        self.log.info(f'Waiting for {target} to come back online.')
        elapsed = 0
        while True:
            time.sleep(self.poll_interval)
            elapsed += self.poll_interval
            nodes_status = {item['node']: item['status']
                            for item in self.get('nodes')}
            if nodes_status[target] == 'online':
                self.log.info(f'{target} is back online after {elapsed}s.')
                break
            if elapsed >= self.reboot_wait:
                raise Exception(f"Node {target} hasn't come online "
                                f"after {elapsed}s, giving up.")
        self.log.info('Waiting for Ceph to normalise.')
        self.ceph.unset_noout()
        self.log.info('noout flag unset on Ceph')
        self.log.info(f'Reboot of {target} done.')

    def reboot_cluster(self):
        self.log.info('Initiating cluster reboot.')
        for index, node in enumerate(self.all_nodes):
            if index == 0:
                # The first node has no freshly emptied neighbour yet,
                # so spread its guests across the remaining nodes.
                self.migrate_vms_away(node, strategy='distribute')
            else:
                # Later nodes can dump their guests onto the node that
                # was just rebooted and is now empty.
                self.migrate_vms_away(node, target=self.all_nodes[index - 1])
            self.reboot_node(node)
        # The value of node will be the last one from the loop.
        self.rebalance_vms(node)
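
# Command line interface. Typical invocations (the cluster name must
# match an [auth.<cluster>] section in config.ini; 'mycluster' and
# 'node01' below are placeholder names):
#
#   proxmanager -v mycluster reboot
#   proxmanager mycluster reboot-node node01
#   proxmanager -q mycluster fix-services --node node01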
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog='proxmanager',
        description='Perform administrative actions on a proxmox cluster')
    parser.add_argument('cluster')
    verbosity = parser.add_mutually_exclusive_group()
    verbosity.add_argument('--verbose', '-v', action='count', default=0,
                           help='Be verbose. Specify twice for maximum '
                                'verbosity')
    verbosity.add_argument('--quiet', '-q', action='store_const', const=-1,
                           dest='verbose',
                           help='Suppress all output except exceptions')
    subcommands = parser.add_subparsers(dest='command',
                                        help='Available actions:')
    reboot = subcommands.add_parser('reboot',
                                    help='Reboot the target cluster')
    reboot_node = subcommands.add_parser(
        'reboot-node', help='Reboot a specified node in target cluster')
    reboot_node.add_argument('target_node', help='The node to be rebooted')
    fix_services = subcommands.add_parser(
        'fix-services', help='Attempt to wake up failed services')
    fix_services.add_argument('--node', '-n', help='Target a specific node')
    args = parser.parse_args()

    configfile = Path(__file__).parent / 'config.ini'
    config = ConfigParser()
    config.read(configfile)

    log = Log(args.verbose)
    p = ProxManager(args.cluster, config, log)

    match args.command:
        case 'reboot':
            p.reboot_cluster()
        case 'reboot-node':
            target = args.target_node
            p.migrate_vms_away(target, strategy='distribute')
            p.reboot_node(target)
            p.rebalance_vms(target)
        case 'fix-services':
            if args.node:
                p.fix_node_services(args.node)
            else:
                p.fix_cluster_services()
        case _:
            raise Exception('Unknown action')