# Removed unnecessary declarations of instance variables on ProxManager,
# only keeping the ones that aren't set directly in __init__ around for
# greater visibility.
#!/opt/proxmanager/env/bin/python3
|
|
|
|
import argparse
|
|
import time
|
|
|
|
from configparser import ConfigParser
|
|
from pathlib import Path
|
|
|
|
from proxmoxer import ProxmoxAPI
|
|
from proxmoxer.tools import Tasks
|
|
|
|
|
|
def running(vm):
    """Return True if the guest dict *vm* reports status 'running'."""
    # Return the comparison directly instead of the
    # if/return-True/return-False anti-idiom.
    return vm['status'] == 'running'
|
|
|
|
|
|
class Log:
    """Verbosity-filtered console logger.

    Messages are indented by their level.  A message printed with
    ``lf=False`` omits the trailing newline so that the next message
    continues the same output line (without re-indenting).
    """

    def __init__(self, verbosity):
        # Messages with a level above this verbosity are suppressed.
        # (Instance state lives here, not at class level, so instances
        # never share mutable logger state.)
        self._verbosity = verbosity
        # True while the previous message was printed without a newline,
        # i.e. the next message continues the same output line.
        self._continue = False

    def log(self, level, message, lf=True):
        """Print *message* if *level* is within the configured verbosity.

        lf=False suppresses the trailing newline so the next call
        continues the current line.
        """
        if self._verbosity < level:
            return

        kwargs = {'flush': True}
        if not lf:
            kwargs['end'] = ''

        # Indent only when starting a fresh line.  The continuation flag
        # must be read *before* it is updated for the current call:
        # updating first made the opening chunk of a continued line lose
        # its indent while the follow-up chunk gained a spurious one.
        pad = '' if self._continue else ' ' * level
        self._continue = not lf

        print(f'{pad}{message}', **kwargs)

    def info(self, message, **kwargs):
        """Log at level 0 (always shown unless quiet)."""
        self.log(0, message, **kwargs)

    def moreinfo(self, message, **kwargs):
        """Log at level 1 (shown with -v)."""
        self.log(1, message, **kwargs)

    def debug(self, message, **kwargs):
        """Log at level 2 (shown with -vv)."""
        self.log(2, message, **kwargs)
|
|
|
|
|
|
class Ceph:
    """Helpers for checking Ceph health and gating the 'noout' flag
    through the Proxmox API exposed by a ProxManager instance."""

    def __init__(self, proxmanager, log):
        # All attributes are assigned directly here, so no class-level
        # declarations are kept (matching the project convention).
        self.manager = proxmanager
        self.log = log
        # API path of the cluster-wide 'noout' OSD flag.
        self.noout_path = 'cluster/ceph/flags/noout'

    def get_noout(self):
        """Return the current value of the noout flag (truthy when set)."""
        return self.manager.get(self.noout_path)

    def is_healthy(self):
        """Return True when Ceph reports HEALTH_OK, or when the only
        health check firing is OSDMAP_FLAGS and no flag other than
        noout is set."""
        status = self.manager.get('cluster/ceph/status')
        if status['health']['status'] == 'HEALTH_OK':
            return True

        # Ignore OSDMAP_FLAGS: setting noout (which this tool does
        # itself) raises that warning and shouldn't count as unhealthy.
        warnings = {check: value for check, value
                    in status['health']['checks'].items()
                    if check != 'OSDMAP_FLAGS'}
        if warnings:
            self.log.debug('Ceph reports the following warnings:')
            for check, value in warnings.items():
                message = value['summary']['message']
                self.log.debug(f'{check}: {message}')
            return False

        # Any set flag other than noout still counts as unhealthy.
        flags = [flag['name']
                 for flag in self.manager.get('cluster/ceph/flags')
                 if flag['name'] != 'noout'
                 and flag['value'] == 1]
        if flags:
            self.log.debug('The following Ceph flags are set:')
            for flag in flags:
                self.log.debug(flag)
            return False

        return True

    def set_noout(self):
        """Set the noout flag; no-op if already set.

        Raises when Ceph is unhealthy, since masking OSD-out handling
        on a degraded cluster would be unsafe.
        """
        if self.get_noout():
            return

        if not self.is_healthy():
            raise Exception("Ceph reports unhealthy state, "
                            "won't set noout under these conditions.")
        self.manager.put(self.noout_path, value=1)

    def unset_noout(self):
        """Clear the noout flag; no-op if already clear.

        If Ceph is unhealthy, polls until it recovers, giving up (and
        raising) after the manager's configured ceph_resolve_wait.
        """
        if not self.get_noout():
            return

        if not self.is_healthy():
            waited = 0
            self.log.moreinfo('Ceph reports unhealthy state, '
                              'waiting to see if it resolves itself.')
            while True:
                time.sleep(self.manager.poll_interval)
                waited += self.manager.poll_interval
                if self.is_healthy():
                    self.log.moreinfo('Ceph health has recovered '
                                      f'after {waited}s.')
                    break
                if waited >= self.manager.ceph_resolve_wait:
                    raise Exception('Ceph health still degraded '
                                    f'after {waited}s. Giving up.')

        self.manager.put(self.noout_path, value=0)
