Files
miem_workers/app/admin.py

231 lines
7.4 KiB
Python

from fastapi import APIRouter, BackgroundTasks, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import desc, func, select
from sqlalchemy.orm import Session
from app.config import Settings, get_settings
from app.db import SessionLocal, get_db
from app.models import CrawlError, CrawlRun, Employee
from app.security import SESSION_COOKIE, require_admin, sign_session, verify_admin
from app.services.admin_data import (
employee_detail_payload,
format_admin_datetime,
list_employees_page,
run_detail_payload,
run_payload,
stats_payload,
)
from app.services.crawl_control import get_running_run, run_crawl_if_idle
from app.version import BACKEND_VERSION, FRONTEND_VERSION
# Router for the admin UI: every route registered on it is served under the /admin prefix.
router = APIRouter(prefix="/admin")
# Jinja2 template environment used by _render(). The directory is a relative path,
# NOTE(review): presumably resolved against the process working directory - confirm for deployment.
templates = Jinja2Templates(directory="app/templates")
@router.get("", response_class=HTMLResponse)
def dashboard(request: Request, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
    """Render the admin landing page.

    Calls ``require_admin`` first, then extends the dict returned by
    ``stats_payload`` with the total number of crawl runs and crawl errors, and
    passes the ten most recently started runs to ``dashboard.html``.
    """
    require_admin(request, settings)

    counts = stats_payload(db)
    # Add row totals for both crawl tables; a missing scalar result counts as 0.
    for key, model in (("runs", CrawlRun), ("errors", CrawlError)):
        counts[key] = db.scalar(select(func.count()).select_from(model)) or 0

    newest_first = select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(10)
    recent_runs = [run_payload(model) for model in db.scalars(newest_first).all()]

    # The first entry is the newest run; None when no run has been recorded yet.
    latest_run = recent_runs[0] if recent_runs else None
    context = {"counts": counts, "runs": recent_runs, "latest_run": latest_run}
    return _render(request, "dashboard.html", context)
@router.get("/login", response_class=HTMLResponse)
def login_form(request: Request):
    """Render the empty login form (no error message shown)."""
    context = {"error": None}
    return _render(request, "login.html", context)
@router.post("/login")
def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    settings: Settings = Depends(get_settings),
):
    """Check submitted credentials and start an admin session.

    When ``verify_admin`` rejects the credentials, the login template is
    re-rendered with an error message and HTTP status 401. Otherwise the
    response is a 303 redirect to ``/admin`` that carries the session cookie
    produced by ``sign_session``.
    """
    if not verify_admin(username, password, settings):
        # User-facing message (Russian): "Invalid username or password".
        return _render(request, "login.html", {"error": "Неверный логин или пароль"}, status_code=401)

    redirect = RedirectResponse("/admin", status_code=303)
    redirect.set_cookie(
        SESSION_COOKIE,
        sign_session(username, settings),
        httponly=True,
        samesite="lax",
        # Mark the cookie Secure whenever the login request itself arrived over
        # HTTPS, so the session token is never sent back over plain HTTP.
        # Plain-HTTP logins (e.g. local development) behave exactly as before.
        secure=request.url.scheme == "https",
    )
    return redirect
@router.post("/logout")
def logout():
    """Clear the session cookie and send the browser to the login page (303)."""
    response = RedirectResponse("/admin/login", status_code=303)
    response.delete_cookie(SESSION_COOKIE)
    return response
@router.get("/employees", response_class=HTMLResponse)
def employees(
    request: Request,
    status: str | None = None,
    q: str | None = None,
    settings: Settings = Depends(get_settings),
):
    """Redirect the ``/admin/employees`` URL to ``/admin/directory``.

    Calls ``require_admin`` first. Any non-empty ``status`` / ``q`` query
    parameters are forwarded in the redirect target so a filtered link keeps
    its filters; with no filters the target is the bare ``/admin/directory``.
    """
    from urllib.parse import urlencode

    require_admin(request, settings)

    # Only forward filters that were actually supplied; the directory view
    # accepts the same parameter names.
    forwarded = {name: value for name, value in (("status", status), ("q", q)) if value}
    target = "/admin/directory"
    if forwarded:
        target = f"{target}?{urlencode(forwarded)}"
    return RedirectResponse(target, status_code=303)
