feature: add MIEM employees parser service with admin UI and MCP
This commit is contained in:
52
app/security.py
Normal file
52
app/security.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import time
|
||||
|
||||
from fastapi import HTTPException, Request, status
|
||||
|
||||
from app.config import Settings
|
||||
|
||||
SESSION_COOKIE = "miem_admin_session"
|
||||
|
||||
|
||||
def verify_admin(username: str, password: str, settings: Settings) -> bool:
|
||||
return hmac.compare_digest(username, settings.admin_username) and hmac.compare_digest(
|
||||
password, settings.admin_password
|
||||
)
|
||||
|
||||
|
||||
def sign_session(username: str, settings: Settings) -> str:
|
||||
payload = base64.urlsafe_b64encode(
|
||||
json.dumps({"sub": username, "iat": int(time.time())}, separators=(",", ":")).encode("utf-8")
|
||||
).decode("ascii")
|
||||
signature = hmac.new(settings.session_secret.encode("utf-8"), payload.encode("ascii"), hashlib.sha256).hexdigest()
|
||||
return f"{payload}.{signature}"
|
||||
|
||||
|
||||
def read_session(token: str | None, settings: Settings) -> str | None:
|
||||
if not token or "." not in token:
|
||||
return None
|
||||
payload, signature = token.rsplit(".", 1)
|
||||
expected = hmac.new(settings.session_secret.encode("utf-8"), payload.encode("ascii"), hashlib.sha256).hexdigest()
|
||||
if not hmac.compare_digest(signature, expected):
|
||||
return None
|
||||
try:
|
||||
data = json.loads(base64.urlsafe_b64decode(payload.encode("ascii")))
|
||||
except Exception:
|
||||
return None
|
||||
return data.get("sub")
|
||||
|
||||
|
||||
def require_admin(request: Request, settings: Settings) -> str:
|
||||
username = read_session(request.cookies.get(SESSION_COOKIE), settings)
|
||||
if not username:
|
||||
raise HTTPException(status_code=status.HTTP_303_SEE_OTHER, headers={"Location": "/admin/login"})
|
||||
return username
|
||||
|
||||
|
||||
def require_mcp_token(request: Request, settings: Settings) -> None:
|
||||
auth = request.headers.get("authorization", "")
|
||||
if not auth.startswith("Bearer ") or not hmac.compare_digest(auth.removeprefix("Bearer ").strip(), settings.mcp_token):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid MCP token")
|
||||
Reference in New Issue
Block a user