# Listing metadata: 193 lines, 6.9 KiB, Python
from flask import Flask, request, jsonify, render_template, redirect, url_for
|
||
from flask import request as flask_request
|
||
from flask_cors import CORS
|
||
import logging
|
||
import re
|
||
from auth import requires_auth
|
||
from config import init_env, init_logging
|
||
from openai_client import client, openai_model
|
||
from db import (
|
||
delete_key,
|
||
is_within_allowed_window,
|
||
get_access_windows,
|
||
add_or_update_key_metadata,
|
||
add_access_window,
|
||
delete_access_window,
|
||
get_all_keys_with_windows,
|
||
log_token_usage,
|
||
get_token_usage_per_key
|
||
)
|
||
|
||
# Create the WSGI application object that the route decorators below attach to.
app = Flask(__name__)

# Enable CORS on the app with flask_cors' default settings.
# NOTE(review): no origin restriction is configured here — confirm that is intended.
CORS(app)

# One-time start-up initialisation provided by the local `config` module.
init_logging()
init_env()
|
||
|
||
# --- Route: Proxy endpoint for OpenAI API calls ---
|
||
@app.route("/ask", methods=["POST"])
def ask_ai():
    """Proxy a chat-completion request to OpenAI for a proxy-key holder.

    The request body must be a JSON object with the string fields
    ``proxy_key``, ``system_role`` and ``prompt``.

    Returns:
        A ``(json, status)`` pair, or a bare JSON response on success:

        * 200 ``{"response": <text>}`` when the completion succeeds.
        * 400 when the body is not JSON, is not a JSON object, or the
          prompt / system role are not strings or exceed the length limits
          (1000 characters for the system role, 2000 for the prompt).
        * 401 when the key is malformed or has no access windows.
        * 403 when the key is used outside its access windows.
        * 500 when the OpenAI call itself fails.
    """
    logging.info("Incoming request")

    # Ensure incoming data is JSON
    if not request.is_json:
        return jsonify({"error": "Content-Type måste vara application/json"}), 400

    # Parse JSON safely
    try:
        data = request.get_json()
    except Exception as e:
        logging.warning("Invalid JSON payload: %s", e)
        return jsonify({"error": "Ogiltig JSON"}), 400

    # Valid JSON is not necessarily an object: a list, string, number or null
    # has no .get(), so reject it here instead of crashing with a 500 below.
    if not isinstance(data, dict):
        logging.warning("JSON payload is not an object: %s", type(data).__name__)
        return jsonify({"error": "Ogiltig JSON"}), 400

    # Validate API proxy key
    key = data.get("proxy_key")
    if not isinstance(key, str) or not re.fullmatch(r"[a-zA-Z0-9\-]{4,64}", key):
        return jsonify({"error": "Ogiltig eller saknad nyckel"}), 401

    # Check if the key has any valid access windows. The same message is used
    # as for a malformed key so the two cases look identical to the caller.
    windows = get_access_windows(key)
    if not windows:
        return jsonify({"error": "Ogiltig eller saknad nyckel"}), 401

    # Enforce time-based access control
    if not is_within_allowed_window(windows):
        formatted_windows = [
            f"{w['start_datetime']}–{w['end_datetime']}" for w in windows
        ]
        return jsonify({
            "error": "Åtkomst tillåten under följande tider: " + ", ".join(formatted_windows)
        }), 403

    # Validate prompt and system role inputs
    system_role = data.get("system_role")
    prompt = data.get("prompt")
    if not isinstance(system_role, str) or not isinstance(prompt, str):
        return jsonify({"error": "Prompt och systemroll måste vara text"}), 400

    if len(system_role) > 1000 or len(prompt) > 2000:
        return jsonify({"error": "Inmatning för lång"}), 400

    # Sanitize input by stripping angle brackets
    system_role = re.sub(r"[<>]", "", system_role)
    prompt = re.sub(r"[<>]", "", prompt)

    # Call OpenAI API; only failures of the call itself map to the 500 below.
    try:
        response = client.chat.completions.create(
            model=openai_model,
            messages=[
                {"role": "system", "content": system_role},
                {"role": "user", "content": prompt},
            ],
            timeout=10,
        )
        result = response.choices[0].message.content
    except Exception as e:
        # logging.exception keeps the original message and adds the traceback.
        logging.exception("OpenAI API error: %s", e)
        return jsonify({"error": "Ett fel inträffade vid kontakt med AI-tjänsten"}), 500

    # Log token usage per key. This is bookkeeping: a failure here must not
    # discard a completion that was already produced, so it is handled
    # separately from the API call.
    usage = response.usage
    if usage is None:
        logging.warning("OpenAI response carried no usage data; token usage not logged")
    else:
        try:
            log_token_usage(
                key,
                usage.prompt_tokens,
                usage.completion_tokens,
                usage.total_tokens,
            )
        except Exception:
            logging.exception("Failed to log token usage")
        logging.info("OpenAI response returned (%d tokens)", usage.total_tokens)

    return jsonify({"response": result})