@router.get("/directory", response_class=HTMLResponse)
def directory(
    request: Request,
    status: str | None = None,
    q: str | None = None,
    started_from: str | None = None,
    started_to: str | None = None,
    has_email: str | None = None,
    sort: str = "full_name",
    direction: str = "asc",
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Render the filterable, sortable, paginated employee directory.

    Calls ``require_admin`` first. Date filters are parsed with
    ``_parse_date`` (unparseable values become ``None``), ``has_email`` is a
    tri-state flag, and the query itself is delegated to
    ``list_employees_page``. The raw filter strings are echoed back to the
    template so the form can be re-populated.
    """
    require_admin(request, settings)

    date_from = _parse_date(started_from)
    date_to = _parse_date(started_to)

    # Tri-state: absent/empty -> no filter; otherwise True only for the exact string "true".
    if has_email is None or has_email == "":
        email_flag = None
    else:
        email_flag = has_email == "true"

    page = list_employees_page(
        db,
        status=status,
        q=q,
        started_from=date_from,
        started_to=date_to,
        has_email=email_flag,
        sort=sort,
        direction=direction,
        limit=limit,
        offset=offset,
    )

    # Optional text filters are rendered as empty strings when not supplied.
    text_filters = (
        ("status", status),
        ("q", q),
        ("started_from", started_from),
        ("started_to", started_to),
        ("has_email", has_email),
    )
    filters = {name: raw or "" for name, raw in text_filters}
    filters["sort"] = sort
    filters["direction"] = direction
    # The limit is taken from the page result rather than from the request parameter.
    filters["limit"] = page["limit"]
    filters["offset"] = offset

    return _render(request, "directory.html", {"page": page, "filters": filters})
@router.get("/employees/{employee_id}", response_class=HTMLResponse)
def employee_detail(
    employee_id: int,
    request: Request,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Render the detail page for one employee.

    Calls ``require_admin`` first. If no employee has the given id, responds
    with a 303 redirect to ``/admin/employees``. Otherwise the template
    receives the ORM object, the dict from ``employee_detail_payload`` and up
    to 20 snapshot summaries ordered by ``captured_at``, newest first.
    """
    require_admin(request, settings)

    employee = db.get(Employee, employee_id)
    if not employee:
        return RedirectResponse("/admin/employees", status_code=303)

    newest_first = sorted(employee.snapshots, key=lambda item: item.captured_at, reverse=True)
    snapshots = []
    for snap in newest_first[:20]:
        snapshots.append(
            {
                "captured_display": format_admin_datetime(snap.captured_at),
                "checksum": snap.checksum,
                "parser_version": snap.parser_version,
            }
        )

    context = {
        "employee": employee,
        "employee_view": employee_detail_payload(employee),
        "snapshots": snapshots,
    }
    return _render(request, "employee_detail.html", context)
@router.get("/runs", response_class=HTMLResponse)
def runs(request: Request, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
    """Render the crawl history page.

    Calls ``require_admin`` first, then shows the 50 most recently started
    runs (converted with ``run_payload``) and the 50 most recently created
    crawl errors (passed to the template as ORM objects).
    """
    require_admin(request, settings)

    runs_query = select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(50)
    items = list(map(run_payload, db.scalars(runs_query).all()))

    errors_query = select(CrawlError).order_by(desc(CrawlError.created_at)).limit(50)
    errors = db.scalars(errors_query).all()

    return _render(request, "runs.html", {"runs": items, "errors": errors})
@router.get("/runs/{run_id}", response_class=HTMLResponse)
def run_detail(
    run_id: int,
    request: Request,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Render the detail page for a single crawl run.

    Calls ``require_admin`` first. Responds with a 303 redirect to
    ``/admin/runs`` when no run has the given id.
    """
    require_admin(request, settings)

    run = db.get(CrawlRun, run_id)
    if run:
        context = {"run": run_detail_payload(db, run)}
        return _render(request, "run_detail.html", context)
    return RedirectResponse("/admin/runs", status_code=303)
@router.post("/runs")
def trigger_run(
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Schedule a crawl as a background task and redirect to ``/admin/runs``.

    Calls ``require_admin`` first. Nothing is scheduled when
    ``get_running_run`` reports a run; the redirect is the same in both cases.
    """
    require_admin(request, settings)

    def _crawl() -> None:
        # The background task opens its own session instead of reusing the
        # request-scoped ``db`` dependency.
        with SessionLocal() as session:
            run_crawl_if_idle(session, settings)

    if not get_running_run(db):
        background_tasks.add_task(_crawl)
    return RedirectResponse("/admin/runs", status_code=303)
@router.post("/crawl-now")
def crawl_now(
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Schedule a crawl as a background task and redirect to the dashboard.

    Calls ``require_admin`` first. Nothing is scheduled when
    ``get_running_run`` reports a run; the 303 redirect to ``/admin`` is
    returned in both cases.
    """
    require_admin(request, settings)

    def _crawl() -> None:
        # The background task opens its own session instead of reusing the
        # request-scoped ``db`` dependency.
        with SessionLocal() as session:
            run_crawl_if_idle(session, settings)

    if not get_running_run(db):
        background_tasks.add_task(_crawl)
    return RedirectResponse("/admin", status_code=303)
def _render(request: Request, template: str, context: dict, status_code: int = 200) -> HTMLResponse:
    """Render ``template`` with the shared base context plus ``context``.

    The base context provides ``request``, ``backend_version`` and
    ``frontend_version``; keys in ``context`` override base keys of the same
    name. ``status_code`` is passed through to the template response.
    """
    payload = dict(
        request=request,
        backend_version=BACKEND_VERSION,
        frontend_version=FRONTEND_VERSION,
    )
    # Caller-supplied values are applied last so they take precedence.
    payload.update(context)
    return templates.TemplateResponse(request, template, payload, status_code=status_code)
def _parse_date(value: str | None):
if not value:
return None
try:
from datetime import date
return date.fromisoformat(value)
except ValueError:
return None