"""Authentication helpers: admin credential check, signed session cookie, MCP bearer token."""

import base64
import hashlib
import hmac
import json
import time

from fastapi import HTTPException, Request, status

from app.config import Settings

SESSION_COOKIE = "miem_admin_session"


def _constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings in constant time, whatever characters they contain.

    ``hmac.compare_digest`` raises ``TypeError`` when given ``str`` arguments
    with non-ASCII characters, so both sides are encoded to bytes first.
    ``surrogatepass`` keeps the encoding step from raising on lone surrogates.
    """
    return hmac.compare_digest(
        left.encode("utf-8", "surrogatepass"),
        right.encode("utf-8", "surrogatepass"),
    )


def _session_signature(payload: str, settings: Settings) -> str:
    """Return the hex HMAC-SHA256 of *payload*, keyed with ``settings.session_secret``.

    *payload* must be ASCII; callers guarantee this before calling.
    """
    return hmac.new(
        settings.session_secret.encode("utf-8"),
        payload.encode("ascii"),
        hashlib.sha256,
    ).hexdigest()


def verify_admin(username: str, password: str, settings: Settings) -> bool:
    """Check submitted admin credentials against the configured ones.

    Args:
        username: Username supplied by the client.
        password: Password supplied by the client.
        settings: Provides ``admin_username`` and ``admin_password``.

    Returns:
        ``True`` only when both values match the configured credentials.
    """
    # Run both comparisons unconditionally so a wrong username does not skip
    # the password comparison.
    username_ok = _constant_time_equals(username, settings.admin_username)
    password_ok = _constant_time_equals(password, settings.admin_password)
    return username_ok and password_ok


def sign_session(username: str, settings: Settings) -> str:
    """Build a signed session token of the form ``<payload>.<signature>``.

    The payload is URL-safe base64 of compact JSON ``{"sub": ..., "iat": ...}``;
    the signature is the hex HMAC-SHA256 of that payload text.

    Args:
        username: Stored in the token as ``sub``.
        settings: Provides ``session_secret`` used as the HMAC key.

    Returns:
        The token string.
    """
    claims = {"sub": username, "iat": int(time.time())}
    raw = json.dumps(claims, separators=(",", ":")).encode("utf-8")
    payload = base64.urlsafe_b64encode(raw).decode("ascii")
    return f"{payload}.{_session_signature(payload, settings)}"


def read_session(token: str | None, settings: Settings) -> str | None:
    """Validate a session token and return the ``sub`` value it carries.

    Args:
        token: Token as produced by ``sign_session``, or ``None``.
        settings: Provides ``session_secret`` used as the HMAC key.

    Returns:
        The ``sub`` value of the payload, or ``None`` when the token is
        missing, malformed, carries a bad signature, or its payload does not
        decode to a JSON object.
    """
    # Genuine tokens are pure ASCII (base64 payload + hex signature). Rejecting
    # anything else up front keeps the ASCII encode and the str comparison
    # below from raising on a tampered cookie.
    if not token or "." not in token or not token.isascii():
        return None

    payload, signature = token.rsplit(".", 1)
    expected = _session_signature(payload, settings)
    if not hmac.compare_digest(signature, expected):
        return None

    try:
        # binascii.Error, JSONDecodeError and UnicodeDecodeError are all
        # ValueError subclasses.
        data = json.loads(base64.urlsafe_b64decode(payload.encode("ascii")))
    except ValueError:
        return None

    if not isinstance(data, dict):
        return None
    # NOTE: "iat" is written by sign_session but not checked here, so tokens
    # do not expire on the server side.
    return data.get("sub")


def require_admin(request: Request, settings: Settings) -> str:
    """Return the admin username from the session cookie or redirect to login.

    Args:
        request: Incoming request; the ``SESSION_COOKIE`` cookie is read.
        settings: Passed through to ``read_session``.

    Returns:
        The username stored in a valid session token.

    Raises:
        HTTPException: 303 with ``Location: /admin/login`` when no valid
            session is present.
    """
    username = read_session(request.cookies.get(SESSION_COOKIE), settings)
    if not username:
        raise HTTPException(status_code=status.HTTP_303_SEE_OTHER, headers={"Location": "/admin/login"})
    return username


def require_mcp_token(request: Request, settings: Settings) -> None:
    """Require a valid ``Authorization: Bearer <token>`` header.

    Args:
        request: Incoming request; the ``authorization`` header is read.
        settings: Provides the expected ``mcp_token``.

    Raises:
        HTTPException: 401 when the header is missing, is not a Bearer
            credential, does not match, or no token is configured.
    """
    auth = request.headers.get("authorization", "")
    expected = settings.mcp_token
    presented = auth.removeprefix("Bearer ").strip()
    # Fail closed when no token is configured: an empty expected value must
    # never be satisfied by an empty bearer credential.
    authorized = (
        bool(expected)
        and auth.startswith("Bearer ")
        and _constant_time_equals(presented, expected)
    )
    if not authorized:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid MCP token")