wg-selfserve/api/__init__.py
Erik Thuning ca3d536868 Added support for limited client validity
A new /cron endpoint has been introduced, which triggers a cleanup routine.
The cleanup routine loops over all existing clients and deletes all that are
older than the value configured in config.ini.

Periodically calling the cron endpoint is the responsibility of the server admin.
2025-03-26 13:30:53 +01:00

179 lines
5.2 KiB
Python

from base64 import b64encode
from configparser import ConfigParser
from datetime import datetime
from pathlib import Path
import json
import re
from flask import jsonify, Flask, redirect, request, Response
from .exceptions import ClientLimitError
from .oauth import Oauth
from .wireguard import WireGuard
login_path = '/login'
callback_path = '/auth'
cron_path = '/cron'
public_paths = [login_path, callback_path, cron_path]
user_cookie = 'username'
token_cookie = 'token'
access_cookie = 'access'
permitted_format = re.compile('^[A-Za-z0-9-]+$')
config = ConfigParser()
config.read('./config.ini')
app = Flask('wg-selfserve')
oauth = Oauth(config['oauth'])
app.wg = WireGuard(config['wireguard'])
def prepare_frontend_settings(config):
settings = {}
for key, value in config['frontend'].items():
settings[key] = value
settings['client_limit'] = config.getint('wireguard',
'user_client_limit',
fallback=0)
json_string = json.dumps(settings)
base64_bytes = b64encode(bytes(json_string, 'utf-8'))
return base64_bytes.decode('utf-8')
frontend_settings = prepare_frontend_settings(config)
required_entitlement_list = []
if 'required_entitlement' in config['security']:
entitlement_conf = config.get('security', 'required_entitlement')
required_entitlement_list = [e.strip() for e
in entitlement_conf.split(',')]
def check_access(user_entitlements):
if not required_entitlement_list:
return True
for e in required_entitlement_list:
if e in user_entitlements:
return True
return False
def fail(message: str) -> Response:
response = jsonify({'result': 'failed',
'reason': message})
response.status = 400
return response
@app.before_request
def setup() -> None:
if request.path in public_paths:
return
token = request.cookies.get(token_cookie)
user_info = oauth.authorize(token)
if not user_info or not user_info['active']:
return Response(status=403)
if not check_access(user_info['entitlements']):
response = Response(status=403)
response.set_cookie(access_cookie,
'denied',
secure=True,
samesite='Strict')
return response
remote_user = user_info['sub']
app.wg.set_user(remote_user)
@app.after_request
def reload(response: Response) -> Response:
response.set_cookie('server_settings',
frontend_settings,
secure=True,
samesite='Strict')
if app.wg.user_name:
response.set_cookie(user_cookie,
app.wg.user_name,
secure=True,
samesite='Strict')
app.wg.update()
return response
@app.route(login_path)
def login():
response = redirect(oauth.auth_url)
response.set_cookie('return', request.args.get('return'), samesite='Lax')
return response
@app.route(callback_path)
def authorize():
token = oauth.request_access_token(request.args.get('code'))
return_path = request.cookies.get('return')
if not return_path:
return_path = '/'
response = redirect(return_path)
response.set_cookie(token_cookie,
token,
secure=True,
httponly=True,
samesite='Strict')
return response
@app.route(cron_path)
def run_cron():
app.wg.run_cleanup()
return Response()
@app.route('/configs/')
def list_configs() -> list:
return app.wg.list_configs()
@app.route('/configs/<config_id>')
def get_config(config_id: str) -> dict:
try:
return app.wg.get_config(config_id)
except FileNotFoundError:
return fail('Config id not found')
@app.route('/configs/<config_id>/create', methods=['POST'])
def create_config(config_id: str) -> dict:
data = request.get_json()
name = data['name']
description = data['description']
creation_time = datetime.now()
if not permitted_format.match(config_id):
return fail('Invalid config id')
if not name:
return fail('Name is mandatory')
try:
app.wg.generate_config_files(config_id,
name,
description,
creation_time)
except FileExistsError:
return fail('Id already in use')
except ClientLimitError as e:
return fail(e.message)
return {'result': 'success'}
@app.route('/configs/<config_id>/update', methods=['POST'])
def update_config(config_id: str) -> dict:
data = request.get_json()
name = data['name']
description = data['description']
try:
app.wg.update_config(config_id, name, description)
except FileNotFoundError:
return fail('Config id not found')
return {'result': 'success'}
@app.route('/configs/<config_id>/delete', methods=['POST', 'DELETE'])
def delete_config(config_id: str) -> dict:
try:
app.wg.delete_config(config_id)
except FileNotFoundError:
return fail('Config id not found')
return {'result': 'success'}