clickmap/api/__init__.py

251 lines
7.0 KiB
Python

import csv
import io
import json
import re
import sqlite3
from base64 import b64encode
from configparser import ConfigParser
from datetime import datetime
from functools import wraps
from pathlib import Path

from flask import jsonify, Flask, g, redirect, request, Response

from .oauth import Oauth
from .db import get_db, evolve
# URL paths of the two OAuth endpoints served by this application.
login_path = '/login'
callback_path = '/auth'
# Paths that before() lets through without looking at the token cookie.
public_paths = [login_path, callback_path]
# Names of the cookies exchanged with the browser.
user_cookie = 'username'
token_cookie = 'token'
access_cookie = 'access'
# Settings come from config.ini, resolved relative to the current working
# directory. ConfigParser.read() skips files it cannot open, so a missing
# file only surfaces as a KeyError on the config['oauth'] lookup below.
config = ConfigParser()
config.read('./config.ini')
app = Flask('clickmap')
oauth = Oauth(config['oauth'])
# Database initialization, runs only once on application start
evolve(get_db())
def prepare_frontend_settings(config):
    """Pack the ``frontend`` config section into a base64-encoded JSON string.

    Args:
        config: Object whose ``'frontend'`` entry exposes ``.items()``
            (a ``ConfigParser`` or a plain nested dict both work).

    Returns:
        The section's key/value pairs serialized as a JSON object, UTF-8
        encoded, base64 encoded, and returned as ``str``.
    """
    frontend_options = dict(config['frontend'].items())
    encoded = b64encode(json.dumps(frontend_options).encode('utf-8'))
    return encoded.decode('utf-8')
def _parse_entitlements(option):
    """Read a comma-separated option from the ``security`` config section.

    Args:
        option: Name of the option inside the ``security`` section.

    Returns:
        The entries in configured order with surrounding whitespace
        stripped, or an empty list when the option is not set.
    """
    if option not in config['security']:
        return []
    # Blank entries are deliberately kept: an option that is present but
    # empty still yields a non-empty list (['']), so it is not mistaken for
    # "not configured" by the check_* functions.
    return [entry.strip()
            for entry in config.get('security', option).split(',')]


# Base64 settings blob handed to the browser in a cookie on every response.
frontend_settings = prepare_frontend_settings(config)
# Entitlement names granting plain access, export rights and admin rights.
access_entitlement_list = _parse_entitlements('access_entitlement')
export_entitlement_list = _parse_entitlements('export_entitlement')
admin_entitlement_list = _parse_entitlements('admin_entitlement')
def check_access(user_entitlements):
    """Tell whether a user may use the application at all.

    Admins and exporters always have access. When no access entitlements
    are configured, everybody has access. Otherwise the user needs at least
    one of the configured access entitlements.

    Args:
        user_entitlements: Container of the user's entitlement names; only
            membership tests (``in``) are performed on it.

    Returns:
        True when access is granted, False otherwise.
    """
    if check_admin(user_entitlements) or check_export(user_entitlements):
        return True
    if not access_entitlement_list:
        return True
    return any(required in user_entitlements
               for required in access_entitlement_list)
def check_admin(user_entitlements):
    """Tell whether a user has admin rights.

    When no admin entitlements are configured, every user counts as admin.
    Otherwise the user needs at least one configured admin entitlement.

    Args:
        user_entitlements: Container of the user's entitlement names; only
            membership tests (``in``) are performed on it.

    Returns:
        True when the user is an admin, False otherwise.
    """
    if admin_entitlement_list:
        return any(required in user_entitlements
                   for required in admin_entitlement_list)
    return True
def check_export(user_entitlements):
    """Tell whether a user may export data.

    Admins may always export. Everyone else needs at least one configured
    export entitlement; with none configured, only admins may export.

    Args:
        user_entitlements: Container of the user's entitlement names; only
            membership tests (``in``) are performed on it.

    Returns:
        True when export is allowed, False otherwise.
    """
    if check_admin(user_entitlements):
        return True
    # any() over an empty list is False, which covers the unconfigured case.
    return any(required in user_entitlements
               for required in export_entitlement_list)
