542 lines
20 KiB
Python
542 lines
20 KiB
Python
from __future__ import annotations
|
||
|
||
from datetime import date, datetime, time
|
||
from math import ceil
|
||
from typing import Any
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from sqlalchemy import Select, Text, and_, desc, func, or_, select
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeNewsLink
|
||
|
||
# Sort keys accepted by list_employees_page, mapped to the column expression
# used in ORDER BY. Keys not present here fall back to Employee.full_name there.
EMPLOYEE_SORTS = {
    "full_name": Employee.full_name,
    "status": Employee.status,
    "first_seen_at": Employee.first_seen_at,
    "last_seen_at": Employee.last_seen_at,
    "dismissed_at": Employee.dismissed_at,
    # Read from the current_data JSON payload and compared as an integer.
    "hse_start_year": Employee.current_data["hse_start_year"].as_integer(),
}
|
||
|
||
|
||
def employee_display_payload(employee: Employee) -> dict[str, Any]:
    """Build the flat summary dict used to show one employee in listings.

    Contact details, positions and section counts are read from
    ``employee.current_data``; a missing or malformed payload yields empty
    values instead of raising. ``news_count`` uses the stored news links
    when there are any, otherwise the count found in the "news" sections.
    """
    profile = _as_dict(employee.current_data)
    contact_info = _as_dict(profile.get("contacts"))
    section_list = _as_list(profile.get("sections"))

    position_names = _clean_list(profile.get("positions"))
    email_list = _clean_list(contact_info.get("emails"))
    phone_list = _clean_list(contact_info.get("phones"))

    news_total = len(_stored_news_links(employee))
    if not news_total:
        # No stored links: fall back to what the parsed sections report.
        news_total = _count_section_items(section_list, "news")

    def iso_or_none(moment: Any) -> str | None:
        return moment.isoformat() if moment else None

    payload: dict[str, Any] = {}
    payload["id"] = employee.id
    payload["full_name"] = employee.full_name
    payload["status"] = employee.status
    payload["status_display"] = _employee_status_display(employee.status)
    payload["canonical_url"] = employee.canonical_url
    payload["positions"] = position_names
    payload["positions_text"] = "; ".join(position_names)
    payload["hse_start_year"] = profile.get("hse_start_year")
    payload["emails"] = email_list
    payload["email_text"] = ", ".join(email_list)
    payload["phones"] = phone_list
    payload["phone_text"] = ", ".join(phone_list)
    payload["address"] = contact_info.get("address")
    payload["publications_count"] = _count_section_items(section_list, "publications")
    payload["courses_count"] = _count_section_items(section_list, "courses_by_year")
    payload["news_count"] = news_total
    payload["first_seen_at"] = iso_or_none(employee.first_seen_at)
    payload["last_seen_at"] = iso_or_none(employee.last_seen_at)
    payload["dismissed_at"] = iso_or_none(employee.dismissed_at)
    payload["first_seen_display"] = format_admin_datetime(employee.first_seen_at)
    payload["last_seen_display"] = format_admin_datetime(employee.last_seen_at)
    payload["dismissed_display"] = format_admin_datetime(employee.dismissed_at)
    return payload
|
||
|
||
|
||
def employee_detail_payload(employee: Employee) -> dict[str, Any]:
    """Build the full detail dict for one employee.

    Starts from the summary produced by ``employee_display_payload`` and adds
    profile identifiers (model attribute first, ``current_data`` value as the
    fallback), structured contacts, external ids, news links and the
    normalized sections.
    """
    profile = _as_dict(employee.current_data)
    contact_info = _as_dict(profile.get("contacts"))

    detail = dict(employee_display_payload(employee))
    for field in ("profile_type", "profile_id", "parser_version"):
        # Prefer the column value; fall back to the copy stored in the JSON payload.
        detail[field] = getattr(employee, field) or profile.get(field)

    detail["contacts"] = {
        "emails": _clean_list(contact_info.get("emails")),
        "phones": _clean_list(contact_info.get("phones")),
        "address": contact_info.get("address"),
        "contact_items": _normalize_contact_items(contact_info.get("items")),
    }
    detail["external_ids"] = _normalize_external_ids(profile.get("external_ids"))
    detail["news_links"] = _detail_news_links(employee, profile)
    detail["sections"] = list(map(_normalize_section, _as_list(profile.get("sections"))))
    return detail
