maid/proxy/db.py
2025-08-08 08:34:04 +02:00

137 lines
3.9 KiB
Python

import psycopg2
import psycopg2.extras
from datetime import datetime
from config import get_database_url
def get_db_connection():
    """Open and return a new psycopg2 connection to the configured database.

    The connection string is obtained from ``get_database_url()``. The
    connection is returned open; closing it is left to the caller.
    """
    return psycopg2.connect(get_database_url())
def get_access_windows(key):
    """Fetch the access windows stored for *key*.

    Args:
        key: Value matched against the ``key`` column of ``access_windows``.

    Returns:
        A list of plain dicts, one per matching row, each holding the
        ``start_datetime`` and ``end_datetime`` columns. Empty if no row
        matches.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cur.execute(
            "\n"
            "SELECT start_datetime, end_datetime FROM access_windows\n"
            "WHERE key = %s\n",
            (key,),
        )
        rows = cur.fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
def is_within_allowed_window(windows):
    """Return True if the current time lies inside any of the given windows.

    Args:
        windows: Iterable of mappings with ``"start_datetime"`` and
            ``"end_datetime"`` entries. Each bound may be a ``datetime`` or
            an ISO-8601 string accepted by ``datetime.fromisoformat``.

    Returns:
        True as soon as one window satisfies ``start <= now <= end`` (both
        bounds inclusive); False otherwise, including for empty input.

    Naive bounds are compared against the naive local time; timezone-aware
    bounds are compared against the same instant expressed as an aware
    datetime. Windows whose bounds cannot be parsed or compared (``ValueError``
    or ``TypeError``) are skipped.
    """
    now_naive = datetime.now()
    # Same instant, but timezone-aware (local zone), for aware bounds.
    # Comparing a naive "now" with aware bounds would raise TypeError and the
    # window would be silently ignored.
    now_aware = now_naive.astimezone()

    for window in windows:
        try:
            start = window["start_datetime"]
            end = window["end_datetime"]
            # Handle both string and datetime objects.
            if isinstance(start, str):
                start = datetime.fromisoformat(start)
            if isinstance(end, str):
                end = datetime.fromisoformat(end)

            # getattr keeps non-datetime bounds (e.g. None) on the TypeError
            # path below instead of raising AttributeError here.
            if getattr(start, "tzinfo", None) is None:
                now = now_naive
            else:
                now = now_aware

            if start <= now <= end:
                return True
        except (ValueError, TypeError):
            continue
    return False
def get_all_keys_with_windows():
    """Return every row of ``allowed_keys`` together with its access windows.

    Returns:
        A list of dicts ordered by ``key``. Each dict holds the columns of
        the ``allowed_keys`` row plus a ``"windows"`` entry: a list of dicts
        with ``id``, ``start_datetime`` and ``end_datetime`` for that key,
        ordered by ``start_datetime``.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cur.execute("SELECT * FROM allowed_keys ORDER BY key")
        keys = cur.fetchall()

        keys_data = []
        for key_row in keys:
            key_dict = dict(key_row)
            # One follow-up query per key, reusing the same cursor.
            cur.execute(
                "\n"
                "SELECT id, start_datetime, end_datetime\n"
                "FROM access_windows\n"
                "WHERE key = %s\n"
                "ORDER BY start_datetime\n",
                (key_dict["key"],),
            )
            key_dict["windows"] = [dict(w) for w in cur.fetchall()]
            keys_data.append(key_dict)
    finally:
        # Close even when a query fails so the connection is not leaked.
        conn.close()
    return keys_data
def add_access_window(key, start_datetime, end_datetime):
    """Insert a new row into ``access_windows`` and commit it.

    Args:
        key: Value stored in the ``key`` column.
        start_datetime: Value stored in the ``start_datetime`` column.
        end_datetime: Value stored in the ``end_datetime`` column.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "\n"
            "INSERT INTO access_windows (key, start_datetime, end_datetime)\n"
            "VALUES (%s, %s, %s)\n",
            (key, start_datetime, end_datetime),
        )
        conn.commit()
    finally:
        # Close even when the insert or commit fails; closing discards any
        # uncommitted work and prevents a leaked connection.
        conn.close()
def delete_access_window(window_id):
    """Delete the ``access_windows`` row whose ``id`` equals *window_id*.

    Args:
        window_id: Value matched against the ``id`` column.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM access_windows WHERE id = %s", (window_id,))
        conn.commit()
    finally:
        # Close even when the delete or commit fails so the connection is
        # not leaked.
        conn.close()
def add_or_update_key_metadata(key, label):
    """Insert *key* into ``allowed_keys``, or update its label if it exists.

    Args:
        key: Value for the ``key`` column; also the conflict target of the
            upsert.
        label: Value for the ``label`` column; overwrites the stored label
            when a row with the same key already exists.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "\n"
            "INSERT INTO allowed_keys (key, label)\n"
            "VALUES (%s, %s)\n"
            "ON CONFLICT(key) DO UPDATE SET\n"
            "label = excluded.label\n",
            (key, label),
        )
        conn.commit()
    finally:
        # Close even when the upsert or commit fails so the connection is
        # not leaked.
        conn.close()
def delete_key(key):
    """Delete the ``allowed_keys`` row whose ``key`` equals *key*.

    Args:
        key: Value matched against the ``key`` column.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM allowed_keys WHERE key = %s", (key,))
        conn.commit()
    finally:
        # Close even when the delete or commit fails so the connection is
        # not leaked.
        conn.close()
def log_token_usage(key, prompt_tokens, completion_tokens, total_tokens):
    """Record one token-usage entry for *key* in ``token_usage``.

    The ``timestamp`` column is filled with ``datetime.now().isoformat()``,
    i.e. the naive local time at the moment of the call.

    Args:
        key: Value stored in the ``key`` column.
        prompt_tokens: Value stored in the ``prompt_tokens`` column.
        completion_tokens: Value stored in the ``completion_tokens`` column.
        total_tokens: Value stored in the ``total_tokens`` column.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "\n"
            "INSERT INTO token_usage (key, timestamp, prompt_tokens, completion_tokens, total_tokens)\n"
            "VALUES (%s, %s, %s, %s, %s)\n",
            (
                key,
                datetime.now().isoformat(),
                prompt_tokens,
                completion_tokens,
                total_tokens,
            ),
        )
        conn.commit()
    finally:
        # Close even when the insert or commit fails so the connection is
        # not leaked.
        conn.close()
def get_token_usage_per_key():
    """Return the summed token usage for every key in ``token_usage``.

    Returns:
        A dict mapping each key to a dict with ``"input"``, ``"output"`` and
        ``"total"`` entries, holding the SQL ``SUM`` of ``prompt_tokens``,
        ``completion_tokens`` and ``total_tokens`` for that key.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cur.execute(
            "\n"
            "SELECT\n"
            "key,\n"
            "SUM(prompt_tokens) AS input_tokens,\n"
            "SUM(completion_tokens) AS output_tokens,\n"
            "SUM(total_tokens) AS total_tokens\n"
            "FROM token_usage\n"
            "GROUP BY key\n"
        )
        result = cur.fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    return {
        row["key"]: {
            "input": row["input_tokens"],
            "output": row["output_tokens"],
            "total": row["total_tokens"],
        }
        for row in result
    }