253 lines
9.1 KiB
Python
253 lines
9.1 KiB
Python
import ipaddress
import json
import os
import subprocess
from datetime import datetime
from pathlib import Path
from textwrap import dedent
from time import sleep
|
|
|
|
|
|
# File name suffixes of the three files stored per user configuration:
# the client config, the server-side [Peer] snippet and the JSON metadata.
confsuffix = '.conf'
serversuffix = '.serverconf'
metasuffix = '.json'

# Scratch directory that holds the lock file and the generated server config.
workdir = Path('./work')
# Its existence marks the lock as held (see call_with_lock).
lockfile = workdir / 'lockfile.lock~'
|
|
|
|
|
|
def safe_join(*args) -> 'Path | None':
    '''
    Similar to flask's own safe_join, but uses Path objects
    instead of strings.

    Joins ``args[1:]`` onto the base path ``args[0]`` and makes sure the
    result cannot escape the base directory.

    The check is purely lexical: ``..`` components are collapsed before
    comparing, symlinks are NOT resolved and the file system is not
    touched.

    Returns the joined (un-normalized) path, or None when the result
    would point outside of the base path (``..`` traversal, an absolute
    component, or a different drive on Windows).
    '''
    base_path = Path(args[0])
    joined_path = base_path.joinpath(*args[1:])
    try:
        # relpath() normalizes both sides, so "a/b/../../x" is seen as "../x".
        # Path.relative_to() alone would not notice the traversal.
        relative = os.path.relpath(joined_path, base_path)
    except ValueError:
        # Raised on Windows when the two paths live on different drives.
        return None
    if relative == os.pardir or relative.startswith(os.pardir + os.sep):
        return None
    return joined_path
|
|
|
|
def call_with_lock(func: callable, args: list=None, timeout: float=None):
    '''
    Call ``func(*args)`` while holding the file based lock ``lockfile``.

    The lock is taken by exclusively creating the lock file and released
    by deleting it again. While another holder owns the lock, creation is
    retried every 0.1 seconds.

    func:    callable to run while the lock is held
    args:    positional arguments for func (default: no arguments)
    timeout: maximum number of seconds to wait for the lock, None waits
             forever

    Returns whatever func returns. Raises TimeoutError if the lock could
    not be acquired within ``timeout`` seconds. Exceptions raised by func
    propagate to the caller after the lock has been released.
    '''
    call_args = () if args is None else args
    sleep_time = 0.1
    slept = 0
    while timeout is None or slept < timeout:
        try:
            # Mode 'x' fails if the file already exists, which makes the
            # creation double as the "is the lock free" test.
            with open(lockfile, 'x'):
                pass
        except FileExistsError:
            slept += sleep_time
            sleep(sleep_time)
            continue
        # Only the acquisition is guarded above, so a FileExistsError raised
        # by func itself is not mistaken for a held lock.
        print(f'Lock successful after {slept}s, '
              f'tried on {sleep_time}s intervals.')
        try:
            return func(*call_args)
        finally:
            # Always release, otherwise a failing func would leave the lock
            # file behind and block every later caller.
            lockfile.unlink()
    raise TimeoutError(f'Unable to acquire lock within {timeout} seconds')
|
|
|
|
def generate_keypair():
    '''
    Create a fresh WireGuard key pair with the ``wg`` command line tool.

    Returns a ``(private_key, public_key)`` tuple with surrounding
    whitespace stripped from both keys.
    '''
    secret = run_wg('genkey')
    # The unstripped private key is handed to "wg pubkey" as its input.
    public = run_wg('pubkey', input=secret)
    return tuple(key.strip() for key in (secret, public))
