from configparser import ConfigParser from datetime import datetime from pathlib import Path import json from flask import Flask, redirect, request, Response from .wireguard import WireGuard from .oauth import Oauth config = ConfigParser() config.read('./config.ini') configs_base = Path(config['wireguard']['configs_base']) wireguard_id = config['wireguard']['tunnel_id'] app = Flask(__name__) oauth = Oauth(config['oauth']) app.wg = WireGuard(config['wireguard']) login_path = '/login' callback_path = '/auth' user_cookie = 'username' token_cookie = 'token' public_paths = [login_path, callback_path] @app.before_request def setup() -> None: if request.path in public_paths: return token = request.cookies.get(token_cookie) user_info = oauth.authorize(token) if not user_info: return Response(status=403) remote_user = user_info['sub'] app.wg.set_user(remote_user) @app.after_request def reload(response: Response) -> Response: if app.wg.user_name: response.set_cookie(user_cookie, app.wg.user_name, secure=True, samesite='Strict') app.wg.update() return response @app.route(login_path) def login(): response = redirect(oauth.auth_url) response.set_cookie('return', request.args.get('return'), samesite='Lax') return response @app.route(callback_path) def authorize(): token = oauth.request_access_token(request.args.get('code')) return_path = request.cookies.get('return') if not return_path: return_path = '/' response = redirect(return_path) response.set_cookie(token_cookie, token, secure=True, httponly=True, samesite='Strict') return response @app.route('/configs/') def list_configs() -> list: return app.wg.list_configs() @app.route('/configs/') def get_config(config_id: str) -> dict: try: return app.wg.get_config(config_id) except FileNotFoundError: return {'result': 'failed', 'reason': 'Config id not found'} @app.route('/configs//create', methods=['POST']) def create_config(config_id: str) -> dict: data = request.get_json() name = data['name'] description = data['description'] creation_time = datetime.now() if not name: return {'result': 'failed', 'reason': 'Name is mandatory'} try: app.wg.generate_config_files(config_id, name, description, creation_time) except FileExistsError: return {'result': 'failed', 'reason': 'Id already in use'} return get_config(config_id) @app.route('/configs//update', methods=['POST']) def update_config(config_id: str) -> dict: data = request.get_json() name = data['name'] description = data['description'] try: app.wg.update_config(config_id, name, description) except FileNotFoundError: return {'result': 'failed', 'reason': 'Config id not found'} return {'result': 'success'} @app.route('/configs//delete', methods=['POST', 'DELETE']) def delete_config(config_id: str) -> dict: try: app.wg.delete_config(config_id) except FileNotFoundError: return {'result': 'failed', 'reason': 'Config id not found'} return {'result': 'success'}