# wg-selfserve/api/wireguard.py
# 2025-02-20 18:07:00 +01:00
#
# 253 lines
# 9.1 KiB
# Python

from datetime import datetime
from pathlib import Path
from textwrap import dedent
from time import sleep
import ipaddress
import json
import subprocess
# File name suffixes of the three files stored per user configuration.
confsuffix = '.conf'              # client-side WireGuard config
serversuffix = '.serverconf'      # matching [Peer] snippet for the server
metasuffix = '.json'              # name/description/creation metadata

# Scratch directory; it also holds the lock file used by call_with_lock().
workdir = Path('./work')
lockfile = workdir / 'lockfile.lock~'
def safe_join(*args) -> 'Path | None':
    '''
    Similar to flask's own safe_join, but uses Path objects
    instead of strings.

    Joins all of ``args`` onto the first one (which must be a ``Path``)
    and verifies that the result still lies inside that base path.

    The check is lexical (symlinks are not resolved), but ``..``
    components are collapsed before comparing, so ``base/../other`` is
    rejected instead of slipping through.

    Returns the joined path (not normalised), or ``None`` when the
    result would escape the base path through ``..`` traversal or an
    absolute component.
    '''
    # Local import: only needed for the lexical normalisation below.
    from os.path import normpath

    base_path = args[0]
    joined_path = Path.joinpath(*args)
    normalized_base = Path(normpath(base_path))
    normalized_joined = Path(normpath(joined_path))
    try:
        remainder = normalized_joined.relative_to(normalized_base)
    except ValueError:
        return None
    # With a base such as '.', every relative path counts as "inside",
    # so a remaining leading '..' has to be rejected explicitly.
    if '..' in remainder.parts:
        return None
    return joined_path
def call_with_lock(func: callable, args: list = None, timeout: float = None):
    '''
    Call ``func(*args)`` while holding the lock file and return its result.

    The lock is taken by exclusively creating ``lockfile``; while another
    holder exists, creation is retried every 0.1 seconds.

    func    -- callable to run under the lock
    args    -- positional arguments for ``func`` (none when omitted)
    timeout -- give up after roughly this many seconds of waiting;
               ``None`` waits forever

    Raises TimeoutError when the lock could not be taken in time.
    Exceptions raised by ``func`` propagate unchanged, and the lock file
    is removed in every case.
    '''
    call_args = () if args is None else args
    sleep_time = 0.1
    slept = 0
    while timeout is None or slept < timeout:
        try:
            # Mode 'x' fails with FileExistsError if the file exists.
            with open(lockfile, 'x'):
                pass
        except FileExistsError:
            slept += sleep_time
            sleep(sleep_time)
            continue
        print(f'Lock successful after {slept}s, '
              f'tried on {sleep_time}s intervals.')
        # func runs outside the try above, so a FileExistsError raised by
        # func itself is not mistaken for lock contention.
        try:
            return func(*call_args)
        finally:
            lockfile.unlink()
    raise TimeoutError(f'Unable to acquire lock within {timeout} seconds')
def generate_keypair():
    '''
    Create a new key pair with the ``wg`` command line tool.

    Runs ``wg genkey`` and pipes its output into ``wg pubkey``.
    Returns a ``(private_key, public_key)`` tuple with surrounding
    whitespace removed from both keys.
    '''
    raw_private = run_wg('genkey')
    raw_public = run_wg('pubkey', input=raw_private)
    keypair = (raw_private.strip(), raw_public.strip())
    return keypair
def generate_user_serverside_config(
        config_id: str,
        user_name: str,
        client_ip: 'ipaddress.IPv4Address | ipaddress.IPv6Address | str',
        client_pubkey: str):
    '''
    Build the server-side ``[Peer]`` section for one client configuration.

    config_id     -- identifier of the configuration (written as comment)
    user_name     -- owner of the configuration (written as comment)
    client_ip     -- VPN address assigned to the client
    client_pubkey -- public key of the client

    The client is routed as a single host: ``/32`` for an IPv4 address
    and ``/128`` for an IPv6 address.

    Returns the section as text ending in a newline.
    Raises ValueError when ``client_ip`` is not a valid IP address.
    '''
    # Host prefix length depends on the address family (32 or 128).
    host_prefixlen = ipaddress.ip_address(client_ip).max_prefixlen
    lines = [
        '[Peer]',
        f'# {user_name}/{config_id}',
        f'PublicKey = {client_pubkey}',
        f'AllowedIPs = {client_ip}/{host_prefixlen}',
    ]
    return '\n'.join(lines) + '\n'