def verify_admin(func):
    """Decorate a view so that it only runs for admin requests.

    The wrapped view is called when ``admin`` is set on ``flask.g``;
    otherwise the JSON string ``"DENIED"`` is returned (no error status is
    set on that response).

    Args:
        func: The view function to protect.

    Returns:
        The guarding wrapper, carrying ``func``'s metadata via ``wraps``.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        if 'admin' not in g:
            return jsonify('DENIED')
        return func(*args, **kwargs)
    return guarded
def verify_export(func):
    """Decorate a view so that it only runs for requests with export rights.

    The wrapped view is called when ``export`` is set on ``flask.g``;
    otherwise the JSON string ``"DENIED"`` is returned (no error status is
    set on that response).

    Args:
        func: The view function to protect.

    Returns:
        The guarding wrapper, carrying ``func``'s metadata via ``wraps``.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        if 'export' not in g:
            return jsonify('DENIED')
        return func(*args, **kwargs)
    return guarded
@app.before_request
def before() -> 'Response | None':
    """Authenticate and authorize every request outside the public paths.

    The token cookie is passed to ``oauth.authorize()``. On success the
    request context ``g`` receives ``remote_user`` (the ``sub`` entry),
    optionally the ``admin`` and ``export`` flags, and a ``database``
    handle if none is present yet.

    Returns:
        None to let request handling continue, or an empty 403 response
        when the token yields no active user or the user lacks access. In
        the latter case the access cookie is also set to ``'denied'``.
    """
    # The annotation is quoted so that the ``|`` union is never evaluated at
    # runtime and does not require a particular Python version.
    if request.path in public_paths:
        return None

    token = request.cookies.get(token_cookie)
    # NOTE(review): user_info is presumably the token introspection result
    # as a dict with 'active', 'entitlements' and 'sub' - confirm against
    # Oauth.authorize.
    user_info = oauth.authorize(token)
    if not user_info or not user_info['active']:
        return Response(status=403)

    entitlements = user_info['entitlements']
    if not check_access(entitlements):
        response = Response(status=403)
        response.set_cookie(access_cookie,
                            'denied',
                            secure=True,
                            samesite='Strict')
        return response

    if check_admin(entitlements):
        g.admin = True
    if check_export(entitlements):
        g.export = True
    g.remote_user = user_info['sub']
    if 'database' not in g:
        g.database = get_db()
    return None
@app.after_request
def after(response: Response) -> Response:
    """Attach the frontend-facing cookies to an outgoing response.

    The ``server_settings`` cookie is always set. The user name cookie and
    the ``admin`` / ``export`` marker cookies are only set when the
    matching attribute exists on ``flask.g`` for this request.

    Args:
        response: The response about to be sent.

    Returns:
        The same response object with the cookies added.
    """
    cookie_options = {'secure': True, 'samesite': 'Strict'}
    response.set_cookie('server_settings', frontend_settings,
                        **cookie_options)
    if 'remote_user' in g:
        response.set_cookie(user_cookie, g.remote_user, **cookie_options)
    # The cookie name equals the attribute name on g for both flags.
    for flag in ('admin', 'export'):
        if flag in g:
            response.set_cookie(flag, 'true', **cookie_options)
    return response
@app.route(login_path)
def login():
    """Redirect the browser to the OAuth provider's authorization URL.

    The optional ``return`` query parameter is stored in a ``return``
    cookie so that the callback can send the user back afterwards.

    Returns:
        A redirect response to ``oauth.auth_url`` carrying the cookie.
    """
    # Without a ?return= parameter args.get() yields None, which is not a
    # valid cookie value. Fall back to '/', the same default the callback
    # uses when the cookie is absent.
    return_target = request.args.get('return') or '/'
    response = redirect(oauth.auth_url)
    response.set_cookie('return', return_target, samesite='Lax')
    return response
