from datetime import datetime
from pathlib import Path
from textwrap import dedent
from time import sleep
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union
import ipaddress
import json
import os
import subprocess

confsuffix = '.conf'
serversuffix = '.serverconf'
metasuffix = '.json'
workdir = Path('./work')
lockfile = workdir.joinpath('lockfile.lock~')

# Any concrete address object produced by ipaddress.ip_address().
_IPAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]


def safe_join(*args) -> Optional[Path]:
    '''
    Similar to flask's own safe_join, but uses Path objects instead of strings.

    Joins all arguments onto the first one and returns the joined path, or
    None when the result would point outside of the first argument.
    '''
    base_path = Path(args[0])
    joined_path = base_path.joinpath(*args[1:])
    # Path.relative_to() compares purely lexically and does not collapse
    # '..' components, so both sides are normalised first; otherwise
    # 'base/../secret' would be accepted as living below 'base'.
    normalized_base = Path(os.path.normpath(base_path))
    normalized_joined = Path(os.path.normpath(joined_path))
    try:
        relative = normalized_joined.relative_to(normalized_base)
    except ValueError:
        return None
    # With a base of '.', every relative path is lexically "below" it, so
    # leftover '..' components have to be rejected explicitly.
    if '..' in relative.parts:
        return None
    return joined_path


def call_with_lock(func: Callable[..., Any], args: Sequence = (),
                   timeout: Optional[float] = None):
    '''
    Call func(*args) while holding the file lock and return its result.

    The lock is the exclusive creation of `lockfile`; it is retried every
    0.1 seconds until it succeeds or `timeout` seconds have been spent
    waiting (None waits forever). The lock file is removed again even when
    func raises, and exceptions raised by func propagate unchanged.

    Raises TimeoutError when the lock could not be taken in time.
    '''
    sleep_time = 0.1
    slept = 0
    while timeout is None or slept < timeout:
        try:
            # Mode 'x' fails if the file already exists, which is what
            # makes the file usable as a lock.
            with open(lockfile, 'x'):
                pass
        except FileExistsError:
            slept += sleep_time
            sleep(sleep_time)
            continue
        # func runs outside the try block above on purpose: a
        # FileExistsError raised by func itself must not be mistaken for
        # lock contention.
        try:
            print(f'Lock successful after {slept}s, '
                  f'tried on {sleep_time}s intervals.')
            return func(*args)
        finally:
            lockfile.unlink()
    raise TimeoutError(f'Unable to acquire lock within {timeout} seconds')


def generate_keypair() -> Tuple[str, str]:
    ''' Create a new key pair with `wg`; returns (private key, public key) '''
    privkey = run_wg('genkey')
    pubkey = run_wg('pubkey', input=privkey)
    return privkey.strip(), pubkey.strip()


def generate_user_serverside_config(config_id: str, user_name: str,
                                    client_ip: Union[_IPAddress, str],
                                    client_pubkey: str) -> str:
    ''' Build the [Peer] section the server needs for one client config '''
    config = dedent(f'''
        [Peer]
        # {user_name}/{config_id}
        PublicKey = {client_pubkey}
        AllowedIPs = {client_ip}/32
        ''')
    return config.lstrip()


def generate_user_clientside_config(client_ip: Union[_IPAddress, str],
                                    client_privkey: str,
                                    server_address: Union[_IPAddress, str],
                                    server_port: int, server_pubkey: str,
                                    dns_server: Union[_IPAddress, str]) -> str:
    ''' Build the complete configuration file handed out to a client '''
    # An IPv6 endpoint has to be wrapped in brackets, otherwise the port
    # separator cannot be told apart from the address itself.
    if isinstance(server_address, ipaddress.IPv6Address):
        endpoint_host = f'[{server_address}]'
    else:
        endpoint_host = server_address
    config = dedent(f'''
        [Interface]
        Address = {client_ip}/32
        DNS = {dns_server}
        PrivateKey = {client_privkey}

        [Peer]
        AllowedIPs = 0.0.0.0/0
        Endpoint = {endpoint_host}:{server_port}
        PublicKey = {server_pubkey}
        ''')
    return config.lstrip()


def run_wg(*args, input: str = None) -> str:
    '''
    Run `wg` with the given arguments and return what it wrote to stdout.

    `input`, if given, is passed to the process on stdin. Raises
    subprocess.CalledProcessError when wg exits with a non-zero status, so
    a failed call cannot silently yield an empty key.
    '''
    result = subprocess.run(['wg', *args], input=input,
                            capture_output=True, text=True, check=True)
    return result.stdout


