82 lines
2.7 KiB
Python
82 lines
2.7 KiB
Python
from flask import Blueprint, request, jsonify
|
|
import logging
|
|
import re
|
|
from providers import ProviderFactory
|
|
from db import (
|
|
is_within_allowed_window,
|
|
get_access_windows,
|
|
log_token_usage
|
|
)
|
|
|
|
# Blueprint on which this module's HTTP routes are registered.
api_bp = Blueprint(name="api", import_name=__name__)
|
|
|
|
@api_bp.route("/ask", methods=["POST"])
def ask_ai():
    """Forward a prompt to the configured AI provider for a proxy key.

    The request body must be a JSON object with the string fields
    ``proxy_key``, ``system_role`` and ``prompt``.

    Returns:
        A Flask JSON response, paired with a status code on failure:

        * ``{"response": ...}`` when the provider call succeeds.
        * 400 when the content type is not JSON, the body cannot be
          parsed or is not a JSON object, or ``system_role``/``prompt``
          are not strings or exceed 1000/2000 characters.
        * 401 when ``proxy_key`` is not a 4-64 character string of
          letters, digits and hyphens, or ``get_access_windows`` returns
          nothing for it.
        * 403 when ``is_within_allowed_window`` rejects the windows.
        * 500 when the provider call or the usage logging raises.
    """
    logging.info("Incoming request")

    # Ensure incoming data is JSON
    if not request.is_json:
        return jsonify({"error": "Content-Type måste vara application/json"}), 400

    # Parse JSON safely
    try:
        data = request.get_json(force=True)
    except Exception as e:
        logging.warning("Invalid JSON payload: %s", e)
        return jsonify({"error": "Ogiltig JSON"}), 400

    # Syntactically valid JSON may still be a list, string, number or null;
    # reject it here so the .get() lookups below cannot raise AttributeError.
    if not isinstance(data, dict):
        logging.warning(
            "Invalid JSON payload: expected an object, got %s",
            type(data).__name__,
        )
        return jsonify({"error": "Ogiltig JSON"}), 400

    # Validate API proxy key
    key = data.get("proxy_key")
    if not isinstance(key, str) or not re.fullmatch(r"[a-zA-Z0-9\-]{4,64}", key):
        return jsonify({"error": "Ogiltig eller saknad nyckel"}), 401

    # Check if the key has any valid access windows; the error text is the
    # same as for a malformed key.
    windows = get_access_windows(key)
    if not windows:
        return jsonify({"error": "Ogiltig eller saknad nyckel"}), 401

    # Enforce time-based access control
    if not is_within_allowed_window(windows):
        formatted_windows = [
            f"{w['start_datetime']}-{w['end_datetime']}" for w in windows
        ]
        return jsonify({
            "error": "Åtkomst tillåten under följande tider: " + ", ".join(formatted_windows)
        }), 403

    # Validate prompt and system role inputs
    system_role = data.get("system_role")
    prompt = data.get("prompt")
    if not isinstance(system_role, str) or not isinstance(prompt, str):
        return jsonify({"error": "Prompt och systemroll måste vara text"}), 400

    # Length limits apply to the raw input, before any characters are stripped.
    if len(system_role) > 1000 or len(prompt) > 2000:
        return jsonify({"error": "Inmatning för lång"}), 400

    # Sanitize input by dropping angle brackets
    system_role = re.sub(r"[<>]", "", system_role)
    prompt = re.sub(r"[<>]", "", prompt)

    # Call AI provider and handle response
    try:
        provider = ProviderFactory.get_provider()
        response_data = provider.generate_response(system_role, prompt)

        result = response_data["response"]
        usage = response_data["usage"]

        # Log token usage per key
        log_token_usage(
            key,
            usage["prompt_tokens"],
            usage["completion_tokens"],
            usage["total_tokens"],
        )
        logging.info(
            "AI response returned (%d tokens) from provider: %s",
            usage["total_tokens"],
            provider.__class__.__name__,
        )

        return jsonify({"response": result})

    except Exception as e:
        # Request-level boundary: record the traceback for operators while
        # the client only receives a generic error message.
        logging.exception("AI provider error: %s", e)
        return jsonify({"error": "Ett fel inträffade vid kontakt med AI-tjänsten"}), 500