@app.route(callback_path)
def authorize():
    """Handle the OAuth callback: fetch a token and send the user back.

    The ``code`` query parameter is exchanged through
    ``oauth.request_access_token()``. The result is stored in the token
    cookie (secure, HTTP-only, SameSite=Strict) on a redirect to the target
    held in the ``return`` cookie, or to ``/`` when that cookie is missing
    or empty.

    Returns:
        The redirect response carrying the token cookie.
    """
    access_token = oauth.request_access_token(request.args.get('code'))
    # NOTE(review): the redirect target comes from a client-controlled
    # cookie and is used unvalidated, which looks like an open redirect.
    # Confirm which targets the frontend legitimately sends before
    # restricting it.
    target = request.cookies.get('return') or '/'
    response = redirect(target)
    response.set_cookie(token_cookie, access_token,
                        secure=True, httponly=True, samesite='Strict')
    return response
@app.route('/')
def base_url():
    """Answer requests for the API root with the JSON string ``"REST"``."""
    body = jsonify('REST')
    return body
@app.route('/points')
def get_points():
    """Return all stored points, keyed by their id.

    Each point carries place name, person name, person role, latitude and
    longitude. The comment is only included when ``export`` is set on
    ``flask.g`` for this request.

    Returns:
        A dict mapping each row's ``id`` to its point data; Flask turns the
        dict into the JSON response.
    """
    # NOTE(review): rows are accessed by column name, so the connection is
    # presumably configured with a name-addressable row factory - confirm in
    # get_db().
    public_columns = ('placeName', 'personName', 'personRole',
                      'latitude', 'longitude')
    with_comment = 'export' in g
    points = {}
    rows = g.database.execute('SELECT * FROM `points`').fetchall()
    for record in rows:
        entry = {column: record[column] for column in public_columns}
        if with_comment:
            entry['comment'] = record['comment']
        points[record['id']] = entry
    return points
@app.route('/point', methods=['PUT'])
@verify_admin
def create_point():
    """Create a point, or replace the existing one with the same id.

    Expects a JSON object containing ``id``, ``placeName``, ``personName``,
    ``latitude``, ``longitude``, ``personRole`` and ``comment``.

    Returns:
        The point's id as JSON, or an empty 400 response when the body is
        not a JSON object or lacks one of the fields.
    """
    # Same order as the column list in the INSERT statement below.
    columns = ('id', 'placeName', 'personName', 'latitude', 'longitude',
               'personRole', 'comment')
    data = request.get_json()
    # Reject malformed payloads explicitly instead of letting a
    # TypeError/KeyError turn into a 500.
    if not isinstance(data, dict) or any(c not in data for c in columns):
        return Response(status=400)
    insert = '''
    INSERT OR REPLACE INTO `points`
    (`id`, `placeName`, `personName`, `latitude`, `longitude`, `personRole`, `comment`)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    '''
    g.database.execute(insert, tuple(data[c] for c in columns))
    g.database.commit()
    return jsonify(data['id'])
@app.route('/point', methods=['DELETE'])
@verify_admin
def delete_point():
    """Delete the point whose id is given in the JSON request body.

    Expects a JSON object with an ``id`` entry.

    Returns:
        The JSON string ``"OK"``, or an empty 400 response when the body is
        not a JSON object or has no ``id``.
    """
    data = request.get_json()
    # Reject malformed payloads explicitly instead of letting a
    # TypeError/KeyError turn into a 500.
    if not isinstance(data, dict) or 'id' not in data:
        return Response(status=400)
    g.database.execute('DELETE FROM `points` WHERE `id`=?', (data['id'],))
    g.database.commit()
    return jsonify('OK')
@app.route('/export')
@verify_export
def export_points():
    """Export all points as tab-separated, fully quoted text.

    The output starts with a header line, followed by one line per point
    with place name, person name, person role and comment.

    Returns:
        A JSON object whose ``csv_data`` entry holds the generated text.
    """
    buffer = io.StringIO()
    # csv.writer reproduces the hand-built layout (tab-delimited, every
    # field quoted, '\n' line ends). It additionally doubles embedded double
    # quotes so such values no longer break the format, and writes NULL
    # values as empty fields instead of the text "None".
    writer = csv.writer(buffer, delimiter='\t', quoting=csv.QUOTE_ALL,
                        lineterminator='\n')
    writer.writerow(('Plats', 'Person', 'Titel', 'Kommentar'))
    for row in g.database.execute('SELECT * FROM `points`').fetchall():
        writer.writerow((row['placeName'], row['personName'],
                         row['personRole'], row['comment']))
    return jsonify({'csv_data': buffer.getvalue()})