wg-selfserve/api/__init__.py
2025-02-19 15:19:36 +01:00

124 lines
3.5 KiB
Python

from configparser import ConfigParser
from datetime import datetime
from pathlib import Path
import json
from flask import Flask, redirect, request, Response
from .wireguard import WireGuard
from .oauth import Oauth
# Load runtime settings; the path is relative to the process working directory.
# ConfigParser.read() skips files it cannot open, so a missing config.ini only
# surfaces as a KeyError on the section lookups below.
config = ConfigParser()
config.read('./config.ini')
# Values taken from the [wireguard] section. Neither name is referenced in the
# visible part of this module -- presumably consumed elsewhere; TODO confirm.
configs_base = Path(config['wireguard']['configs_base'])
wireguard_id = config['wireguard']['tunnel_id']
# Flask application plus the OAuth helper built from the [oauth] section.
app = Flask(__name__)
oauth = Oauth(config['oauth'])
# The WireGuard helper is stored as an attribute on the app object, so this
# single instance is shared by every request handled by this process.
app.wg = WireGuard(config['wireguard'])
# URL paths of the login redirect and of the OAuth callback endpoint.
login_path = '/login'
callback_path = '/auth'
# Cookie names: user_cookie is written by the after-request hook,
# token_cookie is written by the callback view and read by the
# before-request hook.
user_cookie = 'username'
token_cookie = 'token'
# Paths the before-request hook lets through without a token check.
public_paths = [login_path, callback_path]
@app.before_request
def setup() -> 'Response | None':
    """Authenticate the request and select the WireGuard user.

    Registered as a before-request hook. Requests for the login and OAuth
    callback paths pass through untouched. For every other path the access
    token is read from the token cookie and handed to ``oauth.authorize``;
    the ``sub`` entry of the returned user info is then passed to
    ``app.wg.set_user``.

    Returns:
        ``None`` to let Flask continue to the view, or an empty 403
        response when the token cookie is missing or the token is rejected.
    """
    if request.path in public_paths:
        return None

    token = request.cookies.get(token_cookie)
    if not token:
        # No cookie at all: reject up front instead of handing None to the
        # OAuth helper.
        return Response(status=403)

    user_info = oauth.authorize(token)
    if not user_info:
        return Response(status=403)

    app.wg.set_user(user_info['sub'])
    return None
@app.after_request
def reload(response: Response) -> Response:
    """Attach the user-name cookie and call ``app.wg.update()``.

    Registered as an after-request hook. When ``app.wg.user_name`` is truthy
    it is written to the user cookie (``Secure``, ``SameSite=Strict``).

    Args:
        response: The response produced for the current request.

    Returns:
        The same response object, possibly with the extra cookie set.
    """
    current_user = app.wg.user_name
    if current_user:
        response.set_cookie(
            user_cookie,
            current_user,
            secure=True,
            samesite='Strict',
        )
    # NOTE(review): assumes update() runs for every response, not only when a
    # user is set -- confirm against the original layout.
    app.wg.update()
    return response
@app.route(login_path)
def login():
    """Redirect the browser to the OAuth provider.

    The optional ``return`` query parameter is remembered in a ``return``
    cookie (``SameSite=Lax``) so the callback view can read it afterwards.

    Returns:
        A redirect response pointing at ``oauth.auth_url``.
    """
    response = redirect(oauth.auth_url)
    # Fall back to '/' when the parameter is absent or empty: a cookie value
    # must be a string, and writing the default also replaces any stale
    # 'return' cookie left over from an earlier login attempt.
    return_path = request.args.get('return') or '/'
    response.set_cookie('return', return_path, samesite='Lax')
    return response
