from base64 import b64encode from configparser import ConfigParser from datetime import datetime from pathlib import Path import json import re from flask import jsonify, Flask, redirect, request, Response from .exceptions import ClientLimitError from .oauth import Oauth from .wireguard import WireGuard login_path = '/login' callback_path = '/auth' cron_path = '/cron' public_paths = [login_path, callback_path, cron_path] user_cookie = 'username' token_cookie = 'token' access_cookie = 'access' permitted_format = re.compile('^[A-Za-z0-9-]+$') config = ConfigParser() config.read('./config.ini') app = Flask('wg-selfserve') oauth = Oauth(config['oauth']) app.wg = WireGuard(config['wireguard']) def prepare_frontend_settings(config): settings = {} for key, value in config['frontend'].items(): settings[key] = value settings['client_limit'] = config.getint('wireguard', 'user_client_limit', fallback=0) json_string = json.dumps(settings) base64_bytes = b64encode(bytes(json_string, 'utf-8')) return base64_bytes.decode('utf-8') frontend_settings = prepare_frontend_settings(config) required_entitlement_list = [] if 'required_entitlement' in config['security']: entitlement_conf = config.get('security', 'required_entitlement') required_entitlement_list = [e.strip() for e in entitlement_conf.split(',')] def check_access(user_entitlements): if not required_entitlement_list: return True for e in required_entitlement_list: if e in user_entitlements: return True return False def fail(message: str) -> Response: response = jsonify({'result': 'failed', 'reason': message}) response.status = 400 return response @app.before_request def setup() -> None: if request.path in public_paths: return token = request.cookies.get(token_cookie) user_info = oauth.authorize(token) if not user_info or not user_info['active']: return Response(status=403) if not check_access(user_info['entitlements']): response = Response(status=403) response.set_cookie(access_cookie, 'denied', secure=True, samesite='Strict') return response remote_user = user_info['sub'] app.wg.set_user(remote_user) @app.after_request def reload(response: Response) -> Response: response.set_cookie('server_settings', frontend_settings, secure=True, samesite='Strict') if app.wg.user_name: response.set_cookie(user_cookie, app.wg.user_name, secure=True, samesite='Strict') app.wg.update() return response @app.route(login_path) def login(): response = redirect(oauth.auth_url) response.set_cookie('return', request.args.get('return'), samesite='Lax') return response @app.route(callback_path) def authorize(): token = oauth.request_access_token(request.args.get('code')) return_path = request.cookies.get('return') if not return_path: return_path = '/' response = redirect(return_path) response.set_cookie(token_cookie, token, secure=True, httponly=True, samesite='Strict') return response @app.route(cron_path) def run_cron(): app.wg.run_cleanup() return Response() @app.route('/configs/') def list_configs() -> list: return app.wg.list_configs() @app.route('/configs/') def get_config(config_id: str) -> dict: try: return app.wg.get_config(config_id) except FileNotFoundError: return fail('Config id not found') @app.route('/configs//create', methods=['POST']) def create_config(config_id: str) -> dict: data = request.get_json() name = data['name'] description = data['description'] creation_time = datetime.now() if not permitted_format.match(config_id): return fail('Invalid config id') if not name: return fail('Name is mandatory') try: app.wg.generate_config_files(config_id, name, description, creation_time) except FileExistsError: return fail('Id already in use') except ClientLimitError as e: return fail(e.message) return {'result': 'success'} @app.route('/configs//update', methods=['POST']) def update_config(config_id: str) -> dict: data = request.get_json() name = data['name'] description = data['description'] try: app.wg.update_config(config_id, name, description) except FileNotFoundError: return fail('Config id not found') return {'result': 'success'} @app.route('/configs//delete', methods=['POST', 'DELETE']) def delete_config(config_id: str) -> dict: try: app.wg.delete_config(config_id) except FileNotFoundError: return fail('Config id not found') return {'result': 'success'}