|
||
|
||
|
||
# --- Route: Admin panel for managing API access keys and windows ---
|
||
# Matches bulk-form field names such as "slots[3][start]" and captures the index.
_SLOT_FIELD_RE = re.compile(r"slots\[(\d+)\]\[[^\]]*\]")


def _collect_time_windows(form):
    """Return the ``(start, end)`` pairs submitted as ``slots[i][start|end]``.

    Args:
        form: The submitted form mapping (field name -> value).

    Raises:
        ValueError: If a slot lacks a start or end, or ends before it starts.
    """
    slot_indices = set()
    for field_name in form:
        match = _SLOT_FIELD_RE.fullmatch(field_name)
        # Field names that do not follow the slots[i][...] shape are ignored
        # rather than failing on int() with an unhelpful message.
        if match:
            slot_indices.add(int(match.group(1)))

    time_windows = []
    for i in sorted(slot_indices):
        start = form.get(f"slots[{i}][start]")
        end = form.get(f"slots[{i}][end]")
        if not start or not end:
            raise ValueError("Tidsfönster saknar start eller slut.")
        # The values are compared as strings, exactly as they were submitted.
        if end < start:
            raise ValueError(f"Tidsfönster {i+1} har en sluttid före starttid.")
        time_windows.append((start, end))
    return time_windows


def _parse_bulk_keys(keys_raw):
    """Parse ``key,label`` lines into a list of ``(key, label)`` tuples.

    Blank lines are skipped. Only the first comma separates key from label,
    so a label may itself contain commas.

    Raises:
        ValueError: If a line has no comma or its key part is empty.
    """
    keys_data = []
    for line in keys_raw.splitlines():
        if not line.strip():
            continue
        key, separator, label = line.partition(",")
        if not separator:
            raise ValueError(f"Ogiltig rad: '{line}'. Format ska vara nyckel,etikett.")
        key = key.strip()
        if not key:
            raise ValueError(f"Ofullständig rad: '{line}'")
        keys_data.append((key, label.strip()))
    return keys_data


@app.route("/admin", methods=["GET", "POST"])
@requires_auth
def admin_panel():
    """Render the admin dashboard and process its form submissions.

    On POST the ``action`` form field selects the operation: ``add_window``,
    ``delete_key``, ``delete_window`` or ``bulk_add_full``. A successful POST
    redirects back to this view; a failed one re-renders the page with the
    error message and the submitted form values. Any other ``action`` value
    performs no operation and redirects.

    Returns:
        A redirect to this view after a successful POST, otherwise the
        rendered ``admin.html`` template.
    """
    error = None
    form_data = {}

    # Handle form submission (add/delete windows, keys)
    if request.method == "POST":
        action = request.form.get("action")
        form_data = request.form.to_dict()

        try:
            if action == "add_window":
                # Add a new access window for a key
                key = request.form.get("key", "").strip()
                start = request.form.get("start_datetime")
                end = request.form.get("end_datetime")
                if not key or not start or not end:
                    raise ValueError("Alla fält måste fyllas i.")
                if end < start:
                    raise ValueError("Sluttid måste vara efter starttid.")
                add_access_window(key, start, end)

            elif action == "delete_key":
                # Delete a key and all its associated data
                key = request.form.get("key")
                if not key:
                    raise ValueError("Ingen nyckel angiven.")
                delete_key(key)

            elif action == "delete_window":
                # Delete a specific time window
                window_id = request.form.get("window_id")
                if not window_id:
                    raise ValueError("Inget tidsfönster angivet.")
                delete_access_window(window_id)

            elif action == "bulk_add_full":
                # Bulk add multiple keys and assign them all time windows
                keys_raw = request.form.get("bulk_keys", "")
                if not keys_raw.strip():
                    raise ValueError("Inga nycklar angivna.")

                # Validate everything before writing anything, so a bad line
                # does not leave a partially applied import behind.
                time_windows = _collect_time_windows(request.form)
                keys_data = _parse_bulk_keys(keys_raw)

                # Save keys and assign all time windows to each
                for key, label in keys_data:
                    add_or_update_key_metadata(key, label)
                    for start, end in time_windows:
                        add_access_window(key, start, end)

            return redirect(url_for("admin_panel"))

        except ValueError as e:
            # Validation failures: show the message to the admin.
            error = str(e)
        except Exception as e:
            # Unexpected failures are still shown, but also logged with a
            # traceback so they can be diagnosed.
            logging.exception("Admin action %r failed", action)
            error = str(e)

    # Prepare data to render admin dashboard
    keys = get_all_keys_with_windows()
    usage_by_key = get_token_usage_per_key()
    for key in keys:
        # Keys without recorded usage get zero counts.
        usage = usage_by_key.get(key["key"], {})
        key["tokens_input"] = usage.get("input", 0)
        key["tokens_output"] = usage.get("output", 0)
        key["tokens_total"] = usage.get("total", 0)

    return render_template("admin.html", keys=keys, error=error, form_data=form_data)
|
||
|
||
|
||
if __name__ == "__main__":
    import os

    # Debug mode turns on the interactive Werkzeug debugger, which lets anyone
    # who can reach the server execute arbitrary code. Keep it off unless it is
    # explicitly requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(debug=debug_enabled)