|
||
|
||
|
||
def build_employee_query(
    *,
    status: str | None = None,
    q: str | None = None,
    started_from: date | None = None,
    started_to: date | None = None,
    has_email: bool | None = None,
) -> Select[tuple[Employee]]:
    """Build the filtered ``select(Employee)`` statement for the employee list.

    Args:
        status: Exact ``Employee.status`` to match; ignored when empty.
        q: Free-text search, matched case-insensitively as a literal substring
            of ``full_name`` or ``canonical_url``.
        started_from: Inclusive lower bound on ``first_seen_at`` (start of day).
        started_to: Inclusive upper bound on ``first_seen_at`` (end of day).
        has_email: ``True`` keeps rows whose ``current_data`` text contains
            ``@``; ``False`` keeps rows without it (or with NULL data);
            ``None`` applies no email filter.

    Returns:
        The statement with all active filters combined with AND. No ordering
        or pagination is applied.
    """
    stmt = select(Employee)
    filters = []
    if status:
        filters.append(Employee.status == status)
    if q:
        # Escape LIKE metacharacters so that "%" and "_" typed by the user are
        # matched literally instead of acting as wildcards. The backslash is
        # escaped first because it is the escape character itself.
        escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        pattern = f"%{escaped}%"
        filters.append(
            or_(
                Employee.full_name.ilike(pattern, escape="\\"),
                Employee.canonical_url.ilike(pattern, escape="\\"),
            )
        )
    if started_from:
        filters.append(Employee.first_seen_at >= datetime.combine(started_from, time.min))
    if started_to:
        filters.append(Employee.first_seen_at <= datetime.combine(started_to, time.max))
    # Heuristic: an "@" anywhere in the serialized JSON is treated as "has e-mail".
    if has_email is True:
        filters.append(Employee.current_data.cast(Text).ilike("%@%"))
    elif has_email is False:
        filters.append(or_(Employee.current_data.is_(None), ~Employee.current_data.cast(Text).ilike("%@%")))
    if filters:
        stmt = stmt.where(and_(*filters))
    return stmt
