A new /cron endpoint has been introduced, which triggers a cleanup routine. The cleanup routine loops over all existing clients and deletes all that are older than the value configured in config.ini. Periodically calling the cron endpoint is the responsibility of the server admin.
179 lines
5.2 KiB
Python
179 lines
5.2 KiB
Python
from base64 import b64encode
|
|
from configparser import ConfigParser
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import json
|
|
import re
|
|
|
|
from flask import jsonify, Flask, redirect, request, Response
|
|
|
|
from .exceptions import ClientLimitError
|
|
from .oauth import Oauth
|
|
from .wireguard import WireGuard
|
|
|
|
|
|
login_path = '/login'
|
|
callback_path = '/auth'
|
|
cron_path = '/cron'
|
|
public_paths = [login_path, callback_path, cron_path]
|
|
user_cookie = 'username'
|
|
token_cookie = 'token'
|
|
access_cookie = 'access'
|
|
permitted_format = re.compile('^[A-Za-z0-9-]+$')
|
|
|
|
config = ConfigParser()
|
|
config.read('./config.ini')
|
|
|
|
app = Flask('wg-selfserve')
|
|
oauth = Oauth(config['oauth'])
|
|
app.wg = WireGuard(config['wireguard'])
|
|
|
|
def prepare_frontend_settings(config):
|
|
settings = {}
|
|
for key, value in config['frontend'].items():
|
|
settings[key] = value
|
|
settings['client_limit'] = config.getint('wireguard',
|
|
'user_client_limit',
|
|
fallback=0)
|
|
json_string = json.dumps(settings)
|
|
base64_bytes = b64encode(bytes(json_string, 'utf-8'))
|
|
return base64_bytes.decode('utf-8')
|
|
|
|
frontend_settings = prepare_frontend_settings(config)
|
|
|
|
required_entitlement_list = []
|
|
if 'required_entitlement' in config['security']:
|
|
entitlement_conf = config.get('security', 'required_entitlement')
|
|
required_entitlement_list = [e.strip() for e
|
|
in entitlement_conf.split(',')]
|
|
|
|
|
|
def check_access(user_entitlements):
|
|
if not required_entitlement_list:
|
|
return True
|
|
for e in required_entitlement_list:
|
|
if e in user_entitlements:
|
|
return True
|
|
return False
|
|
|
|
def fail(message: str) -> Response:
|
|
response = jsonify({'result': 'failed',
|
|
'reason': message})
|
|
response.status = 400
|
|
return response
|
|
|
|
|
|
@app.before_request
|
|
def setup() -> None:
|
|
if request.path in public_paths:
|
|
return
|
|
|
|
token = request.cookies.get(token_cookie)
|
|
user_info = oauth.authorize(token)
|
|
if not user_info or not user_info['active']:
|
|
return Response(status=403)
|
|
|
|
if not check_access(user_info['entitlements']):
|
|
response = Response(status=403)
|
|
response.set_cookie(access_cookie,
|
|
'denied',
|
|
secure=True,
|
|
samesite='Strict')
|
|
return response
|
|
|
|
remote_user = user_info['sub']
|
|
app.wg.set_user(remote_user)
|
|
|
|
@app.after_request
|
|
def reload(response: Response) -> Response:
|
|
response.set_cookie('server_settings',
|
|
frontend_settings,
|
|
secure=True,
|
|
samesite='Strict')
|
|
if app.wg.user_name:
|
|
response.set_cookie(user_cookie,
|
|
app.wg.user_name,
|
|
secure=True,
|
|
samesite='Strict')
|
|
app.wg.update()
|
|
return response
|
|
|
|
|
|
@app.route(login_path)
|
|
def login():
|
|
response = redirect(oauth.auth_url)
|
|
response.set_cookie('return', request.args.get('return'), samesite='Lax')
|
|
return response
|
|
|
|
@app.route(callback_path)
|
|
def authorize():
|
|
token = oauth.request_access_token(request.args.get('code'))
|
|
return_path = request.cookies.get('return')
|
|
if not return_path:
|
|
return_path = '/'
|
|
response = redirect(return_path)
|
|
response.set_cookie(token_cookie,
|
|
token,
|
|
secure=True,
|
|
httponly=True,
|
|
samesite='Strict')
|
|
return response
|
|
|
|
|
|
@app.route(cron_path)
|
|
def run_cron():
|
|
app.wg.run_cleanup()
|
|
return Response()
|
|
|
|
|
|
@app.route('/configs/')
|
|
def list_configs() -> list:
|
|
return app.wg.list_configs()
|
|
|
|
@app.route('/configs/<config_id>')
|
|
def get_config(config_id: str) -> dict:
|
|
try:
|
|
return app.wg.get_config(config_id)
|
|
except FileNotFoundError:
|
|
return fail('Config id not found')
|
|
|
|
@app.route('/configs/<config_id>/create', methods=['POST'])
|
|
def create_config(config_id: str) -> dict:
|
|
data = request.get_json()
|
|
name = data['name']
|
|
description = data['description']
|
|
creation_time = datetime.now()
|
|
if not permitted_format.match(config_id):
|
|
return fail('Invalid config id')
|
|
if not name:
|
|
return fail('Name is mandatory')
|
|
try:
|
|
app.wg.generate_config_files(config_id,
|
|
name,
|
|
description,
|
|
creation_time)
|
|
except FileExistsError:
|
|
return fail('Id already in use')
|
|
except ClientLimitError as e:
|
|
return fail(e.message)
|
|
return {'result': 'success'}
|
|
|
|
@app.route('/configs/<config_id>/update', methods=['POST'])
|
|
def update_config(config_id: str) -> dict:
|
|
data = request.get_json()
|
|
name = data['name']
|
|
description = data['description']
|
|
try:
|
|
app.wg.update_config(config_id, name, description)
|
|
except FileNotFoundError:
|
|
return fail('Config id not found')
|
|
return {'result': 'success'}
|
|
|
|
@app.route('/configs/<config_id>/delete', methods=['POST', 'DELETE'])
|
|
def delete_config(config_id: str) -> dict:
|
|
try:
|
|
app.wg.delete_config(config_id)
|
|
except FileNotFoundError:
|
|
return fail('Config id not found')
|
|
return {'result': 'success'}
|