|
|
|
|
def generate_user_serverside_config(config_id: str,
                                    user_name: str,
                                    client_ip: ipaddress,
                                    client_pubkey: str):
    '''
    Render the server-side ``[Peer]`` section for one client.

    config_id:     identifier of the configuration, only used in the
                   comment line
    user_name:     owner of the configuration, only used in the comment
                   line
    client_ip:     VPN address of the client (string or ipaddress object)
    client_pubkey: public key of the client

    The client is allowed exactly its own address: the prefix length is
    the full address length (32 for IPv4, 128 for IPv6).

    Returns the section as text ending in a newline. Raises ValueError if
    client_ip is not a valid IP address.
    '''
    host_prefix = ipaddress.ip_address(client_ip).max_prefixlen
    config = dedent(f'''
        [Peer]
        # {user_name}/{config_id}
        PublicKey = {client_pubkey}
        AllowedIPs = {client_ip}/{host_prefix}
        ''')
    # Drop the newline that follows the opening quotes.
    return config.lstrip()
|
|
|
|
def generate_user_clientside_config(client_ip: str,
                                    client_privkey: str,
                                    server_address: ipaddress,
                                    server_port: int,
                                    server_pubkey: str,
                                    dns_server: str):
    '''
    Render the configuration file handed out to a client.

    client_ip:      VPN address of the client (string or ipaddress object)
    client_privkey: private key of the client
    server_address: public address of the server; an IP address (string
                    or ipaddress object) or a host name
    server_port:    UDP port the server listens on
    server_pubkey:  public key of the server
    dns_server:     DNS server the client should use

    The interface address gets the full-length prefix of its address
    family (32 for IPv4, 128 for IPv6). An IPv6 server address is wrapped
    in brackets so it can be told apart from the port. AllowedIPs is
    fixed to 0.0.0.0/0.

    Returns the configuration as text ending in a newline. Raises
    ValueError if client_ip is not a valid IP address.
    '''
    host_prefix = ipaddress.ip_address(client_ip).max_prefixlen
    try:
        needs_brackets = ipaddress.ip_address(server_address).version == 6
    except ValueError:
        # Not an IP literal, e.g. a host name: use it as given.
        needs_brackets = False
    endpoint_host = f'[{server_address}]' if needs_brackets else server_address
    config = dedent(f'''
        [Interface]
        Address = {client_ip}/{host_prefix}
        DNS = {dns_server}
        PrivateKey = {client_privkey}

        [Peer]
        AllowedIPs = 0.0.0.0/0
        Endpoint = {endpoint_host}:{server_port}
        PublicKey = {server_pubkey}
        ''')
    # Drop the newline that follows the opening quotes.
    return config.lstrip()
|
|
|
|
def run_wg(*args, input: str=None):
    '''
    Run the ``wg`` command line tool and return its standard output.

    args:  command line arguments passed to ``wg``
    input: optional text fed to the process on standard input

    Returns the captured standard output as text. Raises
    subprocess.CalledProcessError if ``wg`` exits with a non-zero status,
    so a failed call can no longer be mistaken for an empty result.
    '''
    result = subprocess.run(['wg', *args],
                            input=input,
                            capture_output=True,
                            text=True,
                            check=True)
    return result.stdout
