251 lines
7.0 KiB
Python
251 lines
7.0 KiB
Python
from base64 import b64encode
|
|
from configparser import ConfigParser
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
|
|
from flask import jsonify, Flask, g, redirect, request, Response
|
|
|
|
from .oauth import Oauth
|
|
from .db import get_db, evolve
|
|
|
|
login_path = '/login'
|
|
callback_path = '/auth'
|
|
public_paths = [login_path, callback_path]
|
|
user_cookie = 'username'
|
|
token_cookie = 'token'
|
|
access_cookie = 'access'
|
|
|
|
config = ConfigParser()
|
|
config.read('./config.ini')
|
|
|
|
app = Flask('clickmap')
|
|
oauth = Oauth(config['oauth'])
|
|
|
|
# Datatabase initialization, runs only once on application start
|
|
evolve(get_db())
|
|
|
|
def prepare_frontend_settings(config):
|
|
settings = {}
|
|
for key, value in config['frontend'].items():
|
|
settings[key] = value
|
|
json_string = json.dumps(settings)
|
|
# Encode the json into base64, yields a byte array:
|
|
base64_bytes = b64encode(bytes(json_string, 'utf-8'))
|
|
# Turn that byte array into a string and return it:
|
|
return base64_bytes.decode('utf-8')
|
|
|
|
|
|
frontend_settings = prepare_frontend_settings(config)
|
|
|
|
access_entitlement_list = []
|
|
if 'access_entitlement' in config['security']:
|
|
access_conf = config.get('security', 'access_entitlement')
|
|
access_entitlement_list = [e.strip() for
|
|
e in access_conf.split(',')]
|
|
|
|
export_entitlement_list = []
|
|
if 'export_entitlement' in config['security']:
|
|
export_conf = config.get('security', 'export_entitlement')
|
|
export_entitlement_list = [e.strip() for
|
|
e in export_conf.split(',')]
|
|
|
|
admin_entitlement_list = []
|
|
if 'admin_entitlement' in config['security']:
|
|
admin_conf = config.get('security', 'admin_entitlement')
|
|
admin_entitlement_list = [e.strip() for
|
|
e in admin_conf.split(',')]
|
|
|
|
|
|
|
|
def check_access(user_entitlements):
|
|
if check_admin(user_entitlements):
|
|
return True
|
|
if check_export(user_entitlements):
|
|
return True
|
|
if not access_entitlement_list:
|
|
return True
|
|
for e in access_entitlement_list:
|
|
if e in user_entitlements:
|
|
return True
|
|
return False
|
|
|
|
|
|
def check_admin(user_entitlements):
|
|
if not admin_entitlement_list:
|
|
return True
|
|
for e in admin_entitlement_list:
|
|
if e in user_entitlements:
|
|
return True
|
|
return False
|
|
|
|
|
|
def check_export(user_entitlements):
|
|
if check_admin(user_entitlements):
|
|
return True
|
|
if not export_entitlement_list:
|
|
return False
|
|
for e in export_entitlement_list:
|
|
if e in user_entitlements:
|
|
return True
|
|
return False
|
|
|
|
|
|
def verify_admin(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
if 'admin' in g:
|
|
return func(*args, **kwargs)
|
|
return jsonify('DENIED')
|
|
return wrapper
|
|
|
|
|
|
def verify_export(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
if 'export' in g:
|
|
return func(*args, **kwargs)
|
|
return jsonify('DENIED')
|
|
return wrapper
|
|
|
|
|
|
@app.before_request
|
|
def before() -> None:
|
|
if request.path in public_paths:
|
|
return
|
|
|
|
token = request.cookies.get(token_cookie)
|
|
user_info = oauth.authorize(token)
|
|
if not user_info or not user_info['active']:
|
|
return Response(status=403)
|
|
|
|
if not check_access(user_info['entitlements']):
|
|
response = Response(status=403)
|
|
response.set_cookie(access_cookie,
|
|
'denied',
|
|
secure=True,
|
|
samesite='Strict')
|
|
return response
|
|
|
|
if check_admin(user_info['entitlements']):
|
|
g.admin = True
|
|
|
|
if check_export(user_info['entitlements']):
|
|
g.export = True
|
|
|
|
g.remote_user = user_info['sub']
|
|
if 'database' not in g:
|
|
g.database = get_db()
|
|
|
|
|
|
@app.after_request
|
|
def after(response: Response) -> Response:
|
|
response.set_cookie('server_settings',
|
|
frontend_settings,
|
|
secure=True,
|
|
samesite='Strict')
|
|
if 'remote_user' in g:
|
|
response.set_cookie(user_cookie,
|
|
g.remote_user,
|
|
secure=True,
|
|
samesite='Strict')
|
|
|
|
if 'admin' in g:
|
|
response.set_cookie('admin', 'true',
|
|
secure=True,
|
|
samesite='Strict')
|
|
|
|
if 'export' in g:
|
|
response.set_cookie('export', 'true',
|
|
secure=True,
|
|
samesite='Strict')
|
|
return response
|
|
|
|
|
|
@app.route(login_path)
|
|
def login():
|
|
response = redirect(oauth.auth_url)
|
|
response.set_cookie('return', request.args.get('return'), samesite='Lax')
|
|
return response
|
|
|
|
|
|
@app.route(callback_path)
|
|
def authorize():
|
|
token = oauth.request_access_token(request.args.get('code'))
|
|
return_path = request.cookies.get('return')
|
|
if not return_path:
|
|
return_path = '/'
|
|
response = redirect(return_path)
|
|
response.set_cookie(token_cookie,
|
|
token,
|
|
secure=True,
|
|
httponly=True,
|
|
samesite='Strict')
|
|
return response
|
|
|
|
|
|
@app.route('/')
|
|
def base_url():
|
|
return jsonify('REST')
|
|
|
|
@app.route('/points')
|
|
def get_points():
|
|
data = {}
|
|
for row in g.database.execute('SELECT * FROM `points`').fetchall():
|
|
point_data = {'placeName': row['placeName'],
|
|
'personName': row['personName'],
|
|
'personRole': row['personRole'],
|
|
'latitude': row['latitude'],
|
|
'longitude': row['longitude']}
|
|
if 'export' in g:
|
|
point_data['comment'] = row['comment']
|
|
data[row['id']] = point_data
|
|
return data
|
|
|
|
@app.route('/point', methods=['PUT'])
|
|
@verify_admin
|
|
def create_point():
|
|
data = request.get_json()
|
|
insert = '''
|
|
INSERT OR REPLACE INTO `points`
|
|
(`id`, `placeName`, `personName`, `latitude`, `longitude`, `personRole`, `comment`)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
'''
|
|
g.database.execute(insert,
|
|
(data['id'],
|
|
data['placeName'],
|
|
data['personName'],
|
|
data['latitude'],
|
|
data['longitude'],
|
|
data['personRole'],
|
|
data['comment']))
|
|
g.database.commit()
|
|
return jsonify(data['id'])
|
|
|
|
@app.route('/point', methods=['DELETE'])
|
|
@verify_admin
|
|
def delete_point():
|
|
data = request.get_json()
|
|
g.database.execute('DELETE FROM `points` WHERE `id`=?', (data['id'],))
|
|
g.database.commit()
|
|
return jsonify('OK')
|
|
|
|
@app.route('/export')
|
|
@verify_export
|
|
def export_points():
|
|
csv_string = '"Plats"\t"Person"\t"Titel"\t"Kommentar"\n'
|
|
for row in g.database.execute('SELECT * FROM `points`').fetchall():
|
|
placeName = row['placeName']
|
|
personName = row['personName']
|
|
personRole = row['personRole']
|
|
comment = row['comment']
|
|
|
|
csv_row = f'"{placeName}"\t"{personName}"\t'
|
|
csv_row += f'"{personRole}"\t"{comment}"\n'
|
|
csv_string += csv_row
|
|
|
|
return jsonify({'csv_data': csv_string})
|