|
|
|
|
|
|
class ProxManager:
    """Administrative driver for a Proxmox cluster: guest migration,
    rolling reboots and recovery of failed node services.

    Only instance variables that are *not* assigned directly in
    __init__ are declared at class level, for visibility.
    """

    # Name of the node the API connection currently terminates on;
    # discovered in init_connection().
    connected_node = None
    # Names of every cluster node; populated on the first connect.
    all_nodes = None

    def __init__(self, cluster, config, log):
        """Read connection settings from *config*, connect to *cluster*.

        Raises when the config has no auth section for the cluster.
        """
        if f'auth.{cluster}' not in config:
            raise Exception('No known authentication methods '
                            f'for cluster {cluster}')
        self.conf = config
        self.log = log
        self.cluster = cluster
        # getint, not get: ConfigParser.get() returns strings, which
        # would break the arithmetic and comparisons these settings feed
        # (time.sleep increments, timeout >= checks).  http_timeout
        # below already used getint; this makes the rest consistent.
        self.max_workers = self.conf.getint('connection',
                                            'max workers',
                                            fallback=5)
        self.poll_interval = self.conf.getint('connection',
                                              'poll interval',
                                              fallback=10)
        self.reboot_wait = self.conf.getint('connection',
                                            'node reboot timeout',
                                            fallback=400)
        self.ceph_resolve_wait = self.conf.getint('connection',
                                                  'ceph resolve timeout',
                                                  fallback=200)
        self.init_connection(cluster)
        self.ceph = Ceph(self, log)

    def init_connection(self, target):
        """Open an API connection to *target* and record which cluster
        node the connection actually landed on (and, on the first call,
        the full node list)."""
        self.log.info(f'Establishing API connection to {target}.')
        self.papi = ProxmoxAPI(target,
                               user=self.conf.get(f'auth.{self.cluster}',
                                                  'user name'),
                               token_name=self.conf.get(f'auth.{self.cluster}',
                                                        'token name'),
                               token_value=self.conf.get(f'auth.{self.cluster}',
                                                         'token value'),
                               timeout=self.conf.getint('connection',
                                                        'http_timeout',
                                                        fallback=300),
                               verify_ssl=False)

        nodes = [item for item in self.get('cluster/status')
                 if item['type'] == 'node']
        for node in nodes:
            if node['local'] == 1:
                self.connected_node = node['name']
                self.log.moreinfo(f'Connected to {self.connected_node}.')
                break
        else:
            raise Exception("Can't find connected node! "
                            "I can't work like this!")
        # Only record the node list on first connection; later
        # reconnects (e.g. while a node reboots) keep the full list.
        if not self.all_nodes:
            self.all_nodes = [item['name'] for item in nodes]
        return

    def make_request(self, verb, path, **kwargs):
        """Issue an API request; when the response is a task UPID,
        block until the task finishes and return its final status."""
        endpoint = self.papi(path)  # renamed: no longer shadows `path`
        result = getattr(endpoint, verb)(**kwargs)
        if isinstance(result, str) and result.startswith('UPID:'):
            return Tasks.blocking_status(self.papi, result)
        return result

    def get(self, path, **kwargs):
        """GET wrapper around make_request()."""
        return self.make_request('get', path, **kwargs)

    def post(self, path, **kwargs):
        """POST wrapper around make_request()."""
        return self.make_request('post', path, **kwargs)

    def put(self, path, **kwargs):
        """PUT wrapper around make_request()."""
        return self.make_request('put', path, **kwargs)

    def do_migration(self, origin, target, vmids):
        """Bulk-migrate *vmids* from *origin* to *target*.

        No-op when vmids is empty, so callers can pass queues blindly.
        """
        if not vmids:
            return
        self.log.info(f'Doing migrations from {origin} to {target}...')

        vmids_str = ','.join(map(str, vmids))
        self.log.moreinfo(f'Migrating vmids: {vmids_str}')
        self.post(f'nodes/{origin}/migrateall',
                  target=target,
                  maxworkers=self.max_workers,
                  vms=vmids_str)
        self.log.info('done!')

    def get_running_vms(self, node):
        """Return the qemu guest dicts currently running on *node*."""
        return [vm for vm in self.get(f'nodes/{node}/qemu')
                if running(vm)]

    def collect_cluster_status(self):
        """Return {node: {'memory': free_bytes, 'cpu': free_cores}}
        computed from each node's totals minus its running guests'
        allocations."""
        self.log.debug('Collecting cluster status information.')
        available_resources = {}
        for node in self.all_nodes:
            status = self.get(f'nodes/{node}/status')
            cpus_free = status['cpuinfo']['cpus']
            memory_total = status['memory']['total']
            # Subtracting ~150MB as a safety buffer of reserved memory
            # Based on observed difference between reported free memory
            # across pvesh and free
            memory_free = memory_total - 150000000

            for vm in self.get_running_vms(node):
                cpus_free -= vm['cpus']
                memory_free -= vm['maxmem']

            self.log.debug(f'{node}: {cpus_free} unused cores, '
                           f'{memory_free} free memory')
            available_resources[node] = {'memory': memory_free,
                                         'cpu': cpus_free}
        return available_resources

    def find_best_node(self, nodeinfo, vm):
        """Choose a migration target for *vm* from *nodeinfo*.

        Checks for sufficient free memory and cores, and chooses the
        node with most available memory from the ones eligible.
        Raises when no node can host the guest.
        """
        required_cores = vm['cpus']
        required_memory = vm['maxmem']
        name = vm['name']
        vmid = vm['vmid']

        self.log.debug('Looking for best migration target '
                       f'for guest {name}({vmid}).')

        best_fit = None
        for node, stats in nodeinfo.items():
            cpu = stats['cpu']
            memory = stats['memory']
            if cpu < required_cores:
                self.log.debug(f'{node} unsuitable: {cpu} free cores '
                               f'but {required_cores} needed')
                continue
            if memory < required_memory:
                self.log.debug(f'{node} unsuitable: {memory} free memory '
                               f'but {required_memory} needed')
                continue
            if not best_fit:
                best_fit = node
                self.log.debug(f'{node} selected by default')
                continue
            if memory > nodeinfo[best_fit]['memory']:
                self.log.debug(f'{node} is now best fit')
                best_fit = node

        if not best_fit:
            raise Exception('No node with sufficient capacity available!')

        self.log.debug(f'{best_fit} chosen for migration.')
        return best_fit

    def migrate_vms(self, origin, target):
        """Migrate every running guest from *origin* to *target*."""
        self.log.info(f'Initiating migrations from {origin} to {target}.')

        migrations = []
        for vm in self.get_running_vms(origin):
            name = vm['name']
            vmid = vm['vmid']
            self.log.debug(f'Queueing guest {name}({vmid}) '
                           f'for migration to {target}.')
            migrations.append(vmid)

        self.do_migration(origin, target, migrations)

    def distribute_vms(self, origin):
        """Spread every running guest on *origin* across the other
        nodes, picking the best-fitting target per guest and tracking
        remaining capacity as guests are queued."""
        self.log.info(f'Distributing guests away from {origin}.')
        target_nodes = [item for item in self.all_nodes if item != origin]

        available_resources = self.collect_cluster_status()

        # origin is being evacuated, so it is not a candidate target.
        del available_resources[origin]
        migrations = {item: [] for item in target_nodes}

        for vm in self.get_running_vms(origin):
            name = vm['name']
            vmid = vm['vmid']
            cores = vm['cpus']
            memory = vm['maxmem']
            target = self.find_best_node(available_resources, vm)
            self.log.debug(f'Queueing guest {name}({vmid}) '
                           f'for migration to {target}.')
            migrations[target].append(vmid)
            # Reserve the guest's resources so later placement
            # decisions see the reduced capacity.
            available_resources[target]['memory'] -= memory
            available_resources[target]['cpu'] -= cores

        for target, vmids in migrations.items():
            self.do_migration(origin, target, vmids)
        return

    def rebalance_vms(self, target):
        """Pull guests onto *target* until its free memory comes within
        10% of the cluster average, or its free cores drop below the
        cluster average."""
        self.log.info(f'Pulling guests into {target} to balance load.')
        available_resources = self.collect_cluster_status()

        all_memory = [item['memory'] for item in available_resources.values()]
        all_cores = [item['cpu'] for item in available_resources.values()]

        avg_free_memory = sum(all_memory)/len(all_memory)
        avg_free_memory_up = avg_free_memory + avg_free_memory*0.1
        avg_free_memory_down = avg_free_memory - avg_free_memory*0.1
        avg_free_cores = sum(all_cores)/len(all_cores)
        self.log.debug(f'Average free memory across cluster: {avg_free_memory}')
        self.log.debug(f'Average free cores across cluster: {avg_free_cores}')

        target_free_memory = available_resources[target]['memory']
        target_free_cores = available_resources[target]['cpu']

        # Collect a list of all running vms not residing on the target node
        # and take note of each vm's host node
        all_vms = []
        for node in self.all_nodes:
            if node == target:
                continue
            for vm in self.get_running_vms(node):
                vm['node'] = node
                all_vms.append(vm)

        # Iterate over the vms in order of highest memory allocation
        # first. Break the loop once target gets within 10% of the
        # cluster's average free memory or fewer free cores than the
        # cluster average.  (The previous unused `smallest_vm` lookup,
        # which also crashed on an empty vm list, has been removed.)
        all_vms = sorted(all_vms, key=lambda vm: vm['maxmem'], reverse=True)
        migrations = {item: [] for item in self.all_nodes
                      if item != target}
        for vm in all_vms:
            vmid = vm['vmid']
            node = vm['node']
            name = vm['name']
            memory = vm['maxmem']

            if target_free_memory - memory < avg_free_memory_down:
                # Only migrate vms that will cause memory usage up to
                # 10% above cluster average
                continue

            self.log.debug(f'Queueing guest {name}({vmid}) '
                           f'for migration from {node}.')
            migrations[node].append(vmid)

            target_free_memory -= vm['maxmem']
            target_free_cores -= vm['cpus']
            if target_free_memory < avg_free_memory_up:
                # Break if the migrated vm will cause target memory
                # usage to come within 10% of cluster average
                break
            if target_free_cores < avg_free_cores:
                break

        # Do the migrations
        for origin, vmids in migrations.items():
            self.do_migration(origin, target, vmids)
        return

    def migrate_vms_away(self, node=None, target=None, strategy=None):
        """Evacuate *node*, either onto a specific *target* or by the
        named *strategy* ('distribute').  Exactly one of target/strategy
        must be given."""
        self.log.info(f'Initiating migration of guests from {node}.')
        if target and strategy:
            raise Exception('target and strategy are mutually exclusive')

        if target:
            self.migrate_vms(node, target)
            return

        if strategy == 'distribute':
            self.distribute_vms(node)
            return

        raise Exception('migration command not understood')

    def fix_node_services(self, target):
        """Restart every service on *target* whose state is 'failed'."""
        self.log.info(f'Checking for failed services on {target}.')
        services = self.get(f'nodes/{target}/services')
        failed = [service for service in services
                  if service['state'] == 'failed']

        if not failed:
            self.log.info('No failed services found.')
            return

        for service in failed:
            name = service['name']
            self.log.info(f'Found failed service {name}, restarting.')
            self.post(f'nodes/{target}/services/{name}/start')

    def fix_cluster_services(self):
        """Run fix_node_services() on every node in the cluster."""
        for node in self.all_nodes:
            self.fix_node_services(node)

    def reboot_node(self, target):
        """Reboot *target*: re-home the API connection if needed, set
        Ceph noout, issue the reboot, wait (up to reboot_wait) for the
        node to return, then restore Ceph.  Raises on timeout."""
        self.log.info(f'Initiating reboot of node {target}.')
        if target == self.connected_node:
            # Switch connection to another node so we don't lose it
            self.log.debug(f'API connected to {target}, switching nodes.')
            for index, node in enumerate(self.all_nodes):
                if node == target:
                    # Use the previous node in the list, wrapping around
                    # at the front.
                    potential_index = index - 1
                    if potential_index < 0:
                        potential_index = len(self.all_nodes) - 1
                    self.init_connection(self.all_nodes[potential_index])
                    break  # node names are unique; no need to keep scanning

        self.log.moreinfo('Setting noout flag on Ceph')
        self.ceph.set_noout()
        self.log.moreinfo('Issuing reboot command.')
        self.post(f'nodes/{target}/status', command='reboot')

        # Give the node some time to actually go down
        time.sleep(10)

        # Wait for node to come back online before moving on
        self.log.info(f'Waiting for {target} to come back online.')
        elapsed = 0
        while True:
            time.sleep(self.poll_interval)
            elapsed += self.poll_interval
            nodes_status = {item['node']: item['status']
                            for item in self.get('nodes')}
            if nodes_status[target] == 'online':
                self.log.info(f'{target} is back online after {elapsed}s.')
                break
            if elapsed >= self.reboot_wait:
                raise Exception(f"Node {target} hasn't come online "
                                f"after {elapsed}s, giving up.")

        self.log.info('Waiting for Ceph to normalise.')
        self.ceph.unset_noout()
        self.log.info('noout flag unset on Ceph')
        self.log.info(f'Reboot of {target} done.')

    def reboot_cluster(self):
        """Rolling reboot: evacuate and reboot each node in turn, then
        rebalance guests back onto the last-rebooted node."""
        self.log.info('Initiating cluster reboot.')
        for index, node in enumerate(self.all_nodes):
            if index == 0:
                # The first node has no already-rebooted neighbour, so
                # spread its guests across the whole cluster.
                self.migrate_vms_away(node, strategy='distribute')
            else:
                # Later nodes dump onto their freshly rebooted neighbour.
                self.migrate_vms_away(node, target=self.all_nodes[index-1])
            self.reboot_node(node)

        # the value of node will be the last one from the loop
        self.rebalance_vms(node)
        return
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(
|
|
prog='proxmanager',
|
|
description='Perform administrative actions on a proxmox cluster')
|
|
parser.add_argument('cluster')
|
|
|
|
verbosity = parser.add_mutually_exclusive_group()
|
|
verbosity.add_argument('--verbose', '-v', action='count', default=0,
|
|
help="Be verbose. Specify twice for maximum verbosity")
|
|
verbosity.add_argument('--quiet', '-q', action='store_const',
|
|
const=-1, dest='verbose',
|
|
help="Suppress all output except exceptions")
|
|
|
|
subcommands = parser.add_subparsers(dest='command',
|
|
help='Available actions:')
|
|
reboot = subcommands.add_parser('reboot',
|
|
help='Reboot the target cluster')
|
|
reboot_node = subcommands.add_parser('reboot-node',
|
|
help='Reboot a specified node in target cluster')
|
|
reboot_node.add_argument('target_node', help='The node to be rebooted')
|
|
fix_services = subcommands.add_parser('fix-services',
|
|
help='Attempt to wake up failed services')
|
|
fix_services.add_argument('--node', '-n', help='Target a specific node')
|
|
|
|
args = parser.parse_args()
|
|
|
|
configfile = Path(__file__).parent / 'config.ini'
|
|
config = ConfigParser()
|
|
config.read(configfile)
|
|
log = Log(args.verbose)
|
|
p = ProxManager(args.cluster, config, log)
|
|
|
|
match args.command:
|
|
case 'reboot':
|
|
p.reboot_cluster()
|
|
case 'reboot-node':
|
|
target = args.target_node
|
|
p.migrate_vms_away(target, strategy='distribute')
|
|
p.reboot_node(target)
|
|
p.rebalance_vms(target)
|
|
case 'fix-services':
|
|
if args.node:
|
|
p.fix_node_services(args.node)
|
|
else:
|
|
p.fix_cluster_services()
|
|
case _:
|
|
raise Exception('Unknown action')
|