class WireGuard:
    '''
    Manages the per-user configuration files of one WireGuard tunnel and
    assembles the server configuration from them.
    '''

    def __init__(self, config: dict):
        '''
        Read the tunnel settings from `config` and load the server keys
        from the files named by 'server_privkey_file' and
        'server_pubkey_file'.

        Raises ValueError when 'tunnel_id' does not yield a file name
        inside the work directory.
        '''
        self.tunnel_id = config['tunnel_id']
        self.server_address = ipaddress.ip_address(config['server_address'])
        self.server_port = int(config['server_port'])
        self.dns_server = ipaddress.ip_address(config['dns_server'])
        self.vpn_network = ipaddress.ip_network(config['vpn_network'])
        self.vpn_address = ipaddress.ip_address(config['server_vpn_address'])
        self.configs_base = Path(config['configs_base'])
        self.server_config_base = None
        if 'server_extra_config' in config:
            self.server_config_base = Path(config['server_extra_config'])
        self.server_config_file = safe_join(workdir, self.tunnel_id + '.conf')
        if self.server_config_file is None:
            raise ValueError(f'Invalid tunnel id: {self.tunnel_id!r}')
        with open(config['server_privkey_file'], 'r') as privkey_file:
            self.server_privkey = privkey_file.read().strip()
        with open(config['server_pubkey_file'], 'r') as pubkey_file:
            self.server_pubkey = pubkey_file.read().strip()
        self.wg_updated = False
        self.user_name = None
        self.user_base = None

    def set_user(self, user_name: str) -> None:
        '''
        Select the user whose configs the following calls operate on and
        create that user's directory if it does not exist yet.

        Raises ValueError when user_name would lead outside configs_base.
        '''
        user_base = safe_join(self.configs_base, user_name)
        if user_base is None:
            raise ValueError(f'Invalid user name: {user_name!r}')
        # exist_ok avoids the race between an exists() check and mkdir().
        user_base.mkdir(exist_ok=True)
        self.user_name = user_name
        self.user_base = user_base

    def get_used_ips(self) -> List[_IPAddress]:
        '''
        Return the server's VPN address plus every address found on an
        AllowedIPs line of the current server configuration.

        Comma-separated entries and prefix lengths other than /32 are
        accepted; for such an entry the address part is reported.
        '''
        latest_config = self.generate_server_config()
        ips = [self.vpn_address]
        for line in latest_config.split('\n'):
            if not line.startswith('AllowedIPs'):
                continue
            _, _, value = line.partition('=')
            for entry in value.split(','):
                entry = entry.strip()
                if entry:
                    ips.append(ipaddress.ip_interface(entry).ip)
        return ips

    def get_free_ip(self) -> _IPAddress:
        ''' Return the first host address of the VPN network not in use '''
        # A set keeps the membership test cheap for large networks.
        used_ips = set(self.get_used_ips())
        for addr in self.vpn_network.hosts():
            if addr not in used_ips:
                return addr
        raise Exception('No addresses available')

    def filepath(self, config_filename: str) -> Path:
        '''
        Return the path of config_filename inside the current user's
        directory.

        Raises Exception when no user has been set and ValueError when the
        file name would lead outside of the user's directory.
        '''
        if self.user_base is None:
            raise Exception('Not properly initialized')
        path = safe_join(self.user_base, config_filename)
        if path is None:
            raise ValueError(f'Invalid config file name: {config_filename!r}')
        return path

    def config_filepath(self, config_id: str) -> Path:
        ''' Path of the client configuration file for config_id '''
        return self.filepath(f'{config_id}{confsuffix}')

    def serverconfig_filepath(self, config_id: str) -> Path:
        ''' Path of the server-side peer snippet for config_id '''
        return self.filepath(f'{config_id}{serversuffix}')

    def meta_filepath(self, config_id: str) -> Path:
        ''' Path of the metadata file for config_id '''
        return self.filepath(f'{config_id}{metasuffix}')

    def generate_server_config(self) -> str:
        '''
        Assemble the full server configuration: the [Interface] section,
        the optional extra config file and every user's peer snippet.
        '''
        parts = [dedent(f'''
            [Interface]
            Address = {self.vpn_address}/{self.vpn_network.prefixlen}
            ListenPort = {self.server_port}
            PrivateKey = {self.server_privkey}
            ''').lstrip()]
        if self.server_config_base:
            with open(self.server_config_base, 'r') as cb:
                parts.append(cb.read())
        # glob() yields files in arbitrary order; sorting keeps the
        # generated configuration stable between runs.
        for conffile in sorted(self.configs_base.glob('*/*' + serversuffix)):
            with open(conffile, 'r') as cf:
                parts.append('\n' + cf.read())
        return ''.join(parts)

    def list_configs(self) -> list:
        ''' Return the ids of all client configs of the current user '''
        if self.user_base is None:
            raise Exception('Not properly initialized')
        return [p.stem for p in self.user_base.glob(f'*{confsuffix}')]

    def generate_config_files(self, *args) -> None:
        '''
        Create the files of a new config while holding the file lock.

        Takes the same arguments as _unsafe_generate_config_files and waits
        up to 10 seconds for the lock.
        '''
        call_with_lock(self._unsafe_generate_config_files, args, 10)

    def _unsafe_generate_config_files(self, config_id: str, name: str,
                                      description: str,
                                      creation_time: datetime) -> None:
        '''
        Generate keys, pick a free address and write the client config,
        server snippet and metadata file for config_id.

        Must only run while the lock is held, otherwise two callers could
        pick the same address. Raises FileExistsError when one of the files
        already exists; files created by this call are removed again in
        that case.
        '''
        client_privkey, client_pubkey = generate_keypair()
        client_ip = self.get_free_ip()
        metadata = {'name': name,
                    'description': description,
                    'created': creation_time.isoformat(' ', 'minutes')}
        contents = [
            (self.meta_filepath(config_id), json.dumps(metadata)),
            (self.config_filepath(config_id),
             generate_user_clientside_config(client_ip, client_privkey,
                                             self.server_address,
                                             self.server_port,
                                             self.server_pubkey,
                                             self.dns_server)),
            (self.serverconfig_filepath(config_id),
             generate_user_serverside_config(config_id, self.user_name,
                                             client_ip, client_pubkey)),
        ]
        created = []
        try:
            for path, content in contents:
                with open(path, 'x') as fh:
                    created.append(path)
                    fh.write(content)
        except Exception:
            # Do not leave a half-written config behind.
            for path in created:
                path.unlink(missing_ok=True)
            raise
        self.wg_updated = True

    def update_config(self, config_id: str, name: str,
                      description: str) -> None:
        ''' Replace name and description in the metadata of config_id '''
        with open(self.meta_filepath(config_id), 'r+') as mf:
            metadata = json.load(mf)
            metadata['name'] = name
            metadata['description'] = description
            # Rewrite in place; truncate() drops leftovers of a longer
            # previous version.
            mf.seek(0)
            mf.write(json.dumps(metadata))
            mf.truncate()

    def get_config(self, config_id: str) -> dict:
        ''' Return metadata and client config text of config_id '''
        with open(self.config_filepath(config_id), 'r') as cf, \
                open(self.meta_filepath(config_id), 'r') as mf:
            metadata = json.load(mf)
            configdata = cf.read()
        return {'id': config_id,
                'name': metadata['name'],
                'description': metadata['description'],
                'created': metadata['created'],
                'data': configdata}

    def delete_config(self, config_id: str) -> None:
        '''
        Delete all files belonging to config_id.

        Raises FileNotFoundError, before anything is deleted, when one of
        the files is missing.
        '''
        paths = [self.config_filepath(config_id),
                 self.serverconfig_filepath(config_id),
                 self.meta_filepath(config_id)]
        for path in paths:
            if not path.exists():
                raise FileNotFoundError(path)
        for path in paths:
            path.unlink()
        self.wg_updated = True

    def update(self) -> None:
        '''
        Write the server configuration and reload the tunnel service, but
        only when configs were added or removed through this object.
        '''
        if not self.wg_updated:
            return
        with open(self.server_config_file, 'w') as sf:
            sf.write(self.generate_server_config())
        # Sync updated settings to interface
        result = subprocess.run(['sudo', 'systemctl', 'reload',
                                 f'wg-quick@{self.tunnel_id}.service'])
        # Keep the flag set after a failed reload so that the next call
        # tries again.
        if result.returncode == 0:
            self.wg_updated = False
        return