124 lines
3.5 KiB
Python
124 lines
3.5 KiB
Python
from configparser import ConfigParser
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import json
|
|
|
|
from flask import Flask, redirect, request, Response
|
|
|
|
from .wireguard import WireGuard
|
|
from .oauth import Oauth
|
|
|
|
|
|
# Load the service settings from config.ini, resolved relative to the
# process's current working directory.  ConfigParser.read() skips files it
# cannot open, so a missing config.ini surfaces as a KeyError on the section
# lookups below rather than as an I/O error here.
config = ConfigParser()
config.read('./config.ini')
# Values taken from the [wireguard] section of that file.
configs_base = Path(config['wireguard']['configs_base'])
wireguard_id = config['wireguard']['tunnel_id']
|
|
|
|
|
|
# Route paths and cookie names shared by the handlers in this module.
login_path = '/login'
callback_path = '/auth'
user_cookie = 'username'
token_cookie = 'token'
# Paths the before-request hook lets through without checking the token.
public_paths = [login_path, callback_path]

# The Flask application, the OAuth client built from the [oauth] section and
# the WireGuard object built from the [wireguard] section; the latter is
# attached to the app so the handlers can reach it as ``app.wg``.
app = Flask(__name__)
oauth = Oauth(config['oauth'])
app.wg = WireGuard(config['wireguard'])
|
|
|
|
|
|
@app.before_request
def setup() -> None:
    """Check the caller's token before any non-public view runs.

    Requests for the paths in ``public_paths`` pass through untouched.
    For every other path the token cookie is handed to
    ``oauth.authorize``; a falsy result ends the request with an HTTP 403
    response, otherwise the ``sub`` entry of the result is passed to
    ``app.wg.set_user``.
    """
    if request.path not in public_paths:
        claims = oauth.authorize(request.cookies.get(token_cookie))
        if not claims:
            # Returning a response from a before-request hook short-circuits
            # the view function.
            return Response(status=403)
        app.wg.set_user(claims['sub'])
|
|
|
|
@app.after_request
def reload(response: Response) -> Response:
    """Post-process a response: set the user-name cookie and update WireGuard.

    Args:
        response: The response produced for the current request.

    Returns:
        The same response object, with the user-name cookie added when
        ``app.wg.user_name`` is set.
    """
    # Only emit the cookie once app.wg carries a user name.
    if app.wg.user_name:
        response.set_cookie(user_cookie,
                            app.wg.user_name,
                            secure=True,
                            samesite='Strict')
    # NOTE(review): update() is assumed to run after every request, not only
    # when a user name is set -- confirm this placement is intended.
    app.wg.update()
    return response
|
|
|
|
|
|
@app.route(login_path)
def login():
    """Start the OAuth login flow.

    Redirects the browser to ``oauth.auth_url``.  When the request carries
    a non-empty ``return`` query parameter, its value is stored in a
    ``return`` cookie so the callback handler can send the user back there
    after authentication.

    Returns:
        The redirect response.
    """
    response = redirect(oauth.auth_url)
    return_path = request.args.get('return')
    if return_path:
        response.set_cookie('return', return_path, samesite='Lax')
    else:
        # request.args.get() yields None when the parameter is absent, and
        # None is not a valid cookie value.  Clear any cookie left over from
        # an earlier login attempt so the callback uses its default target.
        response.delete_cookie('return')
    return response
|
|
|
|
def _is_local_return_path(target) -> bool:
    """Return True when *target* is a path on this site, not an external URL."""
    # '//host' and '/\host' are interpreted by browsers as scheme-relative
    # URLs, so they are rejected along with anything not starting with '/'.
    return (isinstance(target, str)
            and target.startswith('/')
            and not target.startswith(('//', '/\\')))


