Files
miem_workers/app/services/admin_data.py
2026-05-14 12:21:44 +03:00

464 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
from datetime import date, datetime, time
from math import ceil
from typing import Any
from zoneinfo import ZoneInfo
from sqlalchemy import Select, Text, and_, desc, func, or_, select
from sqlalchemy.orm import Session
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee
# Sort keys accepted by list_employees_page, mapped to the SQL expression each
# key orders by. Keys that are not listed here fall back to Employee.full_name
# in list_employees_page.
EMPLOYEE_SORTS = {
    "full_name": Employee.full_name,
    "status": Employee.status,
    "first_seen_at": Employee.first_seen_at,
    "last_seen_at": Employee.last_seen_at,
    "dismissed_at": Employee.dismissed_at,
    # Not a dedicated column: read from the current_data JSON document and
    # compared as an integer.
    "hse_start_year": Employee.current_data["hse_start_year"].as_integer(),
}
def employee_display_payload(employee: Employee) -> dict[str, Any]:
    """Build the flat summary dict used to show one employee in listings.

    Values come from the employee's own attributes and from its
    ``current_data`` document (positions, contacts, sections). Timestamps
    are exposed twice: as ISO strings (``*_at``) and as human-readable
    strings (``*_display``).
    """
    profile = _as_dict(employee.current_data)
    contact_info = _as_dict(profile.get("contacts"))
    section_list = _as_list(profile.get("sections"))

    position_names = _clean_list(profile.get("positions"))
    email_list = _clean_list(contact_info.get("emails"))
    phone_list = _clean_list(contact_info.get("phones"))

    first_seen = employee.first_seen_at
    last_seen = employee.last_seen_at
    dismissed = employee.dismissed_at

    payload: dict[str, Any] = {
        "id": employee.id,
        "full_name": employee.full_name,
        "status": employee.status,
        "status_display": _employee_status_display(employee.status),
        "canonical_url": employee.canonical_url,
    }
    payload["positions"] = position_names
    payload["positions_text"] = "; ".join(position_names)
    payload["hse_start_year"] = profile.get("hse_start_year")
    payload["emails"] = email_list
    payload["email_text"] = ", ".join(email_list)
    payload["phones"] = phone_list
    payload["phone_text"] = ", ".join(phone_list)
    payload["address"] = contact_info.get("address")
    payload["publications_count"] = _count_section_items(section_list, "publications")
    payload["courses_count"] = _count_section_items(section_list, "courses_by_year")

    # Machine-readable timestamps first, then their display counterparts.
    payload["first_seen_at"] = first_seen.isoformat() if first_seen else None
    payload["last_seen_at"] = last_seen.isoformat() if last_seen else None
    payload["dismissed_at"] = dismissed.isoformat() if dismissed else None
    payload["first_seen_display"] = format_admin_datetime(first_seen)
    payload["last_seen_display"] = format_admin_datetime(last_seen)
    payload["dismissed_display"] = format_admin_datetime(dismissed)
    return payload
def employee_detail_payload(employee: Employee) -> dict[str, Any]:
    """Build the full detail dict for a single employee.

    Starts from the summary produced by ``employee_display_payload`` and
    adds profile identifiers, structured contacts, external ids and the
    normalized profile sections. Profile identifiers prefer the employee
    attribute and fall back to the value stored in ``current_data``.
    """
    profile = _as_dict(employee.current_data)
    contact_info = _as_dict(profile.get("contacts"))

    detail = employee_display_payload(employee)
    detail["profile_type"] = employee.profile_type or profile.get("profile_type")
    detail["profile_id"] = employee.profile_id or profile.get("profile_id")
    detail["parser_version"] = employee.parser_version or profile.get("parser_version")
    detail["contacts"] = {
        "emails": _clean_list(contact_info.get("emails")),
        "phones": _clean_list(contact_info.get("phones")),
        "address": contact_info.get("address"),
        "contact_items": _normalize_contact_items(contact_info.get("items")),
    }
    detail["external_ids"] = _normalize_external_ids(profile.get("external_ids"))

    normalized_sections = []
    for raw_section in _as_list(profile.get("sections")):
        normalized_sections.append(_normalize_section(raw_section))
    detail["sections"] = normalized_sections
    return detail