|
||
|
||
|
||
def list_employees_page(
    db: Session,
    *,
    status: str | None = None,
    q: str | None = None,
    started_from: date | None = None,
    started_to: date | None = None,
    has_email: bool | None = None,
    sort: str = "full_name",
    direction: str = "asc",
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """Return one page of employees plus pagination metadata.

    Args:
        db: Session used for the count and page queries.
        status, q, started_from, started_to, has_email: Filters forwarded to
            ``build_employee_query``.
        sort: Key of ``EMPLOYEE_SORTS``; unknown keys sort by full name.
        direction: ``"desc"`` for descending; anything else sorts ascending.
        limit: Page size; values other than 25, 50 or 100 are replaced by 50.
        offset: Number of rows to skip; negative values are treated as 0.

    Returns:
        A dict with ``employees`` (display payloads), ``total``, ``limit``,
        ``offset``, ``pages`` and the 1-based ``page``.
    """
    limit = limit if limit in {25, 50, 100} else 50
    offset = max(0, offset)
    base_stmt = build_employee_query(
        status=status,
        q=q,
        started_from=started_from,
        started_to=started_to,
        has_email=has_email,
    )
    total = db.scalar(select(func.count()).select_from(base_stmt.subquery())) or 0
    sort_column = EMPLOYEE_SORTS.get(sort, Employee.full_name)
    order = desc(sort_column) if direction == "desc" else sort_column
    # Employee.id is a tie-breaker: without a unique final sort key, rows that
    # share the same sort value may be returned in a different order per query,
    # so LIMIT/OFFSET pages could repeat or skip employees.
    page_stmt = base_stmt.order_by(order, Employee.id).limit(limit).offset(offset)
    employees = db.scalars(page_stmt).all()
    return {
        "employees": [employee_display_payload(employee) for employee in employees],
        "total": total,
        "limit": limit,
        "offset": offset,
        "pages": ceil(total / limit) if total else 0,
        "page": (offset // limit) + 1,
    }
|
||
|
||
|
||
def stats_payload(db: Session) -> dict[str, Any]:
    """Collect the dashboard counters and the most recent crawl information.

    Returns employee totals (all / active / dismissed), the ``new_count`` of
    the latest crawl run, the most recently first-seen employee, and the
    payloads of the latest run and of the latest run still marked "running".
    """
    latest_run = db.scalar(select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(1))
    running_run = db.scalar(
        select(CrawlRun)
        .where(CrawlRun.status == "running")
        .order_by(desc(CrawlRun.started_at))
        .limit(1)
    )
    latest_added = db.scalar(select(Employee).order_by(desc(Employee.first_seen_at)).limit(1))

    def count_employees(*conditions: Any) -> int:
        counting = select(func.count()).select_from(Employee)
        if conditions:
            counting = counting.where(*conditions)
        return db.scalar(counting) or 0

    summary: dict[str, Any] = {}
    summary["total"] = count_employees()
    summary["active"] = count_employees(Employee.status == "active")
    summary["dismissed"] = count_employees(Employee.status == "dismissed")
    summary["new_in_last_run"] = latest_run.new_count if latest_run else 0
    summary["latest_added"] = employee_display_payload(latest_added) if latest_added else None
    # run_payload already maps a missing run to None.
    summary["latest_run"] = run_payload(latest_run)
    summary["current_running_run"] = run_payload(running_run)
    return summary
|
||
|
||
|
||
def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
    """Serialize a crawl run, or return ``None`` when no run is given.

    ``processed_count`` is parsed + skipped + errored profiles, and
    ``progress_percent`` is that number relative to ``found_count`` rounded
    to one decimal (0 when nothing was found).
    """
    if not run:
        return None

    handled = run.parsed_count + run.skipped_count + run.error_count
    if run.found_count:
        progress = round((handled / run.found_count) * 100, 1)
    else:
        progress = 0

    def iso_or_none(moment: Any) -> str | None:
        return moment.isoformat() if moment else None

    payload: dict[str, Any] = {
        "id": run.id,
        "source_url": run.source_url,
        "status": run.status,
        "status_display": _run_status_display(run.status),
        "started_at": iso_or_none(run.started_at),
        "finished_at": iso_or_none(run.finished_at),
        "started_display": format_admin_datetime(run.started_at),
        "finished_display": format_admin_datetime(run.finished_at),
    }
    # Raw counters are copied through unchanged, in this order.
    for counter in (
        "found_count",
        "parsed_count",
        "skipped_count",
        "new_count",
        "error_count",
        "dismissed_count",
    ):
        payload[counter] = getattr(run, counter)
    payload["processed_count"] = handled
    payload["progress_percent"] = progress
    payload["message"] = run.message
    return payload
|
||
|
||
|
||
def run_detail_payload(db: Session, run: CrawlRun | None) -> dict[str, Any] | None:
    """Serialize a crawl run together with its employee changes and errors.

    Changes are loaded in ``created_at, id`` order and grouped by
    ``change_type``; the "new", "missing_from_source" and "dismissed" groups
    are always present, and any other type gets its own group. Returns
    ``None`` when no run is given.
    """
    if not run:
        return None

    change_stmt = (
        select(CrawlRunEmployeeChange)
        .where(CrawlRunEmployeeChange.crawl_run_id == run.id)
        .order_by(CrawlRunEmployeeChange.created_at, CrawlRunEmployeeChange.id)
    )
    change_rows = db.scalars(change_stmt).all()

    error_stmt = select(CrawlError).where(CrawlError.crawl_run_id == run.id).order_by(CrawlError.created_at)
    error_rows = db.scalars(error_stmt).all()

    by_type: dict[Any, list] = {"new": [], "missing_from_source": [], "dismissed": []}
    for row in change_rows:
        if row.change_type not in by_type:
            by_type[row.change_type] = []
        by_type[row.change_type].append(_change_payload(row))

    detail = dict(run_payload(run) or {})
    detail["changes_detail_available"] = bool(change_rows)
    detail["changes"] = by_type
    detail["errors"] = list(map(_crawl_error_payload, error_rows))
    return detail
|
||
|
||
|
||
def format_admin_datetime(value: Any) -> str:
    """Format a timestamp as ``DD.MM.YYYY HH:MM`` for the admin UI.

    Falsy input gives the "not specified" label. Strings are parsed as ISO
    8601 (a trailing "Z" is accepted) and returned unchanged when they cannot
    be parsed. Timezone-aware datetimes are converted to Europe/Moscow; naive
    ones are formatted as they are. Any other value is returned via ``str``.
    """
    if not value:
        return "Не указано"

    moment = value
    if isinstance(moment, str):
        try:
            moment = datetime.fromisoformat(moment.replace("Z", "+00:00"))
        except ValueError:
            return value

    if isinstance(moment, datetime):
        if moment.tzinfo:
            moment = moment.astimezone(ZoneInfo("Europe/Moscow"))
        return moment.strftime("%d.%m.%Y %H:%M")
    return str(moment)
|
||
|
||
|
||
def _employee_status_display(status: str | None) -> str:
|
||
labels = {"active": "Работает", "dismissed": "Уволен"}
|
||
return labels.get(status or "", status or "Не указано")
|
||
|
||
|
||
def _run_status_display(status: str | None) -> str:
|
||
labels = {"running": "Выполняется", "completed": "Завершен", "failed": "Ошибка"}
|
||
return labels.get(status or "", status or "Не указано")
|
||
|
||
|
||
def _change_payload(change: CrawlRunEmployeeChange) -> dict[str, Any]:
    """Serialize one per-employee change recorded for a crawl run."""
    kind = change.change_type
    available = change.profile_available
    created = change.created_at

    payload: dict[str, Any] = {
        field: getattr(change, field)
        for field in ("id", "employee_id", "profile_key", "profile_url", "full_name")
    }
    payload["change_type"] = kind
    payload["change_type_display"] = _change_type_display(kind)
    payload["profile_available"] = available
    payload["profile_available_display"] = _profile_available_display(available)
    payload["message"] = change.message
    payload["created_at"] = created.isoformat() if created else None
    payload["created_display"] = format_admin_datetime(created)
    return payload
|
||
|
||
|
||
def _crawl_error_payload(error: CrawlError) -> dict[str, Any]:
    """Serialize one crawl error, with raw and display forms of its timestamp."""
    created = error.created_at
    payload: dict[str, Any] = {
        field: getattr(error, field)
        for field in ("id", "crawl_run_id", "profile_url", "error_type", "message")
    }
    payload["created_at"] = created.isoformat() if created else None
    payload["created_display"] = format_admin_datetime(created)
    return payload
|
||
|
||
|
||
def _change_type_display(change_type: str | None) -> str:
|
||
labels = {
|
||
"new": "Новый",
|
||
"missing_from_source": "Потеряшка",
|
||
"dismissed": "Уволен",
|
||
}
|
||
return labels.get(change_type or "", change_type or "Не указано")
|
||
|
||
|
||
def _profile_available_display(value: bool | None) -> str:
|
||
if value is True:
|
||
return "Профиль доступен"
|
||
if value is False:
|
||
return "Профиль недоступен"
|
||
return "Не проверялось"
|
||
|
||
|
||
def _count_section_items(sections: list[dict[str, Any]], section_type: str) -> int:
|
||
total = 0
|
||
for section in sections:
|
||
if section.get("type") != section_type:
|
||
continue
|
||
if section_type == "publications":
|
||
total += len(section.get("publications") or section.get("items") or [])
|
||
elif section_type == "courses_by_year":
|
||
total += len(section.get("courses") or [])
|
||
elif section_type == "news":
|
||
total += len(section.get("news_links") or section.get("items") or [])
|
||
return total
|
||
|
||
|
||
def _clean_list(values: Any) -> list[str]:
|
||
if values is None:
|
||
return []
|
||
if not isinstance(values, list):
|
||
values = [values]
|
||
return [str(value).strip() for value in values if str(value or "").strip()]
|
||
|
||
|
||
def _as_dict(value: Any) -> dict[str, Any]:
|
||
return value if isinstance(value, dict) else {}
|
||
|
||
|
||
def _as_list(value: Any) -> list[Any]:
|
||
if value is None:
|
||
return []
|
||
return value if isinstance(value, list) else [value]
|
||
|
||
|
||
def _normalize_contact_items(items: Any) -> list[str]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if isinstance(item, dict):
|
||
value = item.get("raw") or item.get("value") or item.get("text")
|
||
else:
|
||
value = item
|
||
value = str(value or "").strip()
|
||
if value:
|
||
normalized.append(value)
|
||
return normalized
|
||
|
||
|
||
def _normalize_external_ids(items: Any) -> list[dict[str, str | None]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
system = str(item.get("system") or "").strip()
|
||
value = str(item.get("value") or "").strip()
|
||
url = str(item.get("url") or "").strip()
|
||
if system or value or url:
|
||
normalized.append({"system": system or "ID", "value": value or url, "url": url or None})
|
||
return normalized
|
||
|
||
|
||
def _normalize_section(section: Any) -> dict[str, Any]:
|
||
if not isinstance(section, dict):
|
||
return {"title": "Раздел", "type": "generic", "paragraphs": [str(section)], "items": [], "links": []}
|
||
|
||
section_type = section.get("type") or "generic"
|
||
paragraphs = _clean_list(section.get("paragraphs"))
|
||
items = _clean_list(section.get("items"))
|
||
raw_text = str(section.get("raw_text") or "").strip()
|
||
if not paragraphs and not items and raw_text:
|
||
paragraphs = [raw_text]
|
||
|
||
return {
|
||
"title": section.get("title") or "Раздел",
|
||
"type": section_type,
|
||
"raw_text": raw_text,
|
||
"paragraphs": paragraphs,
|
||
"list_items": items,
|
||
"links": _normalize_links(section.get("links")),
|
||
"year_entries": _normalize_year_entries(section.get("year_entries")),
|
||
"publications": _normalize_publications(section.get("publications")),
|
||
"publications_count": section.get("publications_count"),
|
||
"news_links": _normalize_news_links(section.get("news_links")),
|
||
"news_count": section.get("news_count"),
|
||
"theses": _normalize_theses(section.get("theses")),
|
||
"theses_count": section.get("theses_count"),
|
||
"academic_year": section.get("academic_year"),
|
||
"courses": _normalize_courses(section.get("courses")),
|
||
"table": _normalize_table(section.get("table")),
|
||
}
|
||
|
||
|
||
def _normalize_links(items: Any) -> list[dict[str, str | None]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
text = str(item.get("text") or item.get("url") or "").strip()
|
||
url = str(item.get("url") or "").strip()
|
||
if text and url:
|
||
normalized.append({"text": text, "url": url})
|
||
return normalized
|
||
|
||
|
||
def _stored_news_links(employee: Employee) -> list[dict[str, Any]]:
    """Return the employee's stored news links as payload dicts.

    The links are ordered by ``_news_link_sort_key`` before serialization.
    """
    ordered_links = sorted(employee.news_links, key=_news_link_sort_key)
    return list(map(_stored_news_link_payload, ordered_links))
|
||
|
||
|
||
def _news_link_sort_key(item: EmployeeNewsLink) -> tuple:
|
||
timestamp = item.published_at.timestamp() if item.published_at else 0
|
||
return (-timestamp, item.title or "", item.id)
|
||
|
||
|
||
def _stored_news_link_payload(item: EmployeeNewsLink) -> dict[str, Any]:
|
||
return {
|
||
"title": item.title,
|
||
"url": item.url,
|
||
"summary": item.summary,
|
||
"published_at": item.published_at.isoformat() if item.published_at else None,
|
||
"published_year": item.published_year,
|
||
"published_display": format_admin_date(item.published_at) if item.published_at else str(item.published_year or ""),
|
||
}
|
||
|
||
|
||
def _detail_news_links(employee: Employee, data: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the news links to show on the employee detail page.

    Stored links take precedence. Without them, the ``news_links`` of the
    first dict section with type "news" in ``data["sections"]`` are used;
    with no such section the result is an empty list.
    """
    stored_links = _stored_news_links(employee)
    if stored_links:
        return stored_links

    first_news_section = next(
        (
            candidate
            for candidate in _as_list(data.get("sections"))
            if isinstance(candidate, dict) and candidate.get("type") == "news"
        ),
        None,
    )
    if first_news_section is None:
        return []
    return _normalize_news_links(first_news_section.get("news_links"))
|
||
|
||
|
||
def format_admin_date(value: Any) -> str:
    """Format a timestamp as ``DD.MM.YYYY`` for the admin UI.

    Falsy input gives an empty string. Strings are parsed as ISO 8601 (a
    trailing "Z" is accepted) and returned unchanged when they cannot be
    parsed. Timezone-aware datetimes are converted to Europe/Moscow before
    the date is taken; naive ones are formatted as they are. Any other value
    is returned via ``str``.
    """
    if not value:
        return ""

    moment = value
    if isinstance(moment, str):
        try:
            moment = datetime.fromisoformat(moment.replace("Z", "+00:00"))
        except ValueError:
            return value

    if isinstance(moment, datetime):
        if moment.tzinfo:
            moment = moment.astimezone(ZoneInfo("Europe/Moscow"))
        return moment.strftime("%d.%m.%Y")
    return str(moment)
|
||
|
||
|
||
def _normalize_news_links(items: Any) -> list[dict[str, Any]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
title = str(item.get("title") or item.get("url") or "").strip()
|
||
url = str(item.get("url") or "").strip()
|
||
summary = str(item.get("summary") or "").strip()
|
||
published_at = str(item.get("published_at") or "").strip()
|
||
published_year = item.get("published_year")
|
||
if title or url:
|
||
normalized.append(
|
||
{
|
||
"title": title or url,
|
||
"url": url or None,
|
||
"summary": summary or None,
|
||
"published_at": published_at or None,
|
||
"published_year": published_year,
|
||
"published_display": format_admin_date(published_at) if published_at else str(published_year or ""),
|
||
}
|
||
)
|
||
return normalized
|
||
|
||
|
||
def _normalize_year_entries(items: Any) -> list[dict[str, Any]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
text = str(item.get("text") or "").strip()
|
||
if text:
|
||
normalized.append({"year": item.get("year"), "text": text, "links": _normalize_links(item.get("links"))})
|
||
return normalized
|
||
|
||
|
||
def _normalize_publications(items: Any) -> list[dict[str, str | None]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
text = str(item or "").strip()
|
||
if text:
|
||
normalized.append({"title": text, "text": text, "url": None})
|
||
continue
|
||
title = str(item.get("title") or "").strip()
|
||
text = str(item.get("text") or title).strip()
|
||
url = str(item.get("url") or "").strip()
|
||
if title or text:
|
||
normalized.append({"title": title or text, "text": text or title, "url": url or None})
|
||
return normalized
|
||
|
||
|
||
def _normalize_courses(items: Any) -> list[dict[str, str | None]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
title = str(item or "").strip()
|
||
if title:
|
||
normalized.append({"title": title, "url": None})
|
||
continue
|
||
title = str(item.get("title") or "").strip()
|
||
url = str(item.get("url") or "").strip()
|
||
if title or url:
|
||
normalized.append({"title": title or url, "url": url or None})
|
||
return normalized
|
||
|
||
|
||
def _normalize_theses(items: Any) -> list[dict[str, Any]]:
|
||
normalized = []
|
||
if not isinstance(items, list):
|
||
return normalized
|
||
for item in items:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
title = str(item.get("title") or "").strip()
|
||
student = str(item.get("student") or "").strip()
|
||
if not title and not student:
|
||
continue
|
||
normalized.append(
|
||
{
|
||
"id": item.get("id"),
|
||
"student": student,
|
||
"title": title,
|
||
"defense_year": item.get("defense_year") or item.get("year"),
|
||
"level": str(item.get("level") or "").strip(),
|
||
"rating": item.get("rating"),
|
||
"project_url": str(item.get("project_url") or "").strip() or None,
|
||
"program": str(item.get("program") or "").strip(),
|
||
"program_url": str(item.get("program_url") or "").strip() or None,
|
||
"org_unit": str(item.get("org_unit") or "").strip(),
|
||
"org_unit_url": str(item.get("org_unit_url") or "").strip() or None,
|
||
}
|
||
)
|
||
return normalized
|
||
|
||
|
||
def _normalize_table(table: Any) -> dict[str, Any] | None:
|
||
if not isinstance(table, dict):
|
||
return None
|
||
headers = _clean_list(table.get("headers"))
|
||
rows = []
|
||
for row in table.get("rows") or []:
|
||
if not isinstance(row, dict):
|
||
continue
|
||
cells = _clean_list(row.get("cells"))
|
||
if cells:
|
||
rows.append({"cells": cells, "link_url": row.get("link_url")})
|
||
if not headers and not rows:
|
||
return None
|
||
return {"headers": headers, "rows": rows}
|