@app.route(callback_path)
def authorize():
    """Handle the OAuth callback and hand the access token to the browser.

    Exchanges the ``code`` query parameter for an access token through
    ``oauth.request_access_token``, stores the token in the token cookie
    and redirects to the path saved in the ``return`` cookie.

    Returns:
        A redirect response carrying the token cookie, or an HTTP 403
        response when no token was obtained.
    """
    token = oauth.request_access_token(request.args.get('code'))
    if not token:
        # A missing token cannot be written to a cookie; refuse the login
        # with the same status the before-request hook uses.
        return Response(status=403)
    return_path = request.cookies.get('return')
    # The cookie is client-controlled: only follow it when it points back
    # into this site, otherwise fall back to the root page.
    if not _is_local_return_path(return_path):
        return_path = '/'
    response = redirect(return_path)
    response.set_cookie(token_cookie,
                        token,
                        secure=True,
                        httponly=True,
                        samesite='Strict')
    return response
|
|
|
|
|
|
@app.route('/configs/')
def list_configs() -> list:
    """Return the result of ``app.wg.list_configs()`` as the response body."""
    configs = app.wg.list_configs()
    return configs
|
|
|
|
@app.route('/configs/<config_id>')
def get_config(config_id: str) -> dict:
    """Look up a single config by its id.

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        Whatever ``app.wg.get_config`` returns for the id, or a
        ``{'result': 'failed', ...}`` payload when that call raises
        ``FileNotFoundError``.
    """
    try:
        found = app.wg.get_config(config_id)
    except FileNotFoundError:
        failure = {'result': 'failed', 'reason': 'Config id not found'}
        return failure
    else:
        return found
|
|
|
|
@app.route('/configs/<config_id>/create', methods=['POST'])
def create_config(config_id: str) -> dict:
    """Create a new config from the JSON body of the request.

    The body is expected to be a JSON object with a non-empty ``name`` and
    an optional ``description`` (treated as an empty string when absent).

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        The result of ``get_config(config_id)`` on success, or a
        ``{'result': 'failed', ...}`` payload when the name is missing or
        ``app.wg.generate_config_files`` raises ``FileExistsError``.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # A body that is not a JSON object carries no usable fields; let the
        # name check below reject it instead of failing on the lookup.
        data = {}
    # .get() keeps a missing key from raising before the mandatory-name
    # check has a chance to answer with the intended failure payload.
    name = data.get('name')
    if not name:
        return {'result': 'failed',
                'reason': 'Name is mandatory'}
    description = data.get('description', '')
    creation_time = datetime.now()
    try:
        app.wg.generate_config_files(config_id,
                                     name,
                                     description,
                                     creation_time)
    except FileExistsError:
        return {'result': 'failed',
                'reason': 'Id already in use'}
    return get_config(config_id)
|
|
|
|
@app.route('/configs/<config_id>/update', methods=['POST'])
def update_config(config_id: str) -> dict:
    """Update the name and description of an existing config.

    The body is expected to be a JSON object containing both ``name`` and
    ``description``.

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        ``{'result': 'success'}`` when the update went through, or a
        ``{'result': 'failed', ...}`` payload when a field is missing or
        ``app.wg.update_config`` raises ``FileNotFoundError``.
    """
    data = request.get_json()
    # Answer a malformed body with the module's usual failure payload rather
    # than letting the key lookups below raise.
    if (not isinstance(data, dict)
            or 'name' not in data
            or 'description' not in data):
        return {'result': 'failed',
                'reason': 'Name and description are mandatory'}
    name = data['name']
    description = data['description']
    try:
        app.wg.update_config(config_id, name, description)
    except FileNotFoundError:
        return {'result': 'failed',
                'reason': 'Config id not found'}
    return {'result': 'success'}
|
|
|
|
@app.route('/configs/<config_id>/delete', methods=['POST', 'DELETE'])
def delete_config(config_id: str) -> dict:
    """Delete the config with the given id.

    Args:
        config_id: Identifier taken from the URL.

    Returns:
        ``{'result': 'success'}`` after ``app.wg.delete_config`` returns,
        or a ``{'result': 'failed', ...}`` payload when it raises
        ``FileNotFoundError``.
    """
    try:
        app.wg.delete_config(config_id)
    except FileNotFoundError:
        outcome = {'result': 'failed', 'reason': 'Config id not found'}
    else:
        outcome = {'result': 'success'}
    return outcome
|