from __future__ import annotations from datetime import date, datetime, time from math import ceil from typing import Any from zoneinfo import ZoneInfo from sqlalchemy import Select, Text, and_, desc, func, or_, select from sqlalchemy.orm import Session from app.models import CrawlRun, Employee EMPLOYEE_SORTS = { "full_name": Employee.full_name, "status": Employee.status, "first_seen_at": Employee.first_seen_at, "last_seen_at": Employee.last_seen_at, "dismissed_at": Employee.dismissed_at, "hse_start_year": Employee.current_data["hse_start_year"].as_integer(), } def employee_display_payload(employee: Employee) -> dict[str, Any]: data = _as_dict(employee.current_data) contacts = _as_dict(data.get("contacts")) sections = _as_list(data.get("sections")) positions = _clean_list(data.get("positions")) emails = _clean_list(contacts.get("emails")) phones = _clean_list(contacts.get("phones")) return { "id": employee.id, "full_name": employee.full_name, "status": employee.status, "status_display": _employee_status_display(employee.status), "canonical_url": employee.canonical_url, "positions": positions, "positions_text": "; ".join(positions), "hse_start_year": data.get("hse_start_year"), "emails": emails, "email_text": ", ".join(emails), "phones": phones, "phone_text": ", ".join(phones), "address": contacts.get("address"), "publications_count": _count_section_items(sections, "publications"), "courses_count": _count_section_items(sections, "courses_by_year"), "first_seen_at": employee.first_seen_at.isoformat() if employee.first_seen_at else None, "last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None, "dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None, "first_seen_display": format_admin_datetime(employee.first_seen_at), "last_seen_display": format_admin_datetime(employee.last_seen_at), "dismissed_display": format_admin_datetime(employee.dismissed_at), } def employee_detail_payload(employee: Employee) -> dict[str, Any]: data = _as_dict(employee.current_data) contacts = _as_dict(data.get("contacts")) return { **employee_display_payload(employee), "profile_type": employee.profile_type or data.get("profile_type"), "profile_id": employee.profile_id or data.get("profile_id"), "parser_version": employee.parser_version or data.get("parser_version"), "contacts": { "emails": _clean_list(contacts.get("emails")), "phones": _clean_list(contacts.get("phones")), "address": contacts.get("address"), "contact_items": _normalize_contact_items(contacts.get("items")), }, "external_ids": _normalize_external_ids(data.get("external_ids")), "sections": [_normalize_section(section) for section in _as_list(data.get("sections"))], } def build_employee_query( *, status: str | None = None, q: str | None = None, started_from: date | None = None, started_to: date | None = None, has_email: bool | None = None, ) -> Select[tuple[Employee]]: stmt = select(Employee) filters = [] if status: filters.append(Employee.status == status) if q: pattern = f"%{q}%" filters.append(or_(Employee.full_name.ilike(pattern), Employee.canonical_url.ilike(pattern))) if started_from: filters.append(Employee.first_seen_at >= datetime.combine(started_from, time.min)) if started_to: filters.append(Employee.first_seen_at <= datetime.combine(started_to, time.max)) if has_email is True: filters.append(Employee.current_data.cast(Text).ilike("%@%")) elif has_email is False: filters.append(or_(Employee.current_data.is_(None), ~Employee.current_data.cast(Text).ilike("%@%"))) if filters: stmt = stmt.where(and_(*filters)) return stmt def list_employees_page( db: Session, *, status: str | None = None, q: str | None = None, started_from: date | None = None, started_to: date | None = None, has_email: bool | None = None, sort: str = "full_name", direction: str = "asc", limit: int = 50, offset: int = 0, ) -> dict[str, Any]: limit = limit if limit in {25, 50, 100} else 50 offset = max(0, offset) base_stmt = build_employee_query( status=status, q=q, started_from=started_from, started_to=started_to, has_email=has_email, ) total = db.scalar(select(func.count()).select_from(base_stmt.subquery())) or 0 sort_column = EMPLOYEE_SORTS.get(sort, Employee.full_name) order = desc(sort_column) if direction == "desc" else sort_column employees = db.scalars(base_stmt.order_by(order).limit(limit).offset(offset)).all() return { "employees": [employee_display_payload(employee) for employee in employees], "total": total, "limit": limit, "offset": offset, "pages": ceil(total / limit) if total else 0, "page": (offset // limit) + 1, } def stats_payload(db: Session) -> dict[str, Any]: latest_run = db.scalar(select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(1)) running_run = db.scalar(select(CrawlRun).where(CrawlRun.status == "running").order_by(desc(CrawlRun.started_at)).limit(1)) latest_added = db.scalar(select(Employee).order_by(desc(Employee.first_seen_at)).limit(1)) return { "total": db.scalar(select(func.count()).select_from(Employee)) or 0, "active": db.scalar(select(func.count()).select_from(Employee).where(Employee.status == "active")) or 0, "dismissed": db.scalar(select(func.count()).select_from(Employee).where(Employee.status == "dismissed")) or 0, "new_in_last_run": latest_run.new_count if latest_run else 0, "latest_added": employee_display_payload(latest_added) if latest_added else None, "latest_run": run_payload(latest_run) if latest_run else None, "current_running_run": run_payload(running_run) if running_run else None, } def run_payload(run: CrawlRun | None) -> dict[str, Any] | None: if not run: return None processed = run.parsed_count + run.error_count percent = round((processed / run.found_count) * 100, 1) if run.found_count else 0 return { "id": run.id, "source_url": run.source_url, "status": run.status, "status_display": _run_status_display(run.status), "started_at": run.started_at.isoformat() if run.started_at else None, "finished_at": run.finished_at.isoformat() if run.finished_at else None, "started_display": format_admin_datetime(run.started_at), "finished_display": format_admin_datetime(run.finished_at), "found_count": run.found_count, "parsed_count": run.parsed_count, "new_count": run.new_count, "error_count": run.error_count, "dismissed_count": run.dismissed_count, "processed_count": processed, "progress_percent": percent, "message": run.message, } def format_admin_datetime(value: Any) -> str: if not value: return "Не указано" if isinstance(value, str): try: value = datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return value if not isinstance(value, datetime): return str(value) if value.tzinfo: value = value.astimezone(ZoneInfo("Europe/Moscow")) return value.strftime("%d.%m.%Y %H:%M") def _employee_status_display(status: str | None) -> str: labels = {"active": "Работает", "dismissed": "Уволен"} return labels.get(status or "", status or "Не указано") def _run_status_display(status: str | None) -> str: labels = {"running": "Выполняется", "completed": "Завершен", "failed": "Ошибка"} return labels.get(status or "", status or "Не указано") def _count_section_items(sections: list[dict[str, Any]], section_type: str) -> int: total = 0 for section in sections: if section.get("type") != section_type: continue if section_type == "publications": total += len(section.get("publications") or section.get("items") or []) elif section_type == "courses_by_year": total += len(section.get("courses") or []) return total def _clean_list(values: Any) -> list[str]: if values is None: return [] if not isinstance(values, list): values = [values] return [str(value).strip() for value in values if str(value or "").strip()] def _as_dict(value: Any) -> dict[str, Any]: return value if isinstance(value, dict) else {} def _as_list(value: Any) -> list[Any]: if value is None: return [] return value if isinstance(value, list) else [value] def _normalize_contact_items(items: Any) -> list[str]: normalized = [] if not isinstance(items, list): return normalized for item in items: if isinstance(item, dict): value = item.get("raw") or item.get("value") or item.get("text") else: value = item value = str(value or "").strip() if value: normalized.append(value) return normalized def _normalize_external_ids(items: Any) -> list[dict[str, str | None]]: normalized = [] if not isinstance(items, list): return normalized for item in items: if not isinstance(item, dict): continue system = str(item.get("system") or "").strip() value = str(item.get("value") or "").strip() url = str(item.get("url") or "").strip() if system or value or url: normalized.append({"system": system or "ID", "value": value or url, "url": url or None}) return normalized def _normalize_section(section: Any) -> dict[str, Any]: if not isinstance(section, dict): return {"title": "Раздел", "type": "generic", "paragraphs": [str(section)], "items": [], "links": []} section_type = section.get("type") or "generic" paragraphs = _clean_list(section.get("paragraphs")) items = _clean_list(section.get("items")) raw_text = str(section.get("raw_text") or "").strip() if not paragraphs and not items and raw_text: paragraphs = [raw_text] return { "title": section.get("title") or "Раздел", "type": section_type, "raw_text": raw_text, "paragraphs": paragraphs, "list_items": items, "links": _normalize_links(section.get("links")), "year_entries": _normalize_year_entries(section.get("year_entries")), "publications": _normalize_publications(section.get("publications")), "publications_count": section.get("publications_count"), "theses": _normalize_theses(section.get("theses")), "theses_count": section.get("theses_count"), "academic_year": section.get("academic_year"), "courses": _normalize_courses(section.get("courses")), "table": _normalize_table(section.get("table")), } def _normalize_links(items: Any) -> list[dict[str, str | None]]: normalized = [] if not isinstance(items, list): return normalized for item in items: if not isinstance(item, dict): continue text = str(item.get("text") or item.get("url") or "").strip() url = str(item.get("url") or "").strip() if text and url: normalized.append({"text": text, "url": url}) return normalized def _normalize_year_entries(items: Any) -> list[dict[str, Any]]: normalized = [] if not isinstance(items, list): return normalized for item in items: if not isinstance(item, dict): continue text = str(item.get("text") or "").strip() if text: normalized.append({"year": item.get("year"), "text": text, "links": _normalize_links(item.get("links"))}) return normalized def _normalize_publications(items: Any) -> list[dict[str, str | None]]: normalized = [] if not isinstance(items, list): return normalized for item in items: if not isinstance(item, dict): text = str(item or "").strip() if text: normalized.append({"title": text, "text": text, "url": None}) continue title = str(item.get("title") or "").strip() text = str(item.get("text") or title).strip() url = str(item.get("url") or "").strip() if title or text: normalized.append({"title": title or text, "text": text or title, "url": url or None}) return normalized def _normalize_courses(items: Any) -> list[dict[str, str | None]]: normalized = [] if not isinstance(items, list): return normalized for item in items: if not isinstance(item, dict): title = str(item or "").strip() if title: normalized.append({"title": title, "url": None}) continue title = str(item.get("title") or "").strip() url = str(item.get("url") or "").strip() if title or url: normalized.append({"title": title or url, "url": url or None}) return normalized def _normalize_theses(items: Any) -> list[dict[str, Any]]: normalized = [] if not isinstance(items, list): return normalized for item in items: if not isinstance(item, dict): continue title = str(item.get("title") or "").strip() student = str(item.get("student") or "").strip() if not title and not student: continue normalized.append( { "id": item.get("id"), "student": student, "title": title, "defense_year": item.get("defense_year") or item.get("year"), "level": str(item.get("level") or "").strip(), "rating": item.get("rating"), "project_url": str(item.get("project_url") or "").strip() or None, "program": str(item.get("program") or "").strip(), "program_url": str(item.get("program_url") or "").strip() or None, "org_unit": str(item.get("org_unit") or "").strip(), "org_unit_url": str(item.get("org_unit_url") or "").strip() or None, } ) return normalized def _normalize_table(table: Any) -> dict[str, Any] | None: if not isinstance(table, dict): return None headers = _clean_list(table.get("headers")) rows = [] for row in table.get("rows") or []: if not isinstance(row, dict): continue cells = _clean_list(row.get("cells")) if cells: rows.append({"cells": cells, "link_url": row.get("link_url")}) if not headers and not rows: return None return {"headers": headers, "rows": rows}