def build_employee_query(
    *,
    status: str | None = None,
    q: str | None = None,
    started_from: date | None = None,
    started_to: date | None = None,
    has_email: bool | None = None,
) -> Select[tuple[Employee]]:
    """Build the filtered ``SELECT`` over employees (no ordering, no paging).

    Args:
        status: Keep only employees with exactly this status.
        q: Free-text search; matched case-insensitively as a literal
            substring of the full name or the canonical URL.
        started_from: Keep employees first seen on or after this day.
        started_to: Keep employees first seen on or before this day.
        has_email: ``True`` keeps employees whose stored data contains an
            ``@``; ``False`` keeps those without one (or without data);
            ``None`` applies no e-mail filter.

    Returns:
        A ``Select`` whose filters are combined with ``AND``.
    """
    stmt = select(Employee)
    filters = []
    if status:
        filters.append(Employee.status == status)
    if q:
        # Escape LIKE metacharacters so that "%" or "_" typed by the user is
        # matched literally instead of acting as a wildcard. The escape
        # character itself is doubled first.
        escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        pattern = f"%{escaped}%"
        filters.append(
            or_(
                Employee.full_name.ilike(pattern, escape="\\"),
                Employee.canonical_url.ilike(pattern, escape="\\"),
            )
        )
    if started_from:
        filters.append(Employee.first_seen_at >= datetime.combine(started_from, time.min))
    if started_to:
        filters.append(Employee.first_seen_at <= datetime.combine(started_to, time.max))
    # NOTE(review): the e-mail filter is a heuristic - it looks for "@"
    # anywhere in the serialized document, not only in the contacts block.
    if has_email is True:
        filters.append(Employee.current_data.cast(Text).ilike("%@%"))
    elif has_email is False:
        filters.append(or_(Employee.current_data.is_(None), ~Employee.current_data.cast(Text).ilike("%@%")))
    if filters:
        stmt = stmt.where(and_(*filters))
    return stmt
