diff --git a/MCP_DESCRIPTION.md b/MCP_DESCRIPTION.md index 1ece641..cdaaa20 100644 --- a/MCP_DESCRIPTION.md +++ b/MCP_DESCRIPTION.md @@ -171,6 +171,7 @@ MCP читает данные из основной базы через SQLAlche Основные таблицы и модели: - `employees`: текущая карточка сотрудника, статус, профиль, `current_data`, checksum. +- `employee_publications`: нормализованные публикации сотрудников с авторами, DOI, аннотацией, описанием, citation text и raw JSON из HSE Publications. - `crawl_runs`: история запусков парсинга. - `crawl_run_employee_changes`: детальные изменения сотрудников в рамках запуска. - `crawl_errors`: ошибки парсинга в рамках запуска. @@ -387,7 +388,7 @@ Hash набора считается по отсортированному сп ### list_employee_publications -Назначение: вернуть публикации сотрудника из распарсенных секций профиля. +Назначение: вернуть публикации сотрудника. Если есть нормализованные строки в `employee_publications`, tool возвращает детальные публикационные данные: авторов, DOI, аннотацию, описание, citation text, год, тип, язык, статус и ссылки. Если детальная таблица еще не заполнена, tool использует старый fallback из `employees.current_data.sections[].publications`. Аргументы: @@ -397,24 +398,70 @@ Hash набора считается по отсортированному сп } ``` -Сервис ищет секции `current_data.sections` с `type = "publications"` и объединяет массивы `publications`. +Поиск сотрудника выполняется так же, как в `get_employee`: по `profile_key`, `profile_id`, точному или частичному `canonical_url`. + +Порядок источников: + +- сначала `employee_publications`, отсортированные по году, названию и внутреннему id; +- если записей нет, секции `current_data.sections` с `type = "publications"` и массивами `publications`. Ответ: ```json { - "employee": {}, + "employee": { + "profile_key": "org_person:803294906", + "profile_id": "803294906", + "full_name": "Борисов Сергей Петрович", + "status": "active", + "canonical_url": "https://www.hse.ru/org/persons/803294906", + "last_seen_at": "2026-05-14T10:00:00+00:00", + "dismissed_at": null + }, "items": [ { + "id": "888959076", + "publication_id": "888959076", "title": "Название публикации", - "text": "Полное описание", - "url": "https://..." + "text": "Краткое описание или citation", + "url": "https://publications.hse.ru/view/888959076", + "year": 2023, + "type": "ARTICLE", + "publication_type": "ARTICLE", + "language": "ru", + "status": 1, + "doi_url": "https://doi.org/10.53921/18195822_2023_23_4_624", + "other_url": "https://example.test", + "document_url": "https://example.test/file.pdf", + "citation_text": "Авторы. Название публикации // Журнал. 2023.", + "annotation": { + "ru": "Аннотация", + "en": "Abstract" + }, + "description": { + "main": "Авторы. Название публикации // Журнал. 2023." + }, + "authors": [ + { + "id": "803294906", + "href": "https://www.hse.ru/org/persons/803294906", + "title_ru": "Борисов С. П.", + "title_en": "", + "reverse_title_ru": "С. П. Борисов", + "reverse_title_en": "", + "alt_name": "S. P. Borisov", + "other_name": null, + "is_current_employee": true + } + ] } ] } ``` -Если сотрудник или данные профиля отсутствуют: +В fallback-режиме из `current_data` старые элементы могут содержать только базовые поля `title`, `text`, `url` и `id`. + +Если сотрудник не найден: ```json { @@ -422,6 +469,15 @@ Hash набора считается по отсортированному сп } ``` +Если сотрудник найден, но публикаций нет: + +```json +{ + "employee": {}, + "items": [] +} +``` + ### list_employee_courses Назначение: вернуть курсы преподавания сотрудника из распарсенных секций профиля. diff --git a/README.md b/README.md index 6c76c4a..07091f6 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ - `mcp`: открытый HTTP MCP endpoint для ИИ-агентов. - `postgres`: основная БД. -Парсер использует фиксированный источник сотрудников, по умолчанию `https://miem.hse.ru/persons`. Для каждой карточки сохраняются ФИО, должности, год начала работы, контакты, идентификаторы, вкладки профиля, секции, публикации, курсы, ВКР, JSON-снапшот и сжатый HTML-снапшот. Ссылки обходятся только из меню профиля самого сотрудника (`person-menu`), например `#sci`, `#teaching`, `#main`. +Парсер использует фиксированный источник сотрудников, по умолчанию `https://miem.hse.ru/persons`. Для каждой карточки сохраняются ФИО, должности, год начала работы, контакты, идентификаторы, вкладки профиля, секции, публикации, курсы, ВКР, JSON-снапшот и сжатый HTML-снапшот. Детальные публикации дополнительно нормализуются в отдельную таблицу `employee_publications`. Ссылки обходятся только из меню профиля самого сотрудника (`person-menu`), например `#sci`, `#teaching`, `#main`. ## Переменные окружения @@ -60,6 +60,19 @@ docker compose up --build Таблицы создаются приложением при старте. При обновлении существующей базы приложение также добавляет недостающие runtime-колонки, например `crawl_runs.skipped_count`. SQL-миграции для ручного применения лежат в `migrations/`. +## Наполнение БД + +Основная карточка сотрудника хранится в `employees`: профиль, статус, даты обнаружения/увольнения, текущий JSON `current_data`, checksum и версия парсера. История успешных изменений сохраняется в `employee_snapshots` вместе с JSON-снимком и сжатым HTML профиля. + +Публикации теперь хранятся в двух видах: + +- краткий список остается внутри `employees.current_data.sections[].publications` для обратной совместимости; +- детальные записи сохраняются в `employee_publications` и связываются с сотрудником через `employee_id`. + +`employee_publications` содержит `publication_id`, название, год, тип публикации, язык, статус, ссылку на карточку HSE Publications, DOI, внешние/document-ссылки, citation text, аннотацию, описание, авторов, raw JSON ответа `searchPubs` и `source_hash` для безопасного повторного upsert. Уникальность поддерживается по `(employee_id, publication_id)` и `(employee_id, source_hash)`, поэтому повторный crawl не должен создавать дубликаты. + +`list_employee_publications` сначала читает `employee_publications`; если детальных строк еще нет, возвращает старые публикации из `current_data`. + ## Парсинг Weekly worker запускается по `CRAWL_CRON`. Ручной запуск доступен в админке на `Dashboard` и странице `Runs` или через REST: @@ -73,6 +86,7 @@ curl -X POST http://localhost:8000/api/crawl-runs --cookie "miem_admin_session=. - найденные сотрудники получают статус `active` и обновленный `last_seen_at`; - новые сотрудники добавляются в `employees`; - количество новых сотрудников за запуск сохраняется в `crawl_runs.new_count`; +- публикации из HSE Publications записываются в `employee_publications`, а краткий список остается в JSON профиля; - активные сотрудники, исчезнувшие из текущего списка источника, получают статус `dismissed` и `dismissed_at`; - каждый успешный новый или измененный разбор сохраняет запись в `employee_snapshots`; - неизмененные профили учитываются в `crawl_runs.skipped_count` и не получают новый snapshot. @@ -89,7 +103,7 @@ Endpoint: `POST /mcp`, без авторизации на уровне прил - `sync_employees(client_hash?, include_data?)` - `search_employees(query, status?, limit?)` - `get_employee(profile_id_or_url)` -- `list_employee_publications(profile_id_or_url)` +- `list_employee_publications(profile_id_or_url)` — публикации сотрудника; при наличии данных из `employee_publications` возвращает авторов, DOI, аннотацию, описание, citation text, год, тип, язык, статус и ссылку HSE Publications. - `list_employee_courses(profile_id_or_url)` - `get_crawl_status()` - `get_crawl_run_details(run_id)` @@ -115,4 +129,4 @@ docker compose exec postgres pg_dump -U miem miem_workers > backup.sql docker compose down ``` -Версия сервиса: `0.6.1`. Админка всегда показывает версии backend и frontend в footer. +Версия сервиса: `0.6.2`. Админка всегда показывает версии backend и frontend в footer. diff --git a/app/db.py b/app/db.py index 14272cd..4d565da 100644 --- a/app/db.py +++ b/app/db.py @@ -29,8 +29,15 @@ def init_db() -> None: def _ensure_runtime_schema() -> None: + import app.models as models + inspector = inspect(engine) - if "crawl_runs" not in inspector.get_table_names(): + table_names = set(inspector.get_table_names()) + if "employees" in table_names and "employee_publications" not in table_names: + models.EmployeePublication.__table__.create(bind=engine, checkfirst=True) + inspector = inspect(engine) + table_names = set(inspector.get_table_names()) + if "crawl_runs" not in table_names: return crawl_run_columns = {column["name"] for column in inspector.get_columns("crawl_runs")} if "skipped_count" not in crawl_run_columns: diff --git a/app/mcp.py b/app/mcp.py index 4bac1a0..e63fbf3 100644 --- a/app/mcp.py +++ b/app/mcp.py @@ -5,7 +5,7 @@ from sqlalchemy import desc, or_, select from sqlalchemy.orm import Session from app.db import get_db -from app.models import CrawlRun, Employee +from app.models import CrawlRun, Employee, EmployeePublication from app.services.admin_data import run_detail_payload from app.services.dataset_versions import service_info_payload, sync_employees_payload from app.version import BACKEND_VERSION @@ -52,7 +52,10 @@ TOOLS = [ }, { "name": "list_employee_publications", - "description": "List publications parsed from an employee profile.", + "description": ( + "List employee publications with detailed fields when available: authors, DOI URL, annotation, " + "description, citation text, year, publication type, language, status, and HSE Publications URL." + ), "inputSchema": {"type": "object", "properties": {"profile_id_or_url": {"type": "string"}}, "required": ["profile_id_or_url"]}, }, { @@ -171,8 +174,14 @@ def _find_employee(db: Session, value: str) -> Employee | None: def _collect_section_items(employee: Employee | None, section_type: str) -> dict: - if not employee or not employee.current_data: + if not employee: return {"items": []} + if section_type == "publications": + publications = _stored_publications(employee) + if publications: + return {"employee": _employee_payload(employee, include_data=False), "items": publications} + if not employee.current_data: + return {"employee": _employee_payload(employee, include_data=False), "items": []} items = [] for section in employee.current_data.get("sections") or []: if section.get("type") != section_type: @@ -184,6 +193,41 @@ def _collect_section_items(employee: Employee | None, section_type: str) -> dict return {"employee": _employee_payload(employee, include_data=False), "items": items} +def _stored_publications(employee: Employee) -> list[dict]: + return [_publication_payload(publication) for publication in sorted(employee.publications, key=_publication_sort_key)] + + +def _publication_sort_key(publication: EmployeePublication) -> tuple: + return (publication.year or 0, publication.title or "", publication.id) + + +def _publication_payload(publication: EmployeePublication) -> dict: + text = publication.citation_text or publication.title + payload = { + "id": publication.publication_id, + "publication_id": publication.publication_id, + "title": publication.title, + "text": text, + "url": publication.url, + } + optional = { + "year": publication.year, + "type": publication.publication_type, + "publication_type": publication.publication_type, + "language": publication.language, + "status": publication.status, + "doi_url": publication.doi_url, + "other_url": publication.other_url, + "document_url": publication.document_url, + "citation_text": publication.citation_text, + "annotation": publication.annotation, + "description": publication.description, + "authors": publication.authors, + } + payload.update({key: value for key, value in optional.items() if value not in (None, [], {})}) + return payload + + def _employee_payload(employee: Employee, include_data: bool = True) -> dict: payload = { "profile_key": employee.profile_key, diff --git a/app/models.py b/app/models.py index 5c40bf2..08dc5ba 100644 --- a/app/models.py +++ b/app/models.py @@ -41,6 +41,7 @@ class Employee(Base): snapshots: Mapped[list["EmployeeSnapshot"]] = relationship(back_populates="employee") tabs: Mapped[list["ProfileTab"]] = relationship(back_populates="employee", cascade="all, delete-orphan") + publications: Mapped[list["EmployeePublication"]] = relationship(back_populates="employee", cascade="all, delete-orphan") crawl_run_changes: Mapped[list["CrawlRunEmployeeChange"]] = relationship(back_populates="employee") @@ -60,6 +61,42 @@ class EmployeeSnapshot(Base): employee: Mapped[Employee] = relationship(back_populates="snapshots") +class EmployeePublication(Base): + __tablename__ = "employee_publications" + __table_args__ = ( + UniqueConstraint("employee_id", "publication_id", name="uq_employee_publications_employee_publication"), + UniqueConstraint("employee_id", "source_hash", name="uq_employee_publications_employee_source_hash"), + Index("ix_employee_publications_employee_id", "employee_id"), + Index("ix_employee_publications_publication_id", "publication_id"), + Index("ix_employee_publications_doi_url", "doi_url"), + Index("ix_employee_publications_year", "year"), + Index("ix_employee_publications_publication_type", "publication_type"), + ) + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + employee_id: Mapped[int] = mapped_column(ForeignKey("employees.id", ondelete="CASCADE"), nullable=False) + publication_id: Mapped[str | None] = mapped_column(String(64)) + title: Mapped[str] = mapped_column(Text, nullable=False) + year: Mapped[int | None] = mapped_column(Integer) + publication_type: Mapped[str | None] = mapped_column(String(64)) + language: Mapped[str | None] = mapped_column(String(16)) + status: Mapped[int | None] = mapped_column(Integer) + url: Mapped[str | None] = mapped_column(Text) + doi_url: Mapped[str | None] = mapped_column(Text) + other_url: Mapped[str | None] = mapped_column(Text) + document_url: Mapped[str | None] = mapped_column(Text) + citation_text: Mapped[str | None] = mapped_column(Text) + annotation: Mapped[dict | None] = mapped_column(json_type) + description: Mapped[dict | None] = mapped_column(json_type) + authors: Mapped[list | None] = mapped_column(json_type) + raw_data: Mapped[dict | None] = mapped_column(json_type) + source_hash: Mapped[str] = mapped_column(String(64), nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow, nullable=False) + + employee: Mapped[Employee] = relationship(back_populates="publications") + + class CrawlRun(Base): __tablename__ = "crawl_runs" diff --git a/app/parser/profile.py b/app/parser/profile.py index 67af0f4..338cf3e 100644 --- a/app/parser/profile.py +++ b/app/parser/profile.py @@ -333,7 +333,7 @@ def _load_widget_publications( items = _extract_publication_items(result) if not items: break - publications.extend(_normalize_publication_item(item) for item in items) + publications.extend(_normalize_publication_item(item, author_id) for item in items) total = int(result.get("total") or 0) if not result.get("more") and len(publications) >= total: @@ -575,20 +575,37 @@ def _parse_vkr_items(nodes: list) -> list[str]: return [item for item in dict.fromkeys(items) if item] -def _normalize_publication_item(item: dict) -> dict: +def _normalize_publication_item(item: dict, current_author_id: str | None = None) -> dict: publication_id = str(item.get("id") or "").strip() title = _html_to_text(item.get("title")) - year = item.get("year") + year = _int_or_none(item.get("year")) publication_type = str(item.get("type") or "").strip() or None description = item.get("description") if isinstance(item.get("description"), dict) else {} short_description = _localized_value(description.get("short")) or _localized_value(description.get("shortLeft")) + documents = item.get("documents") if isinstance(item.get("documents"), dict) else {} + language = item.get("language") if isinstance(item.get("language"), dict) else {} + annotation = _localized_text_map(item.get("annotation")) + authors = _normalize_publication_authors(item.get("authorsByType"), current_author_id) + citation_text = normalize_ws(str(description.get("main") or "")) or _build_publication_citation(title, authors, year) text = normalize_ws(" ".join(part for part in [title, str(year or ""), short_description] if part)) return { "id": publication_id or None, + "publication_id": publication_id or None, "title": title or publication_id, "year": year, "type": publication_type, + "publication_type": publication_type, + "language": normalize_ws(language.get("name")) or None, + "status": _int_or_none(item.get("status")), "url": f"https://publications.hse.ru/view/{publication_id}" if publication_id else None, + "doi_url": _document_href(documents, "DOI"), + "other_url": _document_href(documents, "OTHER_URL"), + "document_url": _document_href(documents, "DOCUMENT"), + "citation_text": citation_text or None, + "annotation": annotation, + "description": description or None, + "authors": authors, + "raw_data": item, "text": text or title or publication_id, } @@ -685,12 +702,69 @@ def _html_to_text(value: object) -> str: return normalize_ws(BeautifulSoup(str(value or ""), "html.parser").get_text(" ", strip=True)) +def _localized_text_map(value: object) -> dict[str, str]: + if not isinstance(value, dict): + return {} + localized = {} + for key in ("ru", "en", "publ"): + text = _html_to_text(value.get(key)) + if text: + localized[key] = text + return localized + + def _localized_value(value: object) -> str: if isinstance(value, dict): return normalize_ws(value.get("ru") or value.get("publ") or value.get("en")) return normalize_ws(str(value or "")) +def _normalize_publication_authors(value: object, current_author_id: str | None) -> list[dict]: + if not isinstance(value, dict): + return [] + authors = [] + for author in value.get("author") or []: + if not isinstance(author, dict): + continue + title = author.get("title") if isinstance(author.get("title"), dict) else {} + reverse_title = author.get("reverseTitle") if isinstance(author.get("reverseTitle"), dict) else {} + author_id = normalize_ws(author.get("id")) + href = normalize_ws(author.get("href")) + authors.append( + { + "id": author_id or None, + "href": urljoin("https://www.hse.ru", href) if href else None, + "title_ru": _html_to_text(title.get("ru")), + "title_en": _html_to_text(title.get("en")), + "reverse_title_ru": _html_to_text(reverse_title.get("ru")), + "reverse_title_en": _html_to_text(reverse_title.get("en")), + "alt_name": normalize_ws(author.get("altName")) or None, + "other_name": normalize_ws(author.get("otherName")) or None, + "is_current_employee": bool(current_author_id and author_id == current_author_id), + } + ) + return authors + + +def _document_href(documents: dict, key: str) -> str | None: + document = documents.get(key) + if not isinstance(document, dict): + return None + return normalize_ws(document.get("href")) or None + + +def _build_publication_citation(title: str, authors: list[dict], year: int | None) -> str: + author_names = [author.get("title_ru") or author.get("title_en") or author.get("alt_name") for author in authors] + return normalize_ws(". ".join(part for part in [", ".join(filter(None, author_names)), title, str(year or "")] if part)) + + +def _int_or_none(value: object) -> int | None: + try: + return int(value) + except (TypeError, ValueError): + return None + + def _slugify(value: str) -> str: cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE) return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section" diff --git a/app/services/crawler.py b/app/services/crawler.py index 61f59ee..2ce95c0 100644 --- a/app/services/crawler.py +++ b/app/services/crawler.py @@ -6,11 +6,20 @@ import time from datetime import datetime, timezone import requests -from sqlalchemy import select +from sqlalchemy import inspect, select from sqlalchemy.orm import Session from app.config import Settings -from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParserSource, ProfileTab +from app.models import ( + CrawlError, + CrawlRun, + CrawlRunEmployeeChange, + Employee, + EmployeePublication, + EmployeeSnapshot, + ParserSource, + ProfileTab, +) from app.parser.collector import collect_profile_links from app.parser.profile import parse_person_profile from app.parser.profile_url import profile_key @@ -219,9 +228,127 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee parser_version=parser_version, ) ) + db.flush() + _try_sync_employee_publications(db, run, employee, parsed) return employee, changed +def _try_sync_employee_publications(db: Session, run: CrawlRun, employee: Employee, parsed: dict) -> None: + try: + if not _publication_payloads(parsed): + return + if not _employee_publications_table_exists(db): + return + with db.begin_nested(): + _sync_employee_publications(db, employee, parsed) + except Exception as exc: + db.add( + CrawlError( + crawl_run_id=run.id, + profile_url=employee.canonical_url, + error_type=type(exc).__name__, + message=f"Не удалось сохранить публикации сотрудника: {exc}", + ) + ) + + +def _employee_publications_table_exists(db: Session) -> bool: + return inspect(db.connection()).has_table(EmployeePublication.__tablename__) + + +def _sync_employee_publications(db: Session, employee: Employee, parsed: dict) -> None: + publications = _publication_payloads(parsed) + seen_hashes = set() + for publication in publications: + source_hash = _publication_hash(publication) + seen_hashes.add(source_hash) + publication_id = _clean_optional(publication.get("publication_id") or publication.get("id")) + existing = None + if publication_id: + existing = db.scalar( + select(EmployeePublication).where( + EmployeePublication.employee_id == employee.id, + EmployeePublication.publication_id == publication_id, + ) + ) + if not existing: + existing = db.scalar( + select(EmployeePublication).where( + EmployeePublication.employee_id == employee.id, + EmployeePublication.source_hash == source_hash, + ) + ) + if not existing: + existing = EmployeePublication(employee_id=employee.id, source_hash=source_hash, title=_publication_title(publication)) + db.add(existing) + _apply_publication(existing, publication, source_hash) + + if seen_hashes: + stale = db.scalars( + select(EmployeePublication).where( + EmployeePublication.employee_id == employee.id, + EmployeePublication.source_hash.not_in(seen_hashes), + ) + ).all() + for item in stale: + db.delete(item) + + +def _publication_payloads(parsed: dict) -> list[dict]: + publications = [] + for section in parsed.get("sections") or []: + if not isinstance(section, dict) or section.get("type") != "publications": + continue + for publication in section.get("publications") or []: + if isinstance(publication, dict): + publications.append(publication) + return publications + + +def _apply_publication(target: EmployeePublication, publication: dict, source_hash: str) -> None: + target.publication_id = _clean_optional(publication.get("publication_id") or publication.get("id")) + target.title = _publication_title(publication) + target.year = _int_or_none(publication.get("year")) + target.publication_type = _clean_optional(publication.get("publication_type") or publication.get("type")) + target.language = _clean_optional(publication.get("language")) + target.status = _int_or_none(publication.get("status")) + target.url = _clean_optional(publication.get("url")) + target.doi_url = _clean_optional(publication.get("doi_url")) + target.other_url = _clean_optional(publication.get("other_url")) + target.document_url = _clean_optional(publication.get("document_url")) + target.citation_text = _clean_optional(publication.get("citation_text") or publication.get("text")) + target.annotation = publication.get("annotation") if isinstance(publication.get("annotation"), dict) else None + target.description = publication.get("description") if isinstance(publication.get("description"), dict) else None + target.authors = publication.get("authors") if isinstance(publication.get("authors"), list) else None + target.raw_data = publication.get("raw_data") if isinstance(publication.get("raw_data"), dict) else publication + target.source_hash = source_hash + + +def _publication_hash(publication: dict) -> str: + return _payload_hash(publication.get("raw_data") if isinstance(publication.get("raw_data"), dict) else publication) + + +def _payload_hash(value: object) -> str: + payload = json.dumps(_stable_checksum_payload(value), ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def _publication_title(publication: dict) -> str: + return _clean_optional(publication.get("title") or publication.get("text") or publication.get("id")) or "Untitled publication" + + +def _clean_optional(value: object) -> str | None: + text = str(value or "").strip() + return text or None + + +def _int_or_none(value: object) -> int | None: + try: + return int(value) + except (TypeError, ValueError): + return None + + def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int: dismissed = 0 active = db.scalars(select(Employee).where(Employee.status == "active")).all() diff --git a/app/version.py b/app/version.py index 49eb537..66c30a3 100644 --- a/app/version.py +++ b/app/version.py @@ -1,3 +1,3 @@ -APP_VERSION = "0.6.1" -FRONTEND_VERSION = "0.6.1" -BACKEND_VERSION = "0.6.1" +APP_VERSION = "0.6.2" +FRONTEND_VERSION = "0.6.2" +BACKEND_VERSION = "0.6.2" diff --git a/migrations/006_employee_publications.sql b/migrations/006_employee_publications.sql new file mode 100644 index 0000000..f40b604 --- /dev/null +++ b/migrations/006_employee_publications.sql @@ -0,0 +1,39 @@ +CREATE TABLE IF NOT EXISTS employee_publications ( + id SERIAL PRIMARY KEY, + employee_id INTEGER NOT NULL REFERENCES employees(id) ON DELETE CASCADE, + publication_id VARCHAR(64), + title TEXT NOT NULL, + year INTEGER, + publication_type VARCHAR(64), + language VARCHAR(16), + status INTEGER, + url TEXT, + doi_url TEXT, + other_url TEXT, + document_url TEXT, + citation_text TEXT, + annotation JSONB, + description JSONB, + authors JSONB, + raw_data JSONB, + source_hash VARCHAR(64) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_employee_publications_employee_publication UNIQUE (employee_id, publication_id), + CONSTRAINT uq_employee_publications_employee_source_hash UNIQUE (employee_id, source_hash) +); + +CREATE INDEX IF NOT EXISTS ix_employee_publications_employee_id + ON employee_publications (employee_id); + +CREATE INDEX IF NOT EXISTS ix_employee_publications_publication_id + ON employee_publications (publication_id); + +CREATE INDEX IF NOT EXISTS ix_employee_publications_doi_url + ON employee_publications (doi_url); + +CREATE INDEX IF NOT EXISTS ix_employee_publications_year + ON employee_publications (year); + +CREATE INDEX IF NOT EXISTS ix_employee_publications_publication_type + ON employee_publications (publication_type); diff --git a/pyproject.toml b/pyproject.toml index 843c32b..93967f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "miem-workers" -version = "0.6.1" +version = "0.6.2" description = "MIEM employees parser, admin API, and MCP server" requires-python = ">=3.11" dependencies = [ diff --git a/tests/test_api_mcp.py b/tests/test_api_mcp.py index c74fe6f..ca43e22 100644 --- a/tests/test_api_mcp.py +++ b/tests/test_api_mcp.py @@ -10,7 +10,7 @@ from sqlalchemy.pool import StaticPool from app.config import Settings, get_settings from app.db import Base, get_db from app.main import app -from app.models import CrawlRun, CrawlRunEmployeeChange, Employee +from app.models import CrawlRun, CrawlRunEmployeeChange, Employee, EmployeePublication from app.security import SESSION_COOKIE, sign_session @@ -20,7 +20,7 @@ def test_health_returns_versions(): response = client.get("/api/health") assert response.status_code == 200 - assert response.json()["backend_version"] == "0.6.1" + assert response.json()["backend_version"] == "0.6.2" def test_mcp_lists_tools_without_auth_and_ignores_auth_header(): @@ -154,13 +154,115 @@ def test_mcp_service_info_returns_tools_and_dataset_hash(): assert response.status_code == 200 payload = json.loads(response.json()["result"]["content"][0]["text"]) assert payload["service_name"] == "miem-employees" - assert payload["backend_version"] == "0.6.1" + assert payload["backend_version"] == "0.6.2" assert payload["dataset"]["hash"] assert any(tool["name"] == "sync_employees" for tool in payload["tools"]) app.dependency_overrides.clear() +def test_mcp_list_employee_publications_prefers_stored_publications_with_fallback(): + engine = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine) + session = Session() + stored_employee = Employee( + profile_key="staff:stored", + profile_type="staff", + profile_id="stored", + canonical_url="https://www.hse.ru/staff/stored", + full_name="Stored Person", + status="active", + current_data={ + "sections": [ + { + "type": "publications", + "publications": [{"title": "Old JSON Publication", "url": "https://example.test/old"}], + } + ] + }, + ) + fallback_employee = Employee( + profile_key="staff:fallback", + profile_type="staff", + profile_id="fallback", + canonical_url="https://www.hse.ru/staff/fallback", + full_name="Fallback Person", + status="active", + current_data={ + "sections": [ + { + "type": "publications", + "publications": [{"title": "Fallback Publication", "url": "https://example.test/fallback"}], + } + ] + }, + ) + session.add_all([stored_employee, fallback_employee]) + session.commit() + session.add( + EmployeePublication( + employee_id=stored_employee.id, + publication_id="pub-1", + title="Stored Publication", + year=2024, + publication_type="ARTICLE", + url="https://publications.hse.ru/view/pub-1", + doi_url="https://doi.org/10.1/test", + citation_text="Stored Citation", + annotation={"ru": "Аннотация", "en": "Abstract"}, + description={"main": "Stored Citation"}, + authors=[{"id": "1", "title_ru": "Автор", "is_current_employee": True}], + source_hash="a" * 64, + ) + ) + session.commit() + session.close() + + def override_db(): + db = Session() + try: + yield db + finally: + db.close() + + app.dependency_overrides[get_db] = override_db + client = TestClient(app) + + stored_response = client.post( + "/mcp", + json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": {"name": "list_employee_publications", "arguments": {"profile_id_or_url": "stored"}}, + }, + ) + fallback_response = client.post( + "/mcp", + json={ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": {"name": "list_employee_publications", "arguments": {"profile_id_or_url": "fallback"}}, + }, + ) + + stored_payload = json.loads(stored_response.json()["result"]["content"][0]["text"]) + fallback_payload = json.loads(fallback_response.json()["result"]["content"][0]["text"]) + assert stored_payload["items"][0]["title"] == "Stored Publication" + assert stored_payload["items"][0]["doi_url"] == "https://doi.org/10.1/test" + assert stored_payload["items"][0]["annotation"] == {"ru": "Аннотация", "en": "Abstract"} + assert stored_payload["items"][0]["authors"] == [{"id": "1", "title_ru": "Автор", "is_current_employee": True}] + assert fallback_payload["items"][0]["title"] == "Fallback Publication" + + app.dependency_overrides.clear() + + def test_mcp_sync_employees_full_empty_and_unknown_hash_modes(): engine = create_engine( "sqlite:///:memory:", diff --git a/tests/test_crawler.py b/tests/test_crawler.py index f47b389..430606a 100644 --- a/tests/test_crawler.py +++ b/tests/test_crawler.py @@ -1,7 +1,7 @@ import gzip from datetime import datetime, timezone -from app.models import CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParseResourceCache +from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeePublication, EmployeeSnapshot, ParseResourceCache from app.services.crawler import _checksum, _mark_dismissed, _upsert_employee from app.services.resource_cache import ResourceCache @@ -191,6 +191,68 @@ def test_upsert_employee_skips_snapshot_when_checksum_is_unchanged(db_session): assert db_session.query(EmployeeSnapshot).count() == 1 +def test_upsert_employee_saves_publications_and_reuses_existing_rows(db_session): + first_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running") + second_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running") + db_session.add_all([first_run, second_run]) + db_session.commit() + + parsed = _parsed_employee("published") + parsed["sections"] = [ + { + "type": "publications", + "publications": [ + { + "id": "888959076", + "publication_id": "888959076", + "title": "Detailed Publication", + "year": 2023, + "publication_type": "ARTICLE", + "language": "ru", + "status": 1, + "url": "https://publications.hse.ru/view/888959076", + "doi_url": "https://doi.org/10.1/test", + "citation_text": "Detailed citation", + "annotation": {"ru": "Аннотация"}, + "description": {"main": "Detailed citation"}, + "authors": [{"id": "1", "title_ru": "Автор"}], + "raw_data": {"id": "888959076", "title": "Detailed Publication"}, + } + ], + } + ] + + employee, _ = _upsert_employee(db_session, first_run, parsed) + db_session.commit() + _upsert_employee(db_session, second_run, _parsed_employee_with_publication("published")) + db_session.commit() + + publications = db_session.query(EmployeePublication).filter_by(employee_id=employee.id).all() + assert len(publications) == 1 + assert publications[0].doi_url == "https://doi.org/10.1/test" + assert publications[0].authors == [{"id": "1", "title_ru": "Автор"}] + + +def test_upsert_employee_records_publication_errors_without_failing_employee(monkeypatch, db_session): + run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running") + db_session.add(run) + db_session.commit() + + def broken_sync(*_args, **_kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr("app.services.crawler._sync_employee_publications", broken_sync) + + employee, changed = _upsert_employee(db_session, run, _parsed_employee_with_publication("error-safe")) + db_session.commit() + + assert changed is True + assert employee.full_name == "Same Person" + assert db_session.query(Employee).filter_by(profile_key="staff:error-safe").one() + error = db_session.query(CrawlError).one() + assert "публикации" in error.message.lower() + + def test_checksum_changes_when_widget_data_changes(): base = _parsed_employee("widgets") changed = _parsed_employee("widgets") @@ -224,3 +286,31 @@ def _parsed_employee(profile_id: str) -> dict: "parser_version": "0.6.0", "_html": "", } + + +def _parsed_employee_with_publication(profile_id: str) -> dict: + parsed = _parsed_employee(profile_id) + parsed["sections"] = [ + { + "type": "publications", + "publications": [ + { + "id": "888959076", + "publication_id": "888959076", + "title": "Detailed Publication", + "year": 2023, + "publication_type": "ARTICLE", + "language": "ru", + "status": 1, + "url": "https://publications.hse.ru/view/888959076", + "doi_url": "https://doi.org/10.1/test", + "citation_text": "Detailed citation", + "annotation": {"ru": "Аннотация"}, + "description": {"main": "Detailed citation"}, + "authors": [{"id": "1", "title_ru": "Автор"}], + "raw_data": {"id": "888959076", "title": "Detailed Publication"}, + } + ], + } + ] + return parsed diff --git a/tests/test_db_schema.py b/tests/test_db_schema.py index a806a4c..a250b25 100644 --- a/tests/test_db_schema.py +++ b/tests/test_db_schema.py @@ -25,3 +25,47 @@ def test_runtime_schema_adds_skipped_count_to_existing_crawl_runs_table(monkeypa columns = {column["name"] for column in inspect(engine).get_columns("crawl_runs")} assert "skipped_count" in columns + + +def test_runtime_schema_creates_employee_publications_table_when_employees_exist(monkeypatch): + engine = create_engine("sqlite:///:memory:") + with engine.begin() as connection: + connection.execute( + text( + """ + CREATE TABLE employees ( + id INTEGER PRIMARY KEY, + profile_key VARCHAR(255) NOT NULL UNIQUE, + canonical_url TEXT NOT NULL, + status VARCHAR(32) NOT NULL DEFAULT 'active', + first_seen_at DATETIME NOT NULL, + last_seen_at DATETIME NOT NULL, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + ) + """ + ) + ) + connection.execute( + text( + """ + CREATE TABLE crawl_runs ( + id INTEGER PRIMARY KEY, + source_url TEXT NOT NULL, + status VARCHAR(32) NOT NULL DEFAULT 'running', + found_count INTEGER NOT NULL DEFAULT 0, + parsed_count INTEGER NOT NULL DEFAULT 0, + skipped_count INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + ) + monkeypatch.setattr("app.db.engine", engine) + + _ensure_runtime_schema() + _ensure_runtime_schema() + + inspector = inspect(engine) + assert "employee_publications" in inspector.get_table_names() + columns = {column["name"] for column in inspector.get_columns("employee_publications")} + assert {"employee_id", "publication_id", "doi_url", "authors", "raw_data", "source_hash"}.issubset(columns) diff --git a/tests/test_parser.py b/tests/test_parser.py index 7adab9c..1120d05 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -34,7 +34,21 @@ class FakeSession: "type": "ARTICLE", "title": "Дублирование пакетов", "year": 2023, + "language": {"name": "ru"}, + "status": 1, + "authorsByType": { + "author": [ + { + "id": "568398853", + "href": "/org/persons/568398853", + "title": {"ru": "Левицкий И. А.", "en": ""}, + "reverseTitle": {"ru": "И. А. Левицкий", "en": ""}, + } + ] + }, "description": {"short": {"ru": "Информационные процессы. 2023."}}, + "annotation": {"ru": "

Русская аннотация

"}, + "documents": {"DOI": {"href": "https://doi.org/10.1/test"}}, } ], }, @@ -153,6 +167,9 @@ def test_enrich_sections_from_hse_widgets_loads_publications_and_vkr(): assert publications["publications_count"] == 1 assert publications["publications"][0]["url"] == "https://publications.hse.ru/view/888959076" + assert publications["publications"][0]["doi_url"] == "https://doi.org/10.1/test" + assert publications["publications"][0]["annotation"] == {"ru": "Русская аннотация"} + assert publications["publications"][0]["authors"][0]["is_current_employee"] is True assert theses["theses_count"] == 1 assert theses["theses"][0]["student"] == "Лесняк Владислав Евгеньевич" assert theses["theses"][0]["project_url"] == "https://www.hse.ru/edu/vkr/1045750164"