feature: improve admin directory and crawl progress

This commit is contained in:
Anton
2026-04-28 17:24:10 +03:00
parent 51d83d7062
commit 4bd5f26469
19 changed files with 1082 additions and 58 deletions

159
app/services/admin_data.py Normal file
View File

@@ -0,0 +1,159 @@
from __future__ import annotations
from datetime import date, datetime, time
from math import ceil
from typing import Any
from sqlalchemy import Select, Text, and_, desc, func, or_, select
from sqlalchemy.orm import Session
from app.models import CrawlRun, Employee
EMPLOYEE_SORTS = {
"full_name": Employee.full_name,
"status": Employee.status,
"first_seen_at": Employee.first_seen_at,
"last_seen_at": Employee.last_seen_at,
"dismissed_at": Employee.dismissed_at,
"hse_start_year": Employee.current_data["hse_start_year"].as_integer(),
}
def employee_display_payload(employee: Employee) -> dict[str, Any]:
data = employee.current_data or {}
contacts = data.get("contacts") or {}
sections = data.get("sections") or []
emails = contacts.get("emails") or []
phones = contacts.get("phones") or []
return {
"id": employee.id,
"full_name": employee.full_name,
"status": employee.status,
"canonical_url": employee.canonical_url,
"positions": data.get("positions") or [],
"positions_text": "; ".join(data.get("positions") or []),
"hse_start_year": data.get("hse_start_year"),
"emails": emails,
"email_text": ", ".join(emails),
"phones": phones,
"phone_text": ", ".join(phones),
"address": contacts.get("address"),
"publications_count": _count_section_items(sections, "publications"),
"courses_count": _count_section_items(sections, "courses_by_year"),
"first_seen_at": employee.first_seen_at.isoformat() if employee.first_seen_at else None,
"last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None,
"dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
}
def build_employee_query(
*,
status: str | None = None,
q: str | None = None,
started_from: date | None = None,
started_to: date | None = None,
has_email: bool | None = None,
) -> Select[tuple[Employee]]:
stmt = select(Employee)
filters = []
if status:
filters.append(Employee.status == status)
if q:
pattern = f"%{q}%"
filters.append(or_(Employee.full_name.ilike(pattern), Employee.canonical_url.ilike(pattern)))
if started_from:
filters.append(Employee.first_seen_at >= datetime.combine(started_from, time.min))
if started_to:
filters.append(Employee.first_seen_at <= datetime.combine(started_to, time.max))
if has_email is True:
filters.append(Employee.current_data.cast(Text).ilike("%@%"))
elif has_email is False:
filters.append(or_(Employee.current_data.is_(None), ~Employee.current_data.cast(Text).ilike("%@%")))
if filters:
stmt = stmt.where(and_(*filters))
return stmt
def list_employees_page(
db: Session,
*,
status: str | None = None,
q: str | None = None,
started_from: date | None = None,
started_to: date | None = None,
has_email: bool | None = None,
sort: str = "full_name",
direction: str = "asc",
limit: int = 50,
offset: int = 0,
) -> dict[str, Any]:
limit = max(1, min(limit, 200))
offset = max(0, offset)
base_stmt = build_employee_query(
status=status,
q=q,
started_from=started_from,
started_to=started_to,
has_email=has_email,
)
total = db.scalar(select(func.count()).select_from(base_stmt.subquery())) or 0
sort_column = EMPLOYEE_SORTS.get(sort, Employee.full_name)
order = desc(sort_column) if direction == "desc" else sort_column
employees = db.scalars(base_stmt.order_by(order).limit(limit).offset(offset)).all()
return {
"items": [employee_display_payload(employee) for employee in employees],
"total": total,
"limit": limit,
"offset": offset,
"pages": ceil(total / limit) if total else 0,
"page": (offset // limit) + 1,
}
def stats_payload(db: Session) -> dict[str, Any]:
latest_run = db.scalar(select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(1))
running_run = db.scalar(select(CrawlRun).where(CrawlRun.status == "running").order_by(desc(CrawlRun.started_at)).limit(1))
latest_added = db.scalar(select(Employee).order_by(desc(Employee.first_seen_at)).limit(1))
return {
"total": db.scalar(select(func.count()).select_from(Employee)) or 0,
"active": db.scalar(select(func.count()).select_from(Employee).where(Employee.status == "active")) or 0,
"dismissed": db.scalar(select(func.count()).select_from(Employee).where(Employee.status == "dismissed")) or 0,
"new_in_last_run": latest_run.new_count if latest_run else 0,
"latest_added": employee_display_payload(latest_added) if latest_added else None,
"latest_run": run_payload(latest_run) if latest_run else None,
"current_running_run": run_payload(running_run) if running_run else None,
}
def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
if not run:
return None
processed = run.parsed_count + run.error_count
percent = round((processed / run.found_count) * 100, 1) if run.found_count else 0
return {
"id": run.id,
"source_url": run.source_url,
"status": run.status,
"started_at": run.started_at.isoformat() if run.started_at else None,
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
"found_count": run.found_count,
"parsed_count": run.parsed_count,
"new_count": run.new_count,
"error_count": run.error_count,
"dismissed_count": run.dismissed_count,
"processed_count": processed,
"progress_percent": percent,
"message": run.message,
}
def _count_section_items(sections: list[dict[str, Any]], section_type: str) -> int:
total = 0
for section in sections:
if section.get("type") != section_type:
continue
if section_type == "publications":
total += len(section.get("publications") or section.get("items") or [])
elif section_type == "courses_by_year":
total += len(section.get("courses") or [])
return total