from base64 import b64encode
from configparser import ConfigParser
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Optional
import json
import re
import sqlite3

from flask import jsonify, Flask, g, redirect, request, Response

from .oauth import Oauth
from .db import get_db, evolve

login_path = '/login'
callback_path = '/auth'
# Paths that can be reached without a valid token
public_paths = [login_path, callback_path]
user_cookie = 'username'
token_cookie = 'token'
access_cookie = 'access'

config = ConfigParser()
config.read('./config.ini')

app = Flask('clickmap')
oauth = Oauth(config['oauth'])

# Database initialization, runs only once on application start
evolve(get_db())


def prepare_frontend_settings(config):
    """Return the [frontend] config section as base64-encoded JSON."""
    settings = {}
    for key, value in config['frontend'].items():
        settings[key] = value
    json_string = json.dumps(settings)
    # Encode the json into base64, yields a byte array:
    base64_bytes = b64encode(bytes(json_string, 'utf-8'))
    # Turn that byte array into a string and return it:
    return base64_bytes.decode('utf-8')


frontend_settings = prepare_frontend_settings(config)

# Each entitlement setting is an optional comma-separated list
access_entitlement_list = []
if 'access_entitlement' in config['security']:
    access_conf = config.get('security', 'access_entitlement')
    access_entitlement_list = [e.strip() for e in access_conf.split(',')]

export_entitlement_list = []
if 'export_entitlement' in config['security']:
    export_conf = config.get('security', 'export_entitlement')
    export_entitlement_list = [e.strip() for e in export_conf.split(',')]

admin_entitlement_list = []
if 'admin_entitlement' in config['security']:
    admin_conf = config.get('security', 'admin_entitlement')
    admin_entitlement_list = [e.strip() for e in admin_conf.split(',')]


def check_access(user_entitlements):
    """Admins and exporters always have access; otherwise match the list."""
    if check_admin(user_entitlements):
        return True
    if check_export(user_entitlements):
        return True
    # No access entitlement configured: everyone is allowed in
    if not access_entitlement_list:
        return True
    for e in access_entitlement_list:
        if e in user_entitlements:
            return True
    return False


def check_admin(user_entitlements):
    # No admin entitlement configured: every user is treated as admin
    if not admin_entitlement_list:
        return True
    for e in admin_entitlement_list:
        if e in user_entitlements:
            return True
    return False


def check_export(user_entitlements):
    if check_admin(user_entitlements):
        return True
    # No export entitlement configured: only admins may export
    if not export_entitlement_list:
        return False
    for e in export_entitlement_list:
        if e in user_entitlements:
            return True
    return False


def verify_admin(func):
    """Only run the view if before() marked the request as admin."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if 'admin' in g:
            return func(*args, **kwargs)
        return jsonify('DENIED')
    return wrapper


def verify_export(func):
    """Only run the view if before() marked the request as export."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if 'export' in g:
            return func(*args, **kwargs)
        return jsonify('DENIED')
    return wrapper


@app.before_request
def before() -> Optional[Response]:
    # Returning a Response here short-circuits the request;
    # returning None lets it continue to the view.
    if request.path in public_paths:
        return None

    token = request.cookies.get(token_cookie)
    user_info = oauth.authorize(token)
    if not user_info or not user_info['active']:
        return Response(status=403)

    if not check_access(user_info['entitlements']):
        response = Response(status=403)
        response.set_cookie(access_cookie, 'denied',
                            secure=True, samesite='Strict')
        return response

    if check_admin(user_info['entitlements']):
        g.admin = True
    if check_export(user_info['entitlements']):
        g.export = True
    g.remote_user = user_info['sub']

    if 'database' not in g:
        g.database = get_db()
    return None


@app.after_request
def after(response: Response) -> Response:
    response.set_cookie('server_settings', frontend_settings,
                        secure=True, samesite='Strict')
    if 'remote_user' in g:
        response.set_cookie(user_cookie, g.remote_user,
                            secure=True, samesite='Strict')
    if 'admin' in g:
        response.set_cookie('admin', 'true',
                            secure=True, samesite='Strict')
    if 'export' in g:
        response.set_cookie('export', 'true',
                            secure=True, samesite='Strict')
    return response


@app.route(login_path)
def login():
    response = redirect(oauth.auth_url)
    # Fall back to '/' so the cookie value is never None when the
    # 'return' query parameter is missing
    response.set_cookie('return', request.args.get('return', '/'),
                        samesite='Lax')
    return response


@app.route(callback_path)
def authorize():
    token = oauth.request_access_token(request.args.get('code'))
    return_path = request.cookies.get('return')
    if not return_path:
        return_path = '/'
    response = redirect(return_path)
    response.set_cookie(token_cookie, token,
                        secure=True, httponly=True, samesite='Strict')
    return response


@app.route('/')
def base_url():
    return jsonify('REST')


@app.route('/points')
def get_points():
    data = {}
    for row in g.database.execute('SELECT * FROM `points`').fetchall():
        point_data = {'placeName': row['placeName'],
                      'personName': row['personName'],
                      'personRole': row['personRole'],
                      'latitude': row['latitude'],
                      'longitude': row['longitude']}
        # Comments are only exposed to users with export rights
        if 'export' in g:
            point_data['comment'] = row['comment']
        data[row['id']] = point_data
    return data


@app.route('/point', methods=['PUT'])
@verify_admin
def create_point():
    data = request.get_json()
    insert = '''
        INSERT OR REPLACE INTO `points`
        (`id`, `placeName`, `personName`, `latitude`, `longitude`,
         `personRole`, `comment`)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    '''
    g.database.execute(insert, (data['id'],
                                data['placeName'],
                                data['personName'],
                                data['latitude'],
                                data['longitude'],
                                data['personRole'],
                                data['comment']))
    g.database.commit()
    return jsonify(data['id'])


@app.route('/point', methods=['DELETE'])
@verify_admin
def delete_point():
    data = request.get_json()
    g.database.execute('DELETE FROM `points` WHERE `id`=?', (data['id'],))
    g.database.commit()
    return jsonify('OK')


@app.route('/export')
@verify_export
def export_points():
    # Tab-separated export; note that values are not escaped
    csv_string = '"Plats"\t"Person"\t"Titel"\t"Kommentar"\n'
    for row in g.database.execute('SELECT * FROM `points`').fetchall():
        placeName = row['placeName']
        personName = row['personName']
        personRole = row['personRole']
        comment = row['comment']
        csv_row = f'"{placeName}"\t"{personName}"\t'
        csv_row += f'"{personRole}"\t"{comment}"\n'
        csv_string += csv_row
    return jsonify({'csv_data': csv_string})