@app.route(callback_path)
def authorize():
    """Handle the OAuth callback and store the access token in a cookie.

    The ``code`` query parameter is exchanged for an access token via
    ``oauth.request_access_token``. The token is stored in the token cookie
    (``Secure``, ``HttpOnly``, ``SameSite=Strict``) and the browser is
    redirected to the path remembered in the ``return`` cookie, or to ``/``.

    Returns:
        A redirect response carrying the token cookie; an empty 400 response
        when no ``code`` was supplied; an empty 403 response when the
        exchange yields no token.
    """
    code = request.args.get('code')
    if not code:
        return Response(status=400)

    token = oauth.request_access_token(code)
    if not token:
        return Response(status=403)

    # The 'return' cookie is client-controlled, so only follow it when it
    # stays on this site: either a plain absolute path (but not '//host' or
    # '/\host', which browsers treat as another host) or a URL on the
    # current host. Anything else falls back to the root.
    return_path = request.cookies.get('return') or '/'
    is_local_path = (return_path.startswith('/')
                     and not return_path.startswith(('//', '/\\')))
    if not (is_local_path or return_path.startswith(request.host_url)):
        return_path = '/'

    response = redirect(return_path)
    response.set_cookie(token_cookie,
                        token,
                        secure=True,
                        httponly=True,
                        samesite='Strict')
    return response
@app.route('/configs/')
def list_configs() -> list:
    """Return whatever ``app.wg.list_configs()`` reports."""
    configs = app.wg.list_configs()
    return configs
@app.route('/configs/<config_id>')
def get_config(config_id: str) -> dict:
    """Look up a single config by id.

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        The value of ``app.wg.get_config(config_id)``, or a ``failed``
        result dict when that call raises ``FileNotFoundError``.
    """
    try:
        found = app.wg.get_config(config_id)
    except FileNotFoundError:
        not_found = {'result': 'failed',
                     'reason': 'Config id not found'}
        return not_found
    return found
@app.route('/configs/<config_id>/create', methods=['POST'])
def create_config(config_id: str) -> dict:
    """Create a new config from the JSON request body.

    The body is expected to be a JSON object with a mandatory ``name`` and
    an optional ``description`` (defaults to an empty string).

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        The result of ``get_config(config_id)`` on success, otherwise a
        ``failed`` result dict naming the reason: the body is not a JSON
        object, the name is missing or empty, or the id is already in use.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A JSON array, string or number would otherwise crash on lookup.
        return {'result': 'failed',
                'reason': 'Request body must be a JSON object'}

    # .get() keeps a missing key on the same path as an empty name instead
    # of raising KeyError before the check below can run.
    name = data.get('name')
    if not name:
        return {'result': 'failed',
                'reason': 'Name is mandatory'}
    description = data.get('description', '')

    creation_time = datetime.now()
    try:
        app.wg.generate_config_files(config_id,
                                     name,
                                     description,
                                     creation_time)
    except FileExistsError:
        return {'result': 'failed',
                'reason': 'Id already in use'}
    return get_config(config_id)
@app.route('/configs/<config_id>/update', methods=['POST'])
def update_config(config_id: str) -> dict:
    """Update name and description of an existing config.

    The body is expected to be a JSON object containing both ``name`` and
    ``description``; their values are passed on unchanged.

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        ``{'result': 'success'}`` when the update went through, otherwise a
        ``failed`` result dict naming the reason: malformed body, missing
        keys, or unknown config id.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A JSON array, string or number would otherwise crash on lookup.
        return {'result': 'failed',
                'reason': 'Request body must be a JSON object'}
    if 'name' not in data or 'description' not in data:
        # Report the incomplete payload instead of failing with KeyError.
        return {'result': 'failed',
                'reason': 'Name and description are required'}

    try:
        app.wg.update_config(config_id, data['name'], data['description'])
    except FileNotFoundError:
        return {'result': 'failed',
                'reason': 'Config id not found'}
    return {'result': 'success'}
@app.route('/configs/<config_id>/delete', methods=['POST', 'DELETE'])
def delete_config(config_id: str) -> dict:
    """Delete the config with the given id.

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        A ``success`` result dict, or a ``failed`` one when
        ``app.wg.delete_config`` raises ``FileNotFoundError``.
    """
    try:
        app.wg.delete_config(config_id)
    except FileNotFoundError:
        outcome = {'result': 'failed',
                   'reason': 'Config id not found'}
    else:
        outcome = {'result': 'success'}
    return outcome