def generate_user_clientside_config(
        client_ip: 'ipaddress.IPv4Address | ipaddress.IPv6Address | str',
        client_privkey: str,
        server_address: 'ipaddress.IPv4Address | ipaddress.IPv6Address | str',
        server_port: int,
        server_pubkey: str,
        dns_server: str):
    '''
    Build the complete client-side WireGuard configuration.

    client_ip      -- VPN address assigned to the client
    client_privkey -- private key of the client
    server_address -- public address of the server (IP address; a value
                      that does not parse as an IP is used unchanged)
    server_port    -- UDP port the server listens on
    server_pubkey  -- public key of the server
    dns_server     -- DNS server the client should use

    The address family of ``client_ip`` selects the host prefix
    (``/32`` or ``/128``) and the catch-all route (``0.0.0.0/0`` or
    ``::/0``). An IPv6 ``server_address`` is wrapped in brackets so that
    it can be told apart from the port in the ``Endpoint`` line.

    Returns the configuration as text ending in a newline.
    Raises ValueError when ``client_ip`` is not a valid IP address.
    '''
    client_addr = ipaddress.ip_address(client_ip)
    default_route = '::/0' if client_addr.version == 6 else '0.0.0.0/0'

    endpoint_host = str(server_address)
    try:
        if ipaddress.ip_address(endpoint_host).version == 6:
            endpoint_host = f'[{endpoint_host}]'
    except ValueError:
        # Not an IP literal (e.g. a host name): keep it as given.
        pass

    lines = [
        '[Interface]',
        f'Address = {client_ip}/{client_addr.max_prefixlen}',
        f'DNS = {dns_server}',
        f'PrivateKey = {client_privkey}',
        '[Peer]',
        f'AllowedIPs = {default_route}',
        f'Endpoint = {endpoint_host}:{server_port}',
        f'PublicKey = {server_pubkey}',
    ]
    return '\n'.join(lines) + '\n'
def run_wg(*args, input: str=None):
    '''
    Run the ``wg`` command line tool and return what it wrote to stdout.

    args  -- arguments passed to ``wg`` (e.g. ``'genkey'``)
    input -- optional text fed to the process on stdin

    Raises subprocess.CalledProcessError when ``wg`` exits with a
    non-zero status, so a failed call cannot silently yield an empty
    result (e.g. an empty key).
    '''
    result = subprocess.run(['wg', *args],
                            input=input,
                            capture_output=True,
                            text=True,
                            check=True)
    return result.stdout
