proxmanager/proxmanager.py


#!/opt/proxmanager/env/bin/python3
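"""Perform administrative maintenance on a Proxmox VE cluster via its API:
rolling cluster reboots, single-node reboots with guest evacuation and
rebalancing, and restarting of failed node services."""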
import argparse
import time
from configparser import ConfigParser
from pathlib import Path
from proxmoxer import ProxmoxAPI
from proxmoxer.tools import Tasks
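# The script reads a config.ini located next to it. A minimal sketch of the
# expected layout, based on the keys requested below; the cluster name and
# all values are illustrative placeholders, not real settings:
#
#   [connection]
#   max workers = 5
#   poll interval = 10
#   node reboot timeout = 400
#   ceph resolve timeout = 200
#   http_timeout = 300
#
#   [auth.examplecluster]
#   user name = automation@pve
#   token name = proxmanager
#   token value = 00000000-0000-0000-0000-000000000000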
def running(vm):
    return vm['status'] == 'running'
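# Minimal console logger: info/moreinfo/debug correspond to verbosity levels
# 0/1/2, and messages are indented according to their level.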
class Log:
_verbosity = None
_continue = False
def __init__(self, verbosity):
self._verbosity = verbosity
    def log(self, level, message, lf=True):
        if self._verbosity < level:
            return
        kwargs = {'flush': True}
        if not lf:
            kwargs['end'] = ''
        # Indent only when starting a new line, not when continuing a
        # message that was previously printed without a trailing newline
        pad = ''
        if not self._continue:
            pad = ' ' * level
        self._continue = not lf
        print(f'{pad}{message}', **kwargs)
def info(self, message, **kwargs):
self.log(0, message, **kwargs)
def moreinfo(self, message, **kwargs):
self.log(1, message, **kwargs)
def debug(self, message, **kwargs):
self.log(2, message, **kwargs)
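# Thin wrapper around the cluster's Ceph endpoints: health checking and
# setting/clearing the noout flag around node reboots.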
class Ceph:
manager = None
log = None
noout_path = None
def __init__(self, proxmanager, log):
self.manager = proxmanager
self.log = log
self.noout_path = 'cluster/ceph/flags/noout'
def get_noout(self):
return self.manager.get(self.noout_path)
def is_healthy(self):
status = self.manager.get('cluster/ceph/status')
if status['health']['status'] == 'HEALTH_OK':
return True
warnings = {check: value for check, value
in status['health']['checks'].items()
if check != 'OSDMAP_FLAGS'}
if warnings:
self.log.debug('Ceph reports the following warnings:')
for check, value in warnings.items():
message = value['summary']['message']
self.log.debug(f'{check}: {message}')
return False
flags = [flag['name']
for flag in self.manager.get('cluster/ceph/flags')
if flag['name'] != 'noout'
and flag['value'] == 1]
if flags:
self.log.debug('The following Ceph flags are set:')
for flag in flags:
self.log.debug(flag)
return False
return True
def set_noout(self):
if self.get_noout():
return
if not self.is_healthy():
raise Exception("Ceph reports unhealthy state, "
"won't set noout under these conditions.")
self.manager.put(self.noout_path, value=1)
return
def unset_noout(self):
if not self.get_noout():
return
if not self.is_healthy():
waited = 0
self.log.moreinfo('Ceph reports unhealthy state, '
'waiting to see if it resolves itself.')
while True:
time.sleep(self.manager.poll_interval)
waited += self.manager.poll_interval
if self.is_healthy():
self.log.moreinfo('Ceph health has recovered '
f'after {waited}s.')
break
if waited >= self.manager.ceph_resolve_wait:
raise Exception('Ceph health still degraded '
f'after {waited}s. Giving up.')
self.manager.put(self.noout_path, value=0)
return
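# Main entry point to the cluster API: wraps the proxmoxer connection, exposes
# get/post/put helpers that block on returned tasks, and implements the
# migration, rebalancing and reboot logic.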
class ProxManager:
connected_node = None
all_nodes = None
def __init__(self, cluster, config, log):
if f'auth.{cluster}' not in config:
raise Exception('No known authentication methods '
f'for cluster {cluster}')
self.conf = config
self.log = log
self.cluster = cluster
        # These values are used in sleeps and numeric comparisons, so read
        # them as integers rather than raw strings.
        self.max_workers = self.conf.getint('connection',
                                            'max workers',
                                            fallback=5)
        self.poll_interval = self.conf.getint('connection',
                                              'poll interval',
                                              fallback=10)
        self.reboot_wait = self.conf.getint('connection',
                                            'node reboot timeout',
                                            fallback=400)
        self.ceph_resolve_wait = self.conf.getint('connection',
                                                  'ceph resolve timeout',
                                                  fallback=200)
self.init_connection(cluster)
self.ceph = Ceph(self, log)
def init_connection(self, target):
self.log.info(f'Establishing API connection to {target}.')
self.papi = ProxmoxAPI(target,
user=self.conf.get(f'auth.{self.cluster}',
'user name'),
token_name=self.conf.get(f'auth.{self.cluster}',
'token name'),
token_value=self.conf.get(f'auth.{self.cluster}',
'token value'),
timeout=self.conf.getint('connection',
'http_timeout',
fallback=300),
verify_ssl=False)
nodes = [item for item in self.get('cluster/status')
if item['type'] == 'node']
for node in nodes:
if node['local'] == 1:
self.connected_node = node['name']
self.log.moreinfo(f'Connected to {self.connected_node}.')
break
else:
raise Exception("Can't find connected node! "
"I can't work like this!")
if not self.all_nodes:
self.all_nodes = [item['name'] for item in nodes]
return
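    # Resolve the path through proxmoxer and perform the request; if the API
    # answers with a task UPID, block until that task has finished.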
def make_request(self, verb, path, **kwargs):
path = self.papi(path)
result = getattr(path, verb)(**kwargs)
if isinstance(result, str) and result.startswith('UPID:'):
return Tasks.blocking_status(self.papi, result)
return result
def get(self, path, **kwargs):
return self.make_request('get', path, **kwargs)
def post(self, path, **kwargs):
return self.make_request('post', path, **kwargs)
def put(self, path, **kwargs):
return self.make_request('put', path, **kwargs)
def do_migration(self, origin, target, vmids):
if not vmids:
return
self.log.info(f'Doing migrations from {origin} to {target}...')
vmids_str = ','.join(map(str, vmids))
self.log.moreinfo(f'Migrating vmids: {vmids_str}')
self.post(f'nodes/{origin}/migrateall',
target=target,
maxworkers=self.max_workers,
vms=vmids_str)
self.log.info('done!')
def get_running_vms(self, node):
return [vm for vm in self.get(f'nodes/{node}/qemu')
if running(vm)]
def collect_cluster_status(self):
self.log.debug('Collecting cluster status information.')
available_resources = {}
for node in self.all_nodes:
status = self.get(f'nodes/{node}/status')
cpu_count = status['cpuinfo']['cpus']
cpus_free = cpu_count
memory_total = status['memory']['total']
            # Subtract ~150 MB as a safety buffer for reserved memory, based
            # on the observed difference between the free memory reported by
            # pvesh and by free
memory_free = memory_total - 150000000
vms = self.get_running_vms(node)
for vm in vms:
cpus_free -= vm['cpus']
memory_free -= vm['maxmem']
self.log.debug(f'{node}: {cpus_free} unused cores, '
f'{memory_free} free memory')
available_resources[node] = {'memory': memory_free,
'cpu': cpus_free}
return available_resources
def find_best_node(self, nodeinfo, vm):
# Checks for sufficient free memory and cores, and chooses the node
# with most available memory from the ones eligible
required_cores = vm['cpus']
required_memory = vm['maxmem']
name = vm['name']
vmid = vm['vmid']
self.log.debug('Looking for best migration target '
f'for guest {name}({vmid}).')
best_fit = None
for node, stats in nodeinfo.items():
cpu = stats['cpu']
memory = stats['memory']
if cpu < required_cores:
self.log.debug(f'{node} unsuitable: {cpu} free cores '
f'but {required_cores} needed')
continue
if memory < required_memory:
self.log.debug(f'{node} unsuitable: {memory} free memory '
f'but {required_memory} needed')
continue
if not best_fit:
best_fit = node
self.log.debug(f'{node} selected by default')
continue
if memory > nodeinfo[best_fit]['memory']:
self.log.debug(f'{node} is now best fit')
best_fit = node
if not best_fit:
raise Exception('No node with sufficient capacity available!')
self.log.debug(f'{best_fit} chosen for migration.')
return best_fit
def migrate_vms(self, origin, target):
self.log.info(f'Initiating migrations from {origin} to {target}.')
migrations = []
vms = self.get_running_vms(origin)
for vm in vms:
name = vm['name']
vmid = vm['vmid']
self.log.debug(f'Queueing guest {name}({vmid}) '
f'for migration to {target}.')
migrations.append(vmid)
self.do_migration(origin, target, migrations)
def distribute_vms(self, origin):
self.log.info(f'Distributing guests away from {origin}.')
target_nodes = [item for item in self.all_nodes if item != origin]
available_resources = self.collect_cluster_status()
del available_resources[origin]
migrations = {item:[] for item in target_nodes}
vms = self.get_running_vms(origin)
for vm in vms:
name = vm['name']
vmid = vm['vmid']
cores = vm['cpus']
memory = vm['maxmem']
target = self.find_best_node(available_resources, vm)
self.log.debug(f'Queueing guest {name}({vmid}) '
f'for migration to {target}.')
migrations[target].append(vmid)
available_resources[target]['memory'] -= memory
available_resources[target]['cpu'] -= cores
for target, vmids in migrations.items():
self.do_migration(origin, target, vmids)
return
def rebalance_vms(self, target):
self.log.info(f'Pulling guests into {target} to balance load.')
available_resources = self.collect_cluster_status()
all_memory = [item['memory'] for item in available_resources.values()]
all_cores = [item['cpu'] for item in available_resources.values()]
avg_free_memory = sum(all_memory)/len(all_memory)
avg_free_memory_up = avg_free_memory + avg_free_memory*0.1
avg_free_memory_down = avg_free_memory - avg_free_memory*0.1
avg_free_cores = sum(all_cores)/len(all_cores)
self.log.debug(f'Average free memory across cluster: {avg_free_memory}')
self.log.debug(f'Average free cores across cluster: {avg_free_cores}')
target_free_memory = available_resources[target]['memory']
target_free_cores = available_resources[target]['cpu']
# Collect a list of all running vms not residing on the target node
# and take note of each vm's host node
all_vms = []
for node in self.all_nodes:
if node == target:
continue
vms = self.get_running_vms(node)
for vm in vms:
vm['node'] = node
all_vms.append(vm)
# Iterate over the vms in order of highest memory allocation
# first. Break the loop once target gets within 10% of the
# cluster's average free memory or fewer free cores than the
# cluster average.
all_vms = sorted(all_vms, key=lambda vm: vm['maxmem'], reverse=True)
migrations = {item: [] for item in self.all_nodes
if item != target}
for vm in all_vms:
vmid = vm['vmid']
node = vm['node']
name = vm['name']
memory = vm['maxmem']
if target_free_memory - memory < avg_free_memory_down:
                # Skip this guest if migrating it would push the target's
                # free memory more than 10% below the cluster average
continue
self.log.debug(f'Queueing guest {name}({vmid}) '
f'for migration from {node}.')
migrations[node].append(vmid)
target_free_memory -= vm['maxmem']
target_free_cores -= vm['cpus']
if target_free_memory < avg_free_memory_up:
                # Stop once the target's free memory has dropped to within
                # 10% of the cluster average
break
if target_free_cores < avg_free_cores:
break
# Do the migrations
for origin, vmids in migrations.items():
self.do_migration(origin, target, vmids)
return
def migrate_vms_away(self, node=None, target=None, strategy=None):
self.log.info(f'Initiating migration of guests from {node}.')
if target and strategy:
raise Exception('target and strategy are mutually exclusive')
if target:
self.migrate_vms(node, target)
return
if strategy == 'distribute':
self.distribute_vms(node)
return
raise Exception('migration command not understood')
def fix_node_services(self, target):
self.log.info(f'Checking for failed services on {target}.')
services = self.get(f'nodes/{target}/services')
failed = [service for service in services
if service['state'] == 'failed']
if not failed:
self.log.info('No failed services found.')
return
for service in failed:
name = service['name']
self.log.info(f'Found failed service {name}, restarting.')
self.post(f'nodes/{target}/services/{name}/start')
def fix_cluster_services(self):
for node in self.all_nodes:
self.fix_node_services(node)
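    # Reboot a single node: keep the API connection off the target, set the
    # Ceph noout flag, issue the reboot, poll until the node reports online
    # again, then wait for Ceph to recover before clearing noout.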
def reboot_node(self, target):
self.log.info(f'Initiating reboot of node {target}.')
if target == self.connected_node:
# Switch connection to another node so we don't lose it
self.log.debug(f'API connected to {target}, switching nodes.')
for index, node in enumerate(self.all_nodes):
if node == target:
potential_index = index - 1
if potential_index < 0:
potential_index = len(self.all_nodes) - 1
self.init_connection(self.all_nodes[potential_index])
self.log.moreinfo('Setting noout flag on Ceph')
self.ceph.set_noout()
self.log.moreinfo('Issuing reboot command.')
self.post(f'nodes/{target}/status', command='reboot')
# Give the node some time to actually go down
time.sleep(10)
# Wait for node to come back online before moving on
self.log.info(f'Waiting for {target} to come back online.')
elapsed = 0
while True:
time.sleep(self.poll_interval)
elapsed += self.poll_interval
nodes_status = {item['node']: item['status']
for item in self.get('nodes')}
if nodes_status[target] == 'online':
self.log.info(f'{target} is back online after {elapsed}s.')
break
if elapsed >= self.reboot_wait:
raise Exception(f"Node {target} hasn't come online "
f"after {elapsed}s, giving up.")
self.log.info('Waiting for Ceph to normalise.')
self.ceph.unset_noout()
self.log.info('noout flag unset on Ceph')
self.log.info(f'Reboot of {target} done.')
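    # Rolling reboot of the whole cluster: guests on the first node are
    # distributed across the remaining nodes, each subsequent node's guests
    # are pushed onto the previously rebooted node, and the last node is
    # rebalanced afterwards.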
def reboot_cluster(self):
self.log.info('Initiating cluster reboot.')
for index, node in enumerate(self.all_nodes):
if index == 0:
self.migrate_vms_away(node, strategy='distribute')
else:
self.migrate_vms_away(node, target=self.all_nodes[index-1])
self.reboot_node(node)
# the value of node will be the last one from the loop
self.rebalance_vms(node)
return
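# Example invocations (cluster and node names are placeholders):
#   ./proxmanager.py -v prodcluster reboot
#   ./proxmanager.py prodcluster reboot-node pve01
#   ./proxmanager.py -q prodcluster fix-services --node pve02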
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='proxmanager',
description='Perform administrative actions on a proxmox cluster')
parser.add_argument('cluster')
verbosity = parser.add_mutually_exclusive_group()
verbosity.add_argument('--verbose', '-v', action='count', default=0,
help="Be verbose. Specify twice for maximum verbosity")
verbosity.add_argument('--quiet', '-q', action='store_const',
const=-1, dest='verbose',
help="Suppress all output except exceptions")
subcommands = parser.add_subparsers(dest='command',
help='Available actions:')
reboot = subcommands.add_parser('reboot',
help='Reboot the target cluster')
reboot_node = subcommands.add_parser('reboot-node',
help='Reboot a specified node in target cluster')
reboot_node.add_argument('target_node', help='The node to be rebooted')
fix_services = subcommands.add_parser('fix-services',
help='Attempt to wake up failed services')
fix_services.add_argument('--node', '-n', help='Target a specific node')
args = parser.parse_args()
configfile = Path(__file__).parent / 'config.ini'
config = ConfigParser()
config.read(configfile)
log = Log(args.verbose)
p = ProxManager(args.cluster, config, log)
match args.command:
case 'reboot':
p.reboot_cluster()
case 'reboot-node':
target = args.target_node
p.migrate_vms_away(target, strategy='distribute')
p.reboot_node(target)
p.rebalance_vms(target)
case 'fix-services':
if args.node:
p.fix_node_services(args.node)
else:
p.fix_cluster_services()
case _:
raise Exception('Unknown action')