def list_employees_page(
    db: Session,
    *,
    status: str | None = None,
    q: str | None = None,
    started_from: date | None = None,
    started_to: date | None = None,
    has_email: bool | None = None,
    sort: str = "full_name",
    direction: str = "asc",
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """Return one page of employees together with paging metadata.

    Args:
        db: Session used to run the count and the page query.
        status, q, started_from, started_to, has_email: Filters, passed
            unchanged to ``build_employee_query``.
        sort: Key of ``EMPLOYEE_SORTS``; unknown keys sort by full name.
        direction: ``"desc"`` for descending order, anything else ascending.
        limit: Page size; values other than 25, 50 or 100 become 50.
        offset: Number of rows to skip; negative values become 0.

    Returns:
        A dict with ``employees`` (display payloads), ``total``, ``limit``,
        ``offset``, ``pages`` and the 1-based ``page`` number.
    """
    limit = limit if limit in {25, 50, 100} else 50
    offset = max(0, offset)
    base_stmt = build_employee_query(
        status=status,
        q=q,
        started_from=started_from,
        started_to=started_to,
        has_email=has_email,
    )
    total = db.scalar(select(func.count()).select_from(base_stmt.subquery())) or 0
    sort_column = EMPLOYEE_SORTS.get(sort, Employee.full_name)
    order = desc(sort_column) if direction == "desc" else sort_column
    # Most sort keys are not unique (status, start year, ...). Without a
    # unique tie-breaker the database may return tied rows in a different
    # order for each LIMIT/OFFSET query, so rows could repeat or go missing
    # across pages. Ordering by the primary key last makes paging stable.
    employees = db.scalars(
        base_stmt.order_by(order, Employee.id).limit(limit).offset(offset)
    ).all()
    return {
        "employees": [employee_display_payload(employee) for employee in employees],
        "total": total,
        "limit": limit,
        "offset": offset,
        "pages": ceil(total / limit) if total else 0,
        "page": (offset // limit) + 1,
    }
def stats_payload(db: Session) -> dict[str, Any]:
    """Collect the dashboard statistics: employee counts and crawl-run info.

    Returns a dict with the total/active/dismissed employee counts, the
    number of new employees reported by the most recently started run, the
    most recently first-seen employee, the latest run and the latest run
    that is still in the "running" status.
    """
    latest_run = db.scalar(
        select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(1)
    )
    running_run = db.scalar(
        select(CrawlRun)
        .where(CrawlRun.status == "running")
        .order_by(desc(CrawlRun.started_at))
        .limit(1)
    )
    latest_added = db.scalar(
        select(Employee).order_by(desc(Employee.first_seen_at)).limit(1)
    )

    def count_employees(*criteria: Any) -> int:
        # COUNT(*) over employees, optionally restricted by the criteria.
        stmt = select(func.count()).select_from(Employee)
        if criteria:
            stmt = stmt.where(*criteria)
        return db.scalar(stmt) or 0

    stats: dict[str, Any] = {
        "total": count_employees(),
        "active": count_employees(Employee.status == "active"),
        "dismissed": count_employees(Employee.status == "dismissed"),
    }
    stats["new_in_last_run"] = latest_run.new_count if latest_run else 0
    stats["latest_added"] = employee_display_payload(latest_added) if latest_added else None
    stats["latest_run"] = run_payload(latest_run) if latest_run else None
    stats["current_running_run"] = run_payload(running_run) if running_run else None
    return stats
def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
    """Serialize a crawl run, adding derived progress figures.

    ``processed_count`` is parsed + skipped + errored profiles, and
    ``progress_percent`` is that number relative to ``found_count``
    (rounded to one decimal, ``0`` when nothing was found). Returns
    ``None`` when no run is given.
    """
    if not run:
        return None

    handled = run.parsed_count + run.skipped_count + run.error_count
    if run.found_count:
        progress = round((handled / run.found_count) * 100, 1)
    else:
        progress = 0

    started = run.started_at
    finished = run.finished_at
    return {
        "id": run.id,
        "source_url": run.source_url,
        "status": run.status,
        "status_display": _run_status_display(run.status),
        "started_at": started.isoformat() if started else None,
        "finished_at": finished.isoformat() if finished else None,
        "started_display": format_admin_datetime(started),
        "finished_display": format_admin_datetime(finished),
        "found_count": run.found_count,
        "parsed_count": run.parsed_count,
        "skipped_count": run.skipped_count,
        "new_count": run.new_count,
        "error_count": run.error_count,
        "dismissed_count": run.dismissed_count,
        "processed_count": handled,
        "progress_percent": progress,
        "message": run.message,
    }
def run_detail_payload(db: Session, run: CrawlRun | None) -> dict[str, Any] | None:
    """Serialize a crawl run together with its employee changes and errors.

    Changes are loaded in creation order and grouped by ``change_type``;
    the three known groups are always present, and any other type gets its
    own group on first occurrence. Returns ``None`` when no run is given.
    """
    if not run:
        return None

    change_stmt = (
        select(CrawlRunEmployeeChange)
        .where(CrawlRunEmployeeChange.crawl_run_id == run.id)
        .order_by(CrawlRunEmployeeChange.created_at, CrawlRunEmployeeChange.id)
    )
    changes = db.scalars(change_stmt).all()

    error_stmt = (
        select(CrawlError)
        .where(CrawlError.crawl_run_id == run.id)
        .order_by(CrawlError.created_at)
    )
    errors = db.scalars(error_stmt).all()

    buckets: dict[Any, list] = {"new": [], "missing_from_source": [], "dismissed": []}
    for change in changes:
        if change.change_type not in buckets:
            buckets[change.change_type] = []
        buckets[change.change_type].append(_change_payload(change))

    detail = dict(run_payload(run) or {})
    detail["changes_detail_available"] = bool(changes)
    detail["changes"] = buckets
    detail["errors"] = [_crawl_error_payload(error) for error in errors]
    return detail
def format_admin_datetime(value: Any) -> str:
    """Render a timestamp as ``DD.MM.YYYY HH:MM`` for the admin UI.

    Falsy input yields the "not specified" placeholder. Strings are parsed
    as ISO 8601 (a trailing ``Z`` is accepted); unparsable strings are
    returned unchanged. Timezone-aware datetimes are converted to
    Europe/Moscow first, naive ones are formatted as they are. Any other
    type is returned via ``str()``.
    """
    if not value:
        return "Не указано"

    moment = value
    if isinstance(moment, str):
        try:
            moment = datetime.fromisoformat(moment.replace("Z", "+00:00"))
        except ValueError:
            return value

    if isinstance(moment, datetime):
        if moment.tzinfo:
            moment = moment.astimezone(ZoneInfo("Europe/Moscow"))
        return moment.strftime("%d.%m.%Y %H:%M")
    return str(moment)
def _employee_status_display(status: str | None) -> str:
labels = {"active": "Работает", "dismissed": "Уволен"}
return labels.get(status or "", status or "Не указано")
def _run_status_display(status: str | None) -> str:
labels = {"running": "Выполняется", "completed": "Завершен", "failed": "Ошибка"}
return labels.get(status or "", status or "Не указано")
def _change_payload(change: CrawlRunEmployeeChange) -> dict[str, Any]:
    """Serialize one per-run employee change, including its display labels."""
    kind = change.change_type
    availability = change.profile_available
    created = change.created_at

    payload: dict[str, Any] = {
        "id": change.id,
        "employee_id": change.employee_id,
        "profile_key": change.profile_key,
        "profile_url": change.profile_url,
        "full_name": change.full_name,
    }
    payload["change_type"] = kind
    payload["change_type_display"] = _change_type_display(kind)
    payload["profile_available"] = availability
    payload["profile_available_display"] = _profile_available_display(availability)
    payload["message"] = change.message
    payload["created_at"] = created.isoformat() if created else None
    payload["created_display"] = format_admin_datetime(created)
    return payload
def _crawl_error_payload(error: CrawlError) -> dict[str, Any]:
    """Serialize one crawl error with ISO and display timestamps."""
    created = error.created_at
    created_iso = created.isoformat() if created else None
    return {
        "id": error.id,
        "crawl_run_id": error.crawl_run_id,
        "profile_url": error.profile_url,
        "error_type": error.error_type,
        "message": error.message,
        "created_at": created_iso,
        "created_display": format_admin_datetime(created),
    }
def _change_type_display(change_type: str | None) -> str:
labels = {
"new": "Новый",
"missing_from_source": "Потеряшка",
"dismissed": "Уволен",
}
return labels.get(change_type or "", change_type or "Не указано")
def _profile_available_display(value: bool | None) -> str:
if value is True:
return "Профиль доступен"
if value is False:
return "Профиль недоступен"
return "Не проверялось"
def _count_section_items(sections: list[dict[str, Any]], section_type: str) -> int:
    """Count the entries of all sections of the given type.

    For ``"publications"`` sections the entries are taken from the
    ``publications`` key, falling back to ``items``; for
    ``"courses_by_year"`` sections from ``courses``. Any other section type
    counts as zero.

    Sections that are not dicts are skipped instead of raising, and an
    entry container that is not a list contributes nothing. This matches
    the section normalizers in this module, which also tolerate such data.

    Args:
        sections: Raw section objects from the stored profile document.
        section_type: Section ``type`` value to count entries for.

    Returns:
        Total number of entries across the matching sections.
    """
    total = 0
    for section in sections:
        # Stored data is not guaranteed to be well-formed: skip anything
        # that is not a mapping rather than failing the whole payload.
        if not isinstance(section, dict) or section.get("type") != section_type:
            continue
        if section_type == "publications":
            entries = section.get("publications") or section.get("items")
        elif section_type == "courses_by_year":
            entries = section.get("courses")
        else:
            entries = None
        if isinstance(entries, list):
            total += len(entries)
    return total
def _clean_list(values: Any) -> list[str]:
    """Coerce a value into a list of non-empty, stripped strings.

    ``None`` becomes an empty list and a non-list value is treated as a
    one-element list. Falsy elements and elements that are blank after
    stripping are dropped.
    """
    if values is None:
        return []
    candidates = values if isinstance(values, list) else [values]

    cleaned: list[str] = []
    for candidate in candidates:
        text = str(candidate or "").strip()
        if text:
            cleaned.append(text)
    return cleaned
def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` itself when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}
def _as_list(value: Any) -> list[Any]:
    """Return ``value`` as a list.

    Lists are returned unchanged, ``None`` becomes an empty list and any
    other value is wrapped in a one-element list.
    """
    if isinstance(value, list):
        return value
    return [] if value is None else [value]
def _normalize_contact_items(items: Any) -> list[str]:
    """Flatten raw contact entries into a list of non-empty strings.

    Dict entries contribute the first truthy value among their ``raw``,
    ``value`` and ``text`` keys; other entries are used directly. Blank
    results are dropped, and non-list input yields an empty list.
    """
    if not isinstance(items, list):
        return []

    contacts: list[str] = []
    for entry in items:
        if isinstance(entry, dict):
            raw = entry.get("raw") or entry.get("value") or entry.get("text")
        else:
            raw = entry
        text = str(raw or "").strip()
        if not text:
            continue
        contacts.append(text)
    return contacts
def _normalize_external_ids(items: Any) -> list[dict[str, str | None]]:
normalized = []
if not isinstance(items, list):
return normalized
for item in items:
if not isinstance(item, dict):
continue
system = str(item.get("system") or "").strip()
value = str(item.get("value") or "").strip()
url = str(item.get("url") or "").strip()
if system or value or url:
normalized.append({"system": system or "ID", "value": value or url, "url": url or None})
return normalized
def _normalize_section(section: Any) -> dict[str, Any]:
    """Normalize one raw profile section into a fixed-shape dict.

    Every result carries the same keys regardless of the input, so
    consumers do not have to special-case malformed sections. A section
    that is not a dict is represented as a generic section whose only
    paragraph is the value's text (nothing for ``None`` or blank text).

    For dict sections, ``raw_text`` is used as the single paragraph only
    when the section has neither paragraphs nor list items.

    Args:
        section: Raw section object from the stored profile document.

    Returns:
        Dict with title, type, text content and the normalized links,
        year entries, publications, theses, courses and table.
    """
    if not isinstance(section, dict):
        text = "" if section is None else str(section).strip()
        return {
            "title": "Раздел",
            "type": "generic",
            "raw_text": text,
            "paragraphs": [text] if text else [],
            "list_items": [],
            # Legacy key kept for consumers of the old fallback shape.
            "items": [],
            "links": [],
            "year_entries": [],
            "publications": [],
            "publications_count": None,
            "theses": [],
            "theses_count": None,
            "academic_year": None,
            "courses": [],
            "table": None,
        }
    section_type = section.get("type") or "generic"
    paragraphs = _clean_list(section.get("paragraphs"))
    items = _clean_list(section.get("items"))
    raw_text = str(section.get("raw_text") or "").strip()
    # Fall back to the raw text only when there is no structured content.
    if not paragraphs and not items and raw_text:
        paragraphs = [raw_text]
    return {
        "title": section.get("title") or "Раздел",
        "type": section_type,
        "raw_text": raw_text,
        "paragraphs": paragraphs,
        "list_items": items,
        "links": _normalize_links(section.get("links")),
        "year_entries": _normalize_year_entries(section.get("year_entries")),
        "publications": _normalize_publications(section.get("publications")),
        "publications_count": section.get("publications_count"),
        "theses": _normalize_theses(section.get("theses")),
        "theses_count": section.get("theses_count"),
        "academic_year": section.get("academic_year"),
        "courses": _normalize_courses(section.get("courses")),
        "table": _normalize_table(section.get("table")),
    }
def _normalize_links(items: Any) -> list[dict[str, str | None]]:
normalized = []
if not isinstance(items, list):
return normalized
for item in items:
if not isinstance(item, dict):
continue
text = str(item.get("text") or item.get("url") or "").strip()
url = str(item.get("url") or "").strip()
if text and url:
normalized.append({"text": text, "url": url})
return normalized
def _normalize_year_entries(items: Any) -> list[dict[str, Any]]:
    """Normalize per-year entries into ``{"year", "text", "links"}`` dicts.

    Only dict entries with non-blank ``text`` are kept; their links are
    normalized with ``_normalize_links``. Non-list input yields an empty
    list.
    """
    if not isinstance(items, list):
        return []

    entries: list[dict[str, Any]] = []
    for raw in items:
        if isinstance(raw, dict):
            text = str(raw.get("text") or "").strip()
            if text:
                entries.append(
                    {
                        "year": raw.get("year"),
                        "text": text,
                        "links": _normalize_links(raw.get("links")),
                    }
                )
    return entries
def _normalize_publications(items: Any) -> list[dict[str, str | None]]:
normalized = []
if not isinstance(items, list):
return normalized
for item in items:
if not isinstance(item, dict):
text = str(item or "").strip()
if text:
normalized.append({"title": text, "text": text, "url": None})
continue
title = str(item.get("title") or "").strip()
text = str(item.get("text") or title).strip()
url = str(item.get("url") or "").strip()
if title or text:
normalized.append({"title": title or text, "text": text or title, "url": url or None})
return normalized
def _normalize_courses(items: Any) -> list[dict[str, str | None]]:
normalized = []
if not isinstance(items, list):
return normalized
for item in items:
if not isinstance(item, dict):
title = str(item or "").strip()
if title:
normalized.append({"title": title, "url": None})
continue
title = str(item.get("title") or "").strip()
url = str(item.get("url") or "").strip()
if title or url:
normalized.append({"title": title or url, "url": url or None})
return normalized
def _normalize_theses(items: Any) -> list[dict[str, Any]]:
normalized = []
if not isinstance(items, list):
return normalized
for item in items:
if not isinstance(item, dict):
continue
title = str(item.get("title") or "").strip()
student = str(item.get("student") or "").strip()
if not title and not student:
continue
normalized.append(
{
"id": item.get("id"),
"student": student,
"title": title,
"defense_year": item.get("defense_year") or item.get("year"),
"level": str(item.get("level") or "").strip(),
"rating": item.get("rating"),
"project_url": str(item.get("project_url") or "").strip() or None,
"program": str(item.get("program") or "").strip(),
"program_url": str(item.get("program_url") or "").strip() or None,
"org_unit": str(item.get("org_unit") or "").strip(),
"org_unit_url": str(item.get("org_unit_url") or "").strip() or None,
}
)
return normalized
def _normalize_table(table: Any) -> dict[str, Any] | None:
if not isinstance(table, dict):
return None
headers = _clean_list(table.get("headers"))
rows = []
for row in table.get("rows") or []:
if not isinstance(row, dict):
continue
cells = _clean_list(row.get("cells"))
if cells:
rows.append({"cells": cells, "link_url": row.get("link_url")})
if not headers and not rows:
return None
return {"headers": headers, "rows": rows}