class WireGuard:
    '''
    Manages the per-user WireGuard configurations of a single tunnel.

    Every user configuration consists of three files inside the user's
    directory below ``configs_base``: the client config, the matching
    server-side ``[Peer]`` snippet and a JSON metadata file. The server
    configuration is assembled from all ``[Peer]`` snippets.

    Call set_user() before using any of the per-user methods.
    '''

    def __init__(self, config: dict):
        '''
        Read the tunnel settings from ``config``.

        Required keys: tunnel_id, server_address, server_port, dns_server,
        vpn_network, server_vpn_address, configs_base, server_privkey_file,
        server_pubkey_file. Optional key: server_extra_config (file whose
        content is appended to the [Interface] section of the server
        configuration).

        Raises ValueError when tunnel_id does not yield a file name inside
        the work directory.
        '''
        self.tunnel_id = config['tunnel_id']
        self.server_address = ipaddress.ip_address(config['server_address'])
        self.server_port = int(config['server_port'])
        self.dns_server = ipaddress.ip_address(config['dns_server'])
        self.vpn_network = ipaddress.ip_network(config['vpn_network'])
        self.vpn_address = ipaddress.ip_address(config['server_vpn_address'])
        self.configs_base = Path(config['configs_base'])
        self.server_config_base = None
        if 'server_extra_config' in config:
            self.server_config_base = Path(config['server_extra_config'])
        self.server_config_file = safe_join(workdir, self.tunnel_id + '.conf')
        if self.server_config_file is None:
            # safe_join signals an escaping path with None.
            raise ValueError(f'Invalid tunnel id: {self.tunnel_id!r}')
        with open(config['server_privkey_file'], 'r') as privkey_file:
            self.server_privkey = privkey_file.read().strip()
        with open(config['server_pubkey_file'], 'r') as pubkey_file:
            self.server_pubkey = pubkey_file.read().strip()
        # Set when peers were added/removed and the interface needs a reload.
        self.wg_updated = False
        self.user_name = None
        self.user_base = None

    def set_user(self, user_name: str) -> None:
        '''
        Select the user whose configurations are managed and create the
        user's directory if it does not exist yet.

        Raises ValueError when ``user_name`` does not yield a path inside
        ``configs_base``.
        '''
        user_base = safe_join(self.configs_base, user_name)
        if user_base is None:
            raise ValueError(f'Invalid user name: {user_name!r}')
        if not user_base.exists():
            # exist_ok covers the directory appearing between check and
            # creation.
            user_base.mkdir(exist_ok=True)
        self.user_name = user_name
        self.user_base = user_base

    @staticmethod
    def _parse_allowed_ips(config_text: str) -> list:
        '''
        Return the addresses named on all ``AllowedIPs`` lines of a config.

        Each line may hold several comma separated entries, with or
        without a prefix length. Only the address part of every entry is
        returned.
        '''
        addresses = []
        for line in config_text.splitlines():
            if not line.startswith('AllowedIPs'):
                continue
            _, _, value = line.partition('=')
            # Drop a trailing comment, if any.
            value = value.split('#', 1)[0]
            for entry in value.split(','):
                entry = entry.strip()
                if entry:
                    addresses.append(ipaddress.ip_interface(entry).ip)
        return addresses

    def get_used_ips(self):
        '''
        Return the server's VPN address followed by every address that
        appears in an ``AllowedIPs`` line of the current server config.
        '''
        latest_config = self.generate_server_config()
        return [self.vpn_address] + self._parse_allowed_ips(latest_config)

    def get_free_ip(self):
        '''
        Return the first host address of the VPN network not yet in use.

        Raises Exception when every host address is taken.
        '''
        used_ips = set(self.get_used_ips())
        for addr in self.vpn_network.hosts():
            if addr not in used_ips:
                return addr
        raise Exception('No addresses available')

    def filepath(self, config_filename: str) -> Path:
        '''
        Return the path of ``config_filename`` inside the user's directory.

        Raises Exception when no user was set, and ValueError when the
        file name would lead outside the user's directory.
        '''
        if self.user_base is None:
            raise Exception('Not properly initialized')
        path = safe_join(self.user_base, config_filename)
        if path is None:
            raise ValueError(f'Invalid config file name: {config_filename!r}')
        return path

    def config_filepath(self, config_id: str) -> Path:
        '''Path of the client config file of ``config_id``.'''
        return self.filepath(f'{config_id}{confsuffix}')

    def serverconfig_filepath(self, config_id: str) -> Path:
        '''Path of the server-side peer snippet of ``config_id``.'''
        return self.filepath(f'{config_id}{serversuffix}')

    def meta_filepath(self, config_id: str) -> Path:
        '''Path of the JSON metadata file of ``config_id``.'''
        return self.filepath(f'{config_id}{metasuffix}')

    def generate_server_config(self):
        '''
        Assemble the full server configuration as text: the [Interface]
        section, the optional extra config file, then the peer snippets of
        all users (in sorted path order, so the output is stable).
        '''
        sections = [
            '[Interface]\n'
            f'Address = {self.vpn_address}/{self.vpn_network.prefixlen}\n'
            f'ListenPort = {self.server_port}\n'
            f'PrivateKey = {self.server_privkey}\n'
        ]
        if self.server_config_base:
            with open(self.server_config_base, 'r') as cb:
                sections.append(cb.read())
        for conffile in sorted(self.configs_base.glob('*/*' + serversuffix)):
            with open(conffile, 'r') as cf:
                sections.append('\n' + cf.read())
        return ''.join(sections)

    def list_configs(self) -> list:
        '''
        Return the ids of all configurations of the current user.

        Raises Exception when no user was set.
        '''
        if self.user_base is None:
            raise Exception('Not properly initialized')
        return [p.stem for p
                in self.user_base.glob(f'*{confsuffix}')]

    def generate_config_files(self, *args) -> None:
        '''
        Create a new configuration while holding the global lock (waits
        up to 10 seconds for it). Takes the same arguments as
        _unsafe_generate_config_files().
        '''
        call_with_lock(self._unsafe_generate_config_files, args, 10)

    def _unsafe_generate_config_files(self,
                                      config_id: str,
                                      name: str,
                                      description: str,
                                      creation_time: datetime) -> None:
        '''
        Create key pair, address and the three files of a new configuration.

        Must only run under the lock, because picking a free address and
        writing the peer snippet is not atomic.

        Raises FileExistsError when one of the files already exists and
        RuntimeError when no key pair could be generated. Files created by
        a failing call are removed again.
        '''
        client_privkey, client_pubkey = generate_keypair()
        if not client_privkey or not client_pubkey:
            raise RuntimeError('wg did not return a key pair')
        client_ip = self.get_free_ip()
        metadata = {'name': name,
                    'description': description,
                    'created': creation_time.isoformat(' ', 'minutes')}
        contents = [
            (self.config_filepath(config_id),
             generate_user_clientside_config(client_ip,
                                             client_privkey,
                                             self.server_address,
                                             self.server_port,
                                             self.server_pubkey,
                                             self.dns_server)),
            (self.serverconfig_filepath(config_id),
             generate_user_serverside_config(config_id,
                                             self.user_name,
                                             client_ip,
                                             client_pubkey)),
            (self.meta_filepath(config_id), json.dumps(metadata)),
        ]
        created = []
        try:
            for path, text in contents:
                # 'x' refuses to overwrite an existing configuration.
                with open(path, 'x') as fh:
                    created.append(path)
                    fh.write(text)
        except BaseException:
            # Do not leave a half-created configuration behind.
            for path in created:
                path.unlink(missing_ok=True)
            raise
        self.wg_updated = True

    def update_config(self,
                      config_id: str,
                      name: str,
                      description: str) -> None:
        '''Replace name and description in the metadata of ``config_id``.'''
        with open(self.meta_filepath(config_id), 'r+') as mf:
            metadata = json.load(mf)
            metadata['name'] = name
            metadata['description'] = description
            # Rewrite in place; truncate in case the new text is shorter.
            mf.seek(0)
            mf.write(json.dumps(metadata))
            mf.truncate()

    def get_config(self, config_id: str) -> dict:
        '''
        Return id, name, description, creation time and the client config
        text ('data') of ``config_id`` as a dict.
        '''
        with open(self.config_filepath(config_id), 'r') as cf, \
                open(self.meta_filepath(config_id), 'r') as mf:
            metadata = json.load(mf)
            configdata = cf.read()
        return {'id': config_id,
                'name': metadata['name'],
                'description': metadata['description'],
                'created': metadata['created'],
                'data': configdata}

    def delete_config(self, config_id: str) -> None:
        '''
        Delete the three files of ``config_id``.

        Raises FileNotFoundError (before deleting anything) when one of
        the files is missing.
        '''
        paths = [self.config_filepath(config_id),
                 self.serverconfig_filepath(config_id),
                 self.meta_filepath(config_id)]
        for path in paths:
            if not path.exists():
                raise FileNotFoundError(path)
        for path in paths:
            path.unlink()
        self.wg_updated = True

    def update(self) -> None:
        '''
        Write the server configuration and reload the wg-quick service,
        but only if peers changed since the last successful reload.

        A failing reload is not raised; the change flag then stays set so
        that the next call tries again.
        '''
        if not self.wg_updated:
            return
        with open(self.server_config_file, 'w') as sf:
            sf.write(self.generate_server_config())
        # Sync updated settings to interface
        result = subprocess.run(['sudo',
                                 'systemctl',
                                 'reload',
                                 f'wg-quick@{self.tunnel_id}.service'])
        if result.returncode == 0:
            self.wg_updated = False
        return