126 lines
3.4 KiB
Python
126 lines
3.4 KiB
Python
import sqlite3
|
|
from datetime import datetime
|
|
|
|
# Path of the SQLite database file that get_db_connection() opens.
DB_PATH = "config.db"
|
|
|
|
def get_db_connection():
    """Open a new SQLite connection to ``DB_PATH``.

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``,
        so fetched rows can be read by column name and converted with
        ``dict(row)``. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def get_access_windows(key):
    """Return the access windows stored for ``key``.

    Args:
        key: Value matched against the ``key`` column of ``access_windows``.

    Returns:
        A list of dicts, one per matching row, each holding the
        ``start_datetime`` and ``end_datetime`` columns. The list is empty
        when no row matches.
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            """
            SELECT start_datetime, end_datetime FROM access_windows
            WHERE key = ?
            """,
            (key,),
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
|
|
|
|
|
|
def is_within_allowed_window(windows):
    """Report whether the current time falls inside any of ``windows``.

    Args:
        windows: Iterable of mappings that hold ISO-8601 strings under the
            ``"start_datetime"`` and ``"end_datetime"`` keys.

    Returns:
        ``True`` as soon as one window satisfies ``start <= now <= end``
        (both bounds inclusive), otherwise ``False``.

    A window that cannot be evaluated is skipped instead of aborting the
    whole check: malformed strings (``ValueError``), non-string values such
    as ``None`` (``TypeError``), and windows that mix a timezone-aware bound
    with a naive one (``TypeError`` on comparison).
    """
    naive_now = datetime.now()
    for window in windows:
        try:
            start = datetime.fromisoformat(window["start_datetime"])
            end = datetime.fromisoformat(window["end_datetime"])
            # Naive and aware datetimes cannot be ordered against each other,
            # so compare aware windows with an aware "now" and naive windows
            # with the naive local time.
            if start.tzinfo is None:
                now = naive_now
            else:
                now = datetime.now(start.tzinfo)
            if start <= now <= end:
                return True
        except (TypeError, ValueError):
            continue
    return False
|
|
|
|
|
|
def get_all_keys_with_windows():
    """Return every ``allowed_keys`` row together with its access windows.

    Returns:
        A list of dicts in ``key`` order. Each dict carries all columns of
        one ``allowed_keys`` row plus a ``"windows"`` entry: a list of dicts
        with ``id``, ``start_datetime`` and ``end_datetime`` taken from
        ``access_windows`` for that key, ordered by ``start_datetime``.
    """
    conn = get_db_connection()
    try:
        key_rows = conn.execute(
            "SELECT * FROM allowed_keys ORDER BY key"
        ).fetchall()

        keys_data = []
        for key_row in key_rows:
            key_dict = dict(key_row)
            # One lookup per key; the windows are attached to the key's dict.
            window_rows = conn.execute(
                """
                SELECT id, start_datetime, end_datetime
                FROM access_windows
                WHERE key = ?
                ORDER BY start_datetime
                """,
                (key_dict["key"],),
            ).fetchall()
            key_dict["windows"] = [dict(w) for w in window_rows]
            keys_data.append(key_dict)
    finally:
        # Close even when a query raises so the connection is not leaked.
        conn.close()
    return keys_data
|
|
|
|
def add_access_window(key, start_datetime, end_datetime):
    """Insert one row into ``access_windows`` and commit it.

    Args:
        key: Value stored in the ``key`` column.
        start_datetime: Value stored in the ``start_datetime`` column.
        end_datetime: Value stored in the ``end_datetime`` column.

    The values are stored as given; no validation is performed here.
    """
    conn = get_db_connection()
    try:
        conn.execute(
            """
            INSERT INTO access_windows (key, start_datetime, end_datetime)
            VALUES (?, ?, ?)
            """,
            (key, start_datetime, end_datetime),
        )
        conn.commit()
    finally:
        # Close even when the insert or commit raises so the connection is
        # not leaked.
        conn.close()
|
|
|
|
def delete_access_window(window_id):
    """Delete the ``access_windows`` row whose ``id`` equals ``window_id``.

    Args:
        window_id: Value matched against the ``id`` column.

    The delete is committed before returning. No error is raised by this
    function when no row matches.
    """
    conn = get_db_connection()
    try:
        conn.execute("DELETE FROM access_windows WHERE id = ?", (window_id,))
        conn.commit()
    finally:
        # Close even when the delete or commit raises so the connection is
        # not leaked.
        conn.close()
|
|
|
|
def add_or_update_key_metadata(key, label):
    """Insert ``key`` with ``label``, or update the label of an existing key.

    Args:
        key: Value for the ``key`` column of ``allowed_keys``; also the
            conflict target of the upsert.
        label: Value for the ``label`` column. On a conflict it replaces the
            stored label.

    The change is committed before returning.
    """
    conn = get_db_connection()
    try:
        conn.execute(
            """
            INSERT INTO allowed_keys (key, label)
            VALUES (?, ?)
            ON CONFLICT(key) DO UPDATE SET
            label = excluded.label
            """,
            (key, label),
        )
        conn.commit()
    finally:
        # Close even when the upsert or commit raises so the connection is
        # not leaked.
        conn.close()
|
|
|
|
def delete_key(key):
    """Delete the ``allowed_keys`` row whose ``key`` equals ``key``.

    Args:
        key: Value matched against the ``key`` column.

    The delete is committed before returning.
    """
    # NOTE(review): only ``allowed_keys`` is touched here; rows in
    # ``access_windows`` for the same key are not deleted by this function.
    # Confirm whether the schema cascades or whether that is intended.
    conn = get_db_connection()
    try:
        conn.execute("DELETE FROM allowed_keys WHERE key = ?", (key,))
        conn.commit()
    finally:
        # Close even when the delete or commit raises so the connection is
        # not leaked.
        conn.close()
|
|
|
|
def log_token_usage(key, prompt_tokens, completion_tokens, total_tokens):
    """Append one usage record to ``token_usage`` and commit it.

    Args:
        key: Value stored in the ``key`` column.
        prompt_tokens: Value stored in the ``prompt_tokens`` column.
        completion_tokens: Value stored in the ``completion_tokens`` column.
        total_tokens: Value stored in the ``total_tokens`` column.

    The ``timestamp`` column receives ``datetime.now().isoformat()``, i.e.
    the naive local time at the moment of the call.
    """
    timestamp = datetime.now().isoformat()
    conn = get_db_connection()
    try:
        conn.execute(
            """
            INSERT INTO token_usage (key, timestamp, prompt_tokens, completion_tokens, total_tokens)
            VALUES (?, ?, ?, ?, ?)
            """,
            (key, timestamp, prompt_tokens, completion_tokens, total_tokens),
        )
        conn.commit()
    finally:
        # Close even when the insert or commit raises so the connection is
        # not leaked.
        conn.close()
|
|
|
|
def get_token_usage_per_key():
    """Return the summed token counts of ``token_usage`` grouped by key.

    Returns:
        A dict mapping each ``key`` to a dict with three entries:
        ``"input"`` (sum of ``prompt_tokens``), ``"output"`` (sum of
        ``completion_tokens``) and ``"total"`` (sum of ``total_tokens``).
        The dict is empty when the table has no rows.
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            """
            SELECT
            key,
            SUM(prompt_tokens) AS input_tokens,
            SUM(completion_tokens) AS output_tokens,
            SUM(total_tokens) AS total_tokens
            FROM token_usage
            GROUP BY key
            """
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return {
        row["key"]: {
            "input": row["input_tokens"],
            "output": row["output_tokens"],
            "total": row["total_tokens"],
        }
        for row in rows
    }
|