|
|
|
|
|
|
class WireGuard:
    '''
    Manages the per-user peer configurations of one WireGuard tunnel.

    Every configuration of a user consists of three files inside the
    user's directory below ``configs_base``: the client config, the
    server-side [Peer] snippet and a JSON metadata file. The complete
    server config is assembled from all server-side snippets.

    set_user() has to be called before any of the per-user methods.
    '''

    def __init__(self, config: dict):
        '''
        Read the tunnel settings from ``config``.

        Required keys: tunnel_id, server_address, server_port, dns_server,
        vpn_network, server_vpn_address, configs_base,
        server_privkey_file and server_pubkey_file.
        Optional key: server_extra_config, a file whose content is added
        to the generated server config.

        Both key files are read immediately.
        '''
        self.tunnel_id = config['tunnel_id']
        self.server_address = ipaddress.ip_address(config['server_address'])
        self.server_port = int(config['server_port'])
        self.dns_server = ipaddress.ip_address(config['dns_server'])
        self.vpn_network = ipaddress.ip_network(config['vpn_network'])
        self.vpn_address = ipaddress.ip_address(config['server_vpn_address'])
        self.configs_base = Path(config['configs_base'])

        self.server_config_base = None
        if 'server_extra_config' in config:
            self.server_config_base = Path(config['server_extra_config'])

        self.server_config_file = safe_join(workdir, self.tunnel_id + '.conf')

        with open(config['server_privkey_file'], 'r') as privkey_file:
            self.server_privkey = privkey_file.read().strip()
        with open(config['server_pubkey_file'], 'r') as pubkey_file:
            self.server_pubkey = pubkey_file.read().strip()

        # Set whenever a peer was added or removed; checked by update().
        self.wg_updated = False

        self.user_name = None
        self.user_base = None

    def set_user(self, user_name: str) -> None:
        '''
        Select the user all per-user methods operate on.

        Creates the user's directory below ``configs_base`` if needed.
        Raises ValueError if safe_join() rejects the name.
        '''
        user_base = safe_join(self.configs_base, user_name)
        if user_base is None:
            raise ValueError(f'Invalid user name: {user_name!r}')
        # exist_ok avoids the race between an existence check and mkdir.
        user_base.mkdir(exist_ok=True)
        self.user_name = user_name
        self.user_base = user_base

    def get_used_ips(self):
        '''
        Return the addresses that are already taken, as a list.

        The list starts with the server's own VPN address, followed by
        every address found on an ``AllowedIPs`` line of the current
        server config. Comma separated entries and any prefix length are
        accepted; only the address part of each entry is used.
        '''
        ips = [self.vpn_address]
        for line in self.generate_server_config().splitlines():
            key, separator, value = line.partition('=')
            if not separator or key.strip() != 'AllowedIPs':
                continue
            for entry in value.split(','):
                entry = entry.strip()
                if entry:
                    ips.append(ipaddress.ip_interface(entry).ip)
        return ips

    def get_free_ip(self):
        '''
        Return the first host address of the VPN network that is unused.

        Raises Exception if every host address is taken.
        '''
        # A set keeps the membership test cheap for large networks.
        used_ips = set(self.get_used_ips())
        for addr in self.vpn_network.hosts():
            if addr not in used_ips:
                return addr
        raise Exception('No addresses available')

    def filepath(self, config_filename: str) -> Path:
        '''
        Return the path of ``config_filename`` inside the user directory.

        The result is whatever safe_join() returns, which is None when it
        rejects the file name. Raises Exception if set_user() has not
        been called.
        '''
        if self.user_base is None:
            raise Exception('Not properly initialized')
        return safe_join(self.user_base, config_filename)

    def config_filepath(self, config_id: str) -> Path:
        '''Path of the client config file of ``config_id``.'''
        return self.filepath(f'{config_id}{confsuffix}')

    def serverconfig_filepath(self, config_id: str) -> Path:
        '''Path of the server-side [Peer] snippet of ``config_id``.'''
        return self.filepath(f'{config_id}{serversuffix}')

    def meta_filepath(self, config_id: str) -> Path:
        '''Path of the JSON metadata file of ``config_id``.'''
        return self.filepath(f'{config_id}{metasuffix}')

    def generate_server_config(self):
        '''
        Assemble and return the complete server config as text.

        It consists of the [Interface] section, the optional extra config
        file and the server-side snippets of all users. The snippets are
        added in sorted path order so the output does not depend on the
        order the file system lists them in.
        '''
        interface = dedent(f'''
            [Interface]
            Address = {self.vpn_address}/{self.vpn_network.prefixlen}
            ListenPort = {self.server_port}
            PrivateKey = {self.server_privkey}
            ''').lstrip()
        parts = [interface]
        if self.server_config_base is not None:
            with open(self.server_config_base, 'r') as cb:
                parts.append(cb.read())
        for conffile in sorted(self.configs_base.glob('*/*' + serversuffix)):
            with open(conffile, 'r') as cf:
                parts.append('\n' + cf.read())
        return ''.join(parts)

    def list_configs(self) -> list:
        '''
        Return the ids of all configurations of the current user.

        Raises Exception if set_user() has not been called.
        '''
        if self.user_base is None:
            raise Exception('Not properly initialized')
        return [p.stem for p
                in self.user_base.glob(f'*{confsuffix}')]

    def generate_config_files(self, *args) -> None:
        '''
        Create a new configuration while holding the global lock.

        Takes the same arguments as _unsafe_generate_config_files() and
        waits up to 10 seconds for the lock.
        '''
        call_with_lock(self._unsafe_generate_config_files, args, 10)

    def _unsafe_generate_config_files(self,
                                      config_id: str,
                                      name: str,
                                      description: str,
                                      creation_time: datetime) -> None:
        '''
        Create the three files of a new configuration.

        Generates a key pair, picks a free address and writes the client
        config, the server-side snippet and the metadata. Does no locking
        of its own; generate_config_files() is the locked entry point.

        All files are created exclusively, so FileExistsError is raised
        if one of them already exists. On any failure the files created
        by this call are removed again.
        '''
        client_privkey, client_pubkey = generate_keypair()
        client_ip = self.get_free_ip()

        metadata = {'name': name,
                    'description': description,
                    'created': creation_time.isoformat(' ', 'minutes')}
        client_config = generate_user_clientside_config(client_ip,
                                                        client_privkey,
                                                        self.server_address,
                                                        self.server_port,
                                                        self.server_pubkey,
                                                        self.dns_server)
        server_config = generate_user_serverside_config(config_id,
                                                        self.user_name,
                                                        client_ip,
                                                        client_pubkey)
        # Everything is rendered before the first file is created.
        pending = [(self.config_filepath(config_id), client_config),
                   (self.serverconfig_filepath(config_id), server_config),
                   (self.meta_filepath(config_id), json.dumps(metadata))]

        created = []
        try:
            for path, content in pending:
                with open(path, 'x') as handle:
                    created.append(path)
                    handle.write(content)
        except BaseException:
            # Do not leave a half written configuration behind.
            for path in created:
                path.unlink(missing_ok=True)
            raise
        self.wg_updated = True

    def update_config(self,
                      config_id: str,
                      name: str,
                      description: str) -> None:
        '''
        Replace name and description in the metadata of ``config_id``.

        All other metadata entries are kept.
        '''
        with open(self.meta_filepath(config_id), 'r+') as mf:
            metadata = json.load(mf)
            metadata['name'] = name
            metadata['description'] = description
            # Rewrite in place; truncate() cuts off leftovers in case the
            # new content is shorter than the old one.
            mf.seek(0)
            json.dump(metadata, mf)
            mf.truncate()

    def get_config(self, config_id: str) -> dict:
        '''
        Return the configuration ``config_id`` as a dict with the keys
        id, name, description, created and data (the client config text).
        '''
        with open(self.config_filepath(config_id), 'r') as cf, \
                open(self.meta_filepath(config_id), 'r') as mf:
            metadata = json.load(mf)
            configdata = cf.read()
        return {'id': config_id,
                'name': metadata['name'],
                'description': metadata['description'],
                'created': metadata['created'],
                'data': configdata}

    def delete_config(self, config_id: str) -> None:
        '''
        Delete all three files of the configuration ``config_id``.

        Raises FileNotFoundError, before anything is deleted, if one of
        the files is missing.
        '''
        paths = [self.config_filepath(config_id),
                 self.serverconfig_filepath(config_id),
                 self.meta_filepath(config_id)]
        for path in paths:
            if not path.exists():
                raise FileNotFoundError(path)
        for path in paths:
            path.unlink()
        self.wg_updated = True

    def update(self) -> None:
        '''
        Write the server config and reload the tunnel if peers changed.

        Does nothing unless a configuration was created or deleted
        through this object. The exit status of the reload command is not
        checked and ``wg_updated`` is left set afterwards.
        '''
        if not self.wg_updated:
            return
        with open(self.server_config_file, 'w') as sf:
            sf.write(self.generate_server_config())

        # Sync updated settings to interface
        subprocess.run(['sudo',
                        'systemctl',
                        'reload',
                        f'wg-quick@{self.tunnel_id}.service'])
|