Compare commits

...

3 Commits

14 changed files with 677 additions and 26 deletions

View File

@@ -171,6 +171,7 @@ MCP читает данные из основной базы через SQLAlche
Основные таблицы и модели: Основные таблицы и модели:
- `employees`: текущая карточка сотрудника, статус, профиль, `current_data`, checksum. - `employees`: текущая карточка сотрудника, статус, профиль, `current_data`, checksum.
- `employee_publications`: нормализованные публикации сотрудников с авторами, DOI, аннотацией, описанием, citation text и raw JSON из HSE Publications.
- `crawl_runs`: история запусков парсинга. - `crawl_runs`: история запусков парсинга.
- `crawl_run_employee_changes`: детальные изменения сотрудников в рамках запуска. - `crawl_run_employee_changes`: детальные изменения сотрудников в рамках запуска.
- `crawl_errors`: ошибки парсинга в рамках запуска. - `crawl_errors`: ошибки парсинга в рамках запуска.
@@ -387,7 +388,7 @@ Hash набора считается по отсортированному сп
### list_employee_publications ### list_employee_publications
Назначение: вернуть публикации сотрудника из распарсенных секций профиля. Назначение: вернуть публикации сотрудника. Если есть нормализованные строки в `employee_publications`, tool возвращает детальные публикационные данные: авторов, DOI, аннотацию, описание, citation text, год, тип, язык, статус и ссылки. Если детальная таблица еще не заполнена, tool использует старый fallback из `employees.current_data.sections[].publications`.
Аргументы: Аргументы:
@@ -397,24 +398,70 @@ Hash набора считается по отсортированному сп
} }
``` ```
Сервис ищет секции `current_data.sections` с `type = "publications"` и объединяет массивы `publications`. Поиск сотрудника выполняется так же, как в `get_employee`: по `profile_key`, `profile_id`, точному или частичному `canonical_url`.
Порядок источников:
- сначала `employee_publications`, отсортированные по году, названию и внутреннему id;
- если записей нет, секции `current_data.sections` с `type = "publications"` и массивами `publications`.
Ответ: Ответ:
```json ```json
{ {
"employee": {}, "employee": {
"profile_key": "org_person:803294906",
"profile_id": "803294906",
"full_name": "Борисов Сергей Петрович",
"status": "active",
"canonical_url": "https://www.hse.ru/org/persons/803294906",
"last_seen_at": "2026-05-14T10:00:00+00:00",
"dismissed_at": null
},
"items": [ "items": [
{ {
"id": "888959076",
"publication_id": "888959076",
"title": "Название публикации", "title": "Название публикации",
"text": "Полное описание", "text": "Краткое описание или citation",
"url": "https://..." "url": "https://publications.hse.ru/view/888959076",
"year": 2023,
"type": "ARTICLE",
"publication_type": "ARTICLE",
"language": "ru",
"status": 1,
"doi_url": "https://doi.org/10.53921/18195822_2023_23_4_624",
"other_url": "https://example.test",
"document_url": "https://example.test/file.pdf",
"citation_text": "Авторы. Название публикации // Журнал. 2023.",
"annotation": {
"ru": "Аннотация",
"en": "Abstract"
},
"description": {
"main": "Авторы. Название публикации // Журнал. 2023."
},
"authors": [
{
"id": "803294906",
"href": "https://www.hse.ru/org/persons/803294906",
"title_ru": "Борисов С. П.",
"title_en": "",
"reverse_title_ru": "С. П. Борисов",
"reverse_title_en": "",
"alt_name": "S. P. Borisov",
"other_name": null,
"is_current_employee": true
}
]
} }
] ]
} }
``` ```
Если сотрудник или данные профиля отсутствуют: В fallback-режиме из `current_data` старые элементы могут содержать только базовые поля `title`, `text`, `url` и `id`.
Если сотрудник не найден:
```json ```json
{ {
@@ -422,6 +469,15 @@ Hash набора считается по отсортированному сп
} }
``` ```
Если сотрудник найден, но публикаций нет:
```json
{
"employee": {},
"items": []
}
```
### list_employee_courses ### list_employee_courses
Назначение: вернуть курсы преподавания сотрудника из распарсенных секций профиля. Назначение: вернуть курсы преподавания сотрудника из распарсенных секций профиля.

View File

@@ -9,7 +9,7 @@
- `mcp`: открытый HTTP MCP endpoint для ИИ-агентов. - `mcp`: открытый HTTP MCP endpoint для ИИ-агентов.
- `postgres`: основная БД. - `postgres`: основная БД.
Парсер использует фиксированный источник сотрудников, по умолчанию `https://miem.hse.ru/persons`. Для каждой карточки сохраняются ФИО, должности, год начала работы, контакты, идентификаторы, вкладки профиля, секции, публикации, курсы, ВКР, JSON-снапшот и сжатый HTML-снапшот. Ссылки обходятся только из меню профиля самого сотрудника (`person-menu`), например `#sci`, `#teaching`, `#main`. Парсер использует фиксированный источник сотрудников, по умолчанию `https://miem.hse.ru/persons`. Для каждой карточки сохраняются ФИО, должности, год начала работы, контакты, идентификаторы, вкладки профиля, секции, публикации, курсы, ВКР, JSON-снапшот и сжатый HTML-снапшот. Детальные публикации дополнительно нормализуются в отдельную таблицу `employee_publications`. Ссылки обходятся только из меню профиля самого сотрудника (`person-menu`), например `#sci`, `#teaching`, `#main`.
## Переменные окружения ## Переменные окружения
@@ -60,6 +60,19 @@ docker compose up --build
Таблицы создаются приложением при старте. При обновлении существующей базы приложение также добавляет недостающие runtime-колонки, например `crawl_runs.skipped_count`. SQL-миграции для ручного применения лежат в `migrations/`. Таблицы создаются приложением при старте. При обновлении существующей базы приложение также добавляет недостающие runtime-колонки, например `crawl_runs.skipped_count`. SQL-миграции для ручного применения лежат в `migrations/`.
## Наполнение БД
Основная карточка сотрудника хранится в `employees`: профиль, статус, даты обнаружения/увольнения, текущий JSON `current_data`, checksum и версия парсера. История успешных изменений сохраняется в `employee_snapshots` вместе с JSON-снимком и сжатым HTML профиля.
Публикации теперь хранятся в двух видах:
- краткий список остается внутри `employees.current_data.sections[].publications` для обратной совместимости;
- детальные записи сохраняются в `employee_publications` и связываются с сотрудником через `employee_id`.
`employee_publications` содержит `publication_id`, название, год, тип публикации, язык, статус, ссылку на карточку HSE Publications, DOI, внешние/document-ссылки, citation text, аннотацию, описание, авторов, raw JSON ответа `searchPubs` и `source_hash` для безопасного повторного upsert. Уникальность поддерживается по `(employee_id, publication_id)` и `(employee_id, source_hash)`, поэтому повторный crawl не должен создавать дубликаты.
`list_employee_publications` сначала читает `employee_publications`; если детальных строк еще нет, возвращает старые публикации из `current_data`.
## Парсинг ## Парсинг
Weekly worker запускается по `CRAWL_CRON`. Ручной запуск доступен в админке на `Dashboard` и странице `Runs` или через REST: Weekly worker запускается по `CRAWL_CRON`. Ручной запуск доступен в админке на `Dashboard` и странице `Runs` или через REST:
@@ -73,6 +86,7 @@ curl -X POST http://localhost:8000/api/crawl-runs --cookie "miem_admin_session=.
- найденные сотрудники получают статус `active` и обновленный `last_seen_at`; - найденные сотрудники получают статус `active` и обновленный `last_seen_at`;
- новые сотрудники добавляются в `employees`; - новые сотрудники добавляются в `employees`;
- количество новых сотрудников за запуск сохраняется в `crawl_runs.new_count`; - количество новых сотрудников за запуск сохраняется в `crawl_runs.new_count`;
- публикации из HSE Publications записываются в `employee_publications`, а краткий список остается в JSON профиля;
- активные сотрудники, исчезнувшие из текущего списка источника, получают статус `dismissed` и `dismissed_at`; - активные сотрудники, исчезнувшие из текущего списка источника, получают статус `dismissed` и `dismissed_at`;
- каждый успешный новый или измененный разбор сохраняет запись в `employee_snapshots`; - каждый успешный новый или измененный разбор сохраняет запись в `employee_snapshots`;
- неизмененные профили учитываются в `crawl_runs.skipped_count` и не получают новый snapshot. - неизмененные профили учитываются в `crawl_runs.skipped_count` и не получают новый snapshot.
@@ -89,7 +103,7 @@ Endpoint: `POST /mcp`, без авторизации на уровне прил
- `sync_employees(client_hash?, include_data?)` - `sync_employees(client_hash?, include_data?)`
- `search_employees(query, status?, limit?)` - `search_employees(query, status?, limit?)`
- `get_employee(profile_id_or_url)` - `get_employee(profile_id_or_url)`
- `list_employee_publications(profile_id_or_url)` - `list_employee_publications(profile_id_or_url)` — публикации сотрудника; при наличии данных из `employee_publications` возвращает авторов, DOI, аннотацию, описание, citation text, год, тип, язык, статус и ссылку HSE Publications.
- `list_employee_courses(profile_id_or_url)` - `list_employee_courses(profile_id_or_url)`
- `get_crawl_status()` - `get_crawl_status()`
- `get_crawl_run_details(run_id)` - `get_crawl_run_details(run_id)`
@@ -115,4 +129,4 @@ docker compose exec postgres pg_dump -U miem miem_workers > backup.sql
docker compose down docker compose down
``` ```
Версия сервиса: `0.6.1`. Админка всегда показывает версии backend и frontend в footer. Версия сервиса: `0.6.2`. Админка всегда показывает версии backend и frontend в footer.

View File

@@ -29,8 +29,15 @@ def init_db() -> None:
def _ensure_runtime_schema() -> None: def _ensure_runtime_schema() -> None:
import app.models as models
inspector = inspect(engine) inspector = inspect(engine)
if "crawl_runs" not in inspector.get_table_names(): table_names = set(inspector.get_table_names())
if "employees" in table_names and "employee_publications" not in table_names:
models.EmployeePublication.__table__.create(bind=engine, checkfirst=True)
inspector = inspect(engine)
table_names = set(inspector.get_table_names())
if "crawl_runs" not in table_names:
return return
crawl_run_columns = {column["name"] for column in inspector.get_columns("crawl_runs")} crawl_run_columns = {column["name"] for column in inspector.get_columns("crawl_runs")}
if "skipped_count" not in crawl_run_columns: if "skipped_count" not in crawl_run_columns:

View File

@@ -5,7 +5,7 @@ from sqlalchemy import desc, or_, select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.db import get_db from app.db import get_db
from app.models import CrawlRun, Employee from app.models import CrawlRun, Employee, EmployeePublication
from app.services.admin_data import run_detail_payload from app.services.admin_data import run_detail_payload
from app.services.dataset_versions import service_info_payload, sync_employees_payload from app.services.dataset_versions import service_info_payload, sync_employees_payload
from app.version import BACKEND_VERSION from app.version import BACKEND_VERSION
@@ -52,7 +52,10 @@ TOOLS = [
}, },
{ {
"name": "list_employee_publications", "name": "list_employee_publications",
"description": "List publications parsed from an employee profile.", "description": (
"List employee publications with detailed fields when available: authors, DOI URL, annotation, "
"description, citation text, year, publication type, language, status, and HSE Publications URL."
),
"inputSchema": {"type": "object", "properties": {"profile_id_or_url": {"type": "string"}}, "required": ["profile_id_or_url"]}, "inputSchema": {"type": "object", "properties": {"profile_id_or_url": {"type": "string"}}, "required": ["profile_id_or_url"]},
}, },
{ {
@@ -171,8 +174,14 @@ def _find_employee(db: Session, value: str) -> Employee | None:
def _collect_section_items(employee: Employee | None, section_type: str) -> dict: def _collect_section_items(employee: Employee | None, section_type: str) -> dict:
if not employee or not employee.current_data: if not employee:
return {"items": []} return {"items": []}
if section_type == "publications":
publications = _stored_publications(employee)
if publications:
return {"employee": _employee_payload(employee, include_data=False), "items": publications}
if not employee.current_data:
return {"employee": _employee_payload(employee, include_data=False), "items": []}
items = [] items = []
for section in employee.current_data.get("sections") or []: for section in employee.current_data.get("sections") or []:
if section.get("type") != section_type: if section.get("type") != section_type:
@@ -184,6 +193,41 @@ def _collect_section_items(employee: Employee | None, section_type: str) -> dict
return {"employee": _employee_payload(employee, include_data=False), "items": items} return {"employee": _employee_payload(employee, include_data=False), "items": items}
def _stored_publications(employee: Employee) -> list[dict]:
return [_publication_payload(publication) for publication in sorted(employee.publications, key=_publication_sort_key)]
def _publication_sort_key(publication: EmployeePublication) -> tuple:
return (publication.year or 0, publication.title or "", publication.id)
def _publication_payload(publication: EmployeePublication) -> dict:
text = publication.citation_text or publication.title
payload = {
"id": publication.publication_id,
"publication_id": publication.publication_id,
"title": publication.title,
"text": text,
"url": publication.url,
}
optional = {
"year": publication.year,
"type": publication.publication_type,
"publication_type": publication.publication_type,
"language": publication.language,
"status": publication.status,
"doi_url": publication.doi_url,
"other_url": publication.other_url,
"document_url": publication.document_url,
"citation_text": publication.citation_text,
"annotation": publication.annotation,
"description": publication.description,
"authors": publication.authors,
}
payload.update({key: value for key, value in optional.items() if value not in (None, [], {})})
return payload
def _employee_payload(employee: Employee, include_data: bool = True) -> dict: def _employee_payload(employee: Employee, include_data: bool = True) -> dict:
payload = { payload = {
"profile_key": employee.profile_key, "profile_key": employee.profile_key,

View File

@@ -41,6 +41,7 @@ class Employee(Base):
snapshots: Mapped[list["EmployeeSnapshot"]] = relationship(back_populates="employee") snapshots: Mapped[list["EmployeeSnapshot"]] = relationship(back_populates="employee")
tabs: Mapped[list["ProfileTab"]] = relationship(back_populates="employee", cascade="all, delete-orphan") tabs: Mapped[list["ProfileTab"]] = relationship(back_populates="employee", cascade="all, delete-orphan")
publications: Mapped[list["EmployeePublication"]] = relationship(back_populates="employee", cascade="all, delete-orphan")
crawl_run_changes: Mapped[list["CrawlRunEmployeeChange"]] = relationship(back_populates="employee") crawl_run_changes: Mapped[list["CrawlRunEmployeeChange"]] = relationship(back_populates="employee")
@@ -60,6 +61,42 @@ class EmployeeSnapshot(Base):
employee: Mapped[Employee] = relationship(back_populates="snapshots") employee: Mapped[Employee] = relationship(back_populates="snapshots")
class EmployeePublication(Base):
__tablename__ = "employee_publications"
__table_args__ = (
UniqueConstraint("employee_id", "publication_id", name="uq_employee_publications_employee_publication"),
UniqueConstraint("employee_id", "source_hash", name="uq_employee_publications_employee_source_hash"),
Index("ix_employee_publications_employee_id", "employee_id"),
Index("ix_employee_publications_publication_id", "publication_id"),
Index("ix_employee_publications_doi_url", "doi_url"),
Index("ix_employee_publications_year", "year"),
Index("ix_employee_publications_publication_type", "publication_type"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
employee_id: Mapped[int] = mapped_column(ForeignKey("employees.id", ondelete="CASCADE"), nullable=False)
publication_id: Mapped[str | None] = mapped_column(String(64))
title: Mapped[str] = mapped_column(Text, nullable=False)
year: Mapped[int | None] = mapped_column(Integer)
publication_type: Mapped[str | None] = mapped_column(String(64))
language: Mapped[str | None] = mapped_column(String(16))
status: Mapped[int | None] = mapped_column(Integer)
url: Mapped[str | None] = mapped_column(Text)
doi_url: Mapped[str | None] = mapped_column(Text)
other_url: Mapped[str | None] = mapped_column(Text)
document_url: Mapped[str | None] = mapped_column(Text)
citation_text: Mapped[str | None] = mapped_column(Text)
annotation: Mapped[dict | None] = mapped_column(json_type)
description: Mapped[dict | None] = mapped_column(json_type)
authors: Mapped[list | None] = mapped_column(json_type)
raw_data: Mapped[dict | None] = mapped_column(json_type)
source_hash: Mapped[str] = mapped_column(String(64), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow, nullable=False)
employee: Mapped[Employee] = relationship(back_populates="publications")
class CrawlRun(Base): class CrawlRun(Base):
__tablename__ = "crawl_runs" __tablename__ = "crawl_runs"

View File

@@ -333,7 +333,7 @@ def _load_widget_publications(
items = _extract_publication_items(result) items = _extract_publication_items(result)
if not items: if not items:
break break
publications.extend(_normalize_publication_item(item) for item in items) publications.extend(_normalize_publication_item(item, author_id) for item in items)
total = int(result.get("total") or 0) total = int(result.get("total") or 0)
if not result.get("more") and len(publications) >= total: if not result.get("more") and len(publications) >= total:
@@ -575,20 +575,37 @@ def _parse_vkr_items(nodes: list) -> list[str]:
return [item for item in dict.fromkeys(items) if item] return [item for item in dict.fromkeys(items) if item]
def _normalize_publication_item(item: dict) -> dict: def _normalize_publication_item(item: dict, current_author_id: str | None = None) -> dict:
publication_id = str(item.get("id") or "").strip() publication_id = str(item.get("id") or "").strip()
title = _html_to_text(item.get("title")) title = _html_to_text(item.get("title"))
year = item.get("year") year = _int_or_none(item.get("year"))
publication_type = str(item.get("type") or "").strip() or None publication_type = str(item.get("type") or "").strip() or None
description = item.get("description") if isinstance(item.get("description"), dict) else {} description = item.get("description") if isinstance(item.get("description"), dict) else {}
short_description = _localized_value(description.get("short")) or _localized_value(description.get("shortLeft")) short_description = _localized_value(description.get("short")) or _localized_value(description.get("shortLeft"))
documents = item.get("documents") if isinstance(item.get("documents"), dict) else {}
language = item.get("language") if isinstance(item.get("language"), dict) else {}
annotation = _localized_text_map(item.get("annotation"))
authors = _normalize_publication_authors(item.get("authorsByType"), current_author_id)
citation_text = normalize_ws(str(description.get("main") or "")) or _build_publication_citation(title, authors, year)
text = normalize_ws(" ".join(part for part in [title, str(year or ""), short_description] if part)) text = normalize_ws(" ".join(part for part in [title, str(year or ""), short_description] if part))
return { return {
"id": publication_id or None, "id": publication_id or None,
"publication_id": publication_id or None,
"title": title or publication_id, "title": title or publication_id,
"year": year, "year": year,
"type": publication_type, "type": publication_type,
"publication_type": publication_type,
"language": normalize_ws(language.get("name")) or None,
"status": _int_or_none(item.get("status")),
"url": f"https://publications.hse.ru/view/{publication_id}" if publication_id else None, "url": f"https://publications.hse.ru/view/{publication_id}" if publication_id else None,
"doi_url": _document_href(documents, "DOI"),
"other_url": _document_href(documents, "OTHER_URL"),
"document_url": _document_href(documents, "DOCUMENT"),
"citation_text": citation_text or None,
"annotation": annotation,
"description": description or None,
"authors": authors,
"raw_data": item,
"text": text or title or publication_id, "text": text or title or publication_id,
} }
@@ -685,12 +702,69 @@ def _html_to_text(value: object) -> str:
return normalize_ws(BeautifulSoup(str(value or ""), "html.parser").get_text(" ", strip=True)) return normalize_ws(BeautifulSoup(str(value or ""), "html.parser").get_text(" ", strip=True))
def _localized_text_map(value: object) -> dict[str, str]:
if not isinstance(value, dict):
return {}
localized = {}
for key in ("ru", "en", "publ"):
text = _html_to_text(value.get(key))
if text:
localized[key] = text
return localized
def _localized_value(value: object) -> str: def _localized_value(value: object) -> str:
if isinstance(value, dict): if isinstance(value, dict):
return normalize_ws(value.get("ru") or value.get("publ") or value.get("en")) return normalize_ws(value.get("ru") or value.get("publ") or value.get("en"))
return normalize_ws(str(value or "")) return normalize_ws(str(value or ""))
def _normalize_publication_authors(value: object, current_author_id: str | None) -> list[dict]:
if not isinstance(value, dict):
return []
authors = []
for author in value.get("author") or []:
if not isinstance(author, dict):
continue
title = author.get("title") if isinstance(author.get("title"), dict) else {}
reverse_title = author.get("reverseTitle") if isinstance(author.get("reverseTitle"), dict) else {}
author_id = normalize_ws(author.get("id"))
href = normalize_ws(author.get("href"))
authors.append(
{
"id": author_id or None,
"href": urljoin("https://www.hse.ru", href) if href else None,
"title_ru": _html_to_text(title.get("ru")),
"title_en": _html_to_text(title.get("en")),
"reverse_title_ru": _html_to_text(reverse_title.get("ru")),
"reverse_title_en": _html_to_text(reverse_title.get("en")),
"alt_name": normalize_ws(author.get("altName")) or None,
"other_name": normalize_ws(author.get("otherName")) or None,
"is_current_employee": bool(current_author_id and author_id == current_author_id),
}
)
return authors
def _document_href(documents: dict, key: str) -> str | None:
document = documents.get(key)
if not isinstance(document, dict):
return None
return normalize_ws(document.get("href")) or None
def _build_publication_citation(title: str, authors: list[dict], year: int | None) -> str:
author_names = [author.get("title_ru") or author.get("title_en") or author.get("alt_name") for author in authors]
return normalize_ws(". ".join(part for part in [", ".join(filter(None, author_names)), title, str(year or "")] if part))
def _int_or_none(value: object) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None
def _slugify(value: str) -> str: def _slugify(value: str) -> str:
cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE) cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section" return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"

View File

@@ -6,11 +6,20 @@ import time
from datetime import datetime, timezone from datetime import datetime, timezone
import requests import requests
from sqlalchemy import select from sqlalchemy import inspect, select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.config import Settings from app.config import Settings
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParserSource, ProfileTab from app.models import (
CrawlError,
CrawlRun,
CrawlRunEmployeeChange,
Employee,
EmployeePublication,
EmployeeSnapshot,
ParserSource,
ProfileTab,
)
from app.parser.collector import collect_profile_links from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key from app.parser.profile_url import profile_key
@@ -219,9 +228,127 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee
parser_version=parser_version, parser_version=parser_version,
) )
) )
db.flush()
_try_sync_employee_publications(db, run, employee, parsed)
return employee, changed return employee, changed
def _try_sync_employee_publications(db: Session, run: CrawlRun, employee: Employee, parsed: dict) -> None:
try:
if not _publication_payloads(parsed):
return
if not _employee_publications_table_exists(db):
return
with db.begin_nested():
_sync_employee_publications(db, employee, parsed)
except Exception as exc:
db.add(
CrawlError(
crawl_run_id=run.id,
profile_url=employee.canonical_url,
error_type=type(exc).__name__,
message=f"Не удалось сохранить публикации сотрудника: {exc}",
)
)
def _employee_publications_table_exists(db: Session) -> bool:
return inspect(db.connection()).has_table(EmployeePublication.__tablename__)
def _sync_employee_publications(db: Session, employee: Employee, parsed: dict) -> None:
publications = _publication_payloads(parsed)
seen_hashes = set()
for publication in publications:
source_hash = _publication_hash(publication)
seen_hashes.add(source_hash)
publication_id = _clean_optional(publication.get("publication_id") or publication.get("id"))
existing = None
if publication_id:
existing = db.scalar(
select(EmployeePublication).where(
EmployeePublication.employee_id == employee.id,
EmployeePublication.publication_id == publication_id,
)
)
if not existing:
existing = db.scalar(
select(EmployeePublication).where(
EmployeePublication.employee_id == employee.id,
EmployeePublication.source_hash == source_hash,
)
)
if not existing:
existing = EmployeePublication(employee_id=employee.id, source_hash=source_hash, title=_publication_title(publication))
db.add(existing)
_apply_publication(existing, publication, source_hash)
if seen_hashes:
stale = db.scalars(
select(EmployeePublication).where(
EmployeePublication.employee_id == employee.id,
EmployeePublication.source_hash.not_in(seen_hashes),
)
).all()
for item in stale:
db.delete(item)
def _publication_payloads(parsed: dict) -> list[dict]:
publications = []
for section in parsed.get("sections") or []:
if not isinstance(section, dict) or section.get("type") != "publications":
continue
for publication in section.get("publications") or []:
if isinstance(publication, dict):
publications.append(publication)
return publications
def _apply_publication(target: EmployeePublication, publication: dict, source_hash: str) -> None:
target.publication_id = _clean_optional(publication.get("publication_id") or publication.get("id"))
target.title = _publication_title(publication)
target.year = _int_or_none(publication.get("year"))
target.publication_type = _clean_optional(publication.get("publication_type") or publication.get("type"))
target.language = _clean_optional(publication.get("language"))
target.status = _int_or_none(publication.get("status"))
target.url = _clean_optional(publication.get("url"))
target.doi_url = _clean_optional(publication.get("doi_url"))
target.other_url = _clean_optional(publication.get("other_url"))
target.document_url = _clean_optional(publication.get("document_url"))
target.citation_text = _clean_optional(publication.get("citation_text") or publication.get("text"))
target.annotation = publication.get("annotation") if isinstance(publication.get("annotation"), dict) else None
target.description = publication.get("description") if isinstance(publication.get("description"), dict) else None
target.authors = publication.get("authors") if isinstance(publication.get("authors"), list) else None
target.raw_data = publication.get("raw_data") if isinstance(publication.get("raw_data"), dict) else publication
target.source_hash = source_hash
def _publication_hash(publication: dict) -> str:
return _payload_hash(publication.get("raw_data") if isinstance(publication.get("raw_data"), dict) else publication)
def _payload_hash(value: object) -> str:
payload = json.dumps(_stable_checksum_payload(value), ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _publication_title(publication: dict) -> str:
return _clean_optional(publication.get("title") or publication.get("text") or publication.get("id")) or "Untitled publication"
def _clean_optional(value: object) -> str | None:
text = str(value or "").strip()
return text or None
def _int_or_none(value: object) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int: def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
dismissed = 0 dismissed = 0
active = db.scalars(select(Employee).where(Employee.status == "active")).all() active = db.scalars(select(Employee).where(Employee.status == "active")).all()

View File

@@ -1,3 +1,3 @@
APP_VERSION = "0.6.1" APP_VERSION = "0.6.2"
FRONTEND_VERSION = "0.6.1" FRONTEND_VERSION = "0.6.2"
BACKEND_VERSION = "0.6.1" BACKEND_VERSION = "0.6.2"

View File

@@ -0,0 +1,39 @@
CREATE TABLE IF NOT EXISTS employee_publications (
id SERIAL PRIMARY KEY,
employee_id INTEGER NOT NULL REFERENCES employees(id) ON DELETE CASCADE,
publication_id VARCHAR(64),
title TEXT NOT NULL,
year INTEGER,
publication_type VARCHAR(64),
language VARCHAR(16),
status INTEGER,
url TEXT,
doi_url TEXT,
other_url TEXT,
document_url TEXT,
citation_text TEXT,
annotation JSONB,
description JSONB,
authors JSONB,
raw_data JSONB,
source_hash VARCHAR(64) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT uq_employee_publications_employee_publication UNIQUE (employee_id, publication_id),
CONSTRAINT uq_employee_publications_employee_source_hash UNIQUE (employee_id, source_hash)
);
CREATE INDEX IF NOT EXISTS ix_employee_publications_employee_id
ON employee_publications (employee_id);
CREATE INDEX IF NOT EXISTS ix_employee_publications_publication_id
ON employee_publications (publication_id);
CREATE INDEX IF NOT EXISTS ix_employee_publications_doi_url
ON employee_publications (doi_url);
CREATE INDEX IF NOT EXISTS ix_employee_publications_year
ON employee_publications (year);
CREATE INDEX IF NOT EXISTS ix_employee_publications_publication_type
ON employee_publications (publication_type);

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "miem-workers" name = "miem-workers"
version = "0.6.1" version = "0.6.2"
description = "MIEM employees parser, admin API, and MCP server" description = "MIEM employees parser, admin API, and MCP server"
requires-python = ">=3.11" requires-python = ">=3.11"
dependencies = [ dependencies = [

View File

@@ -10,7 +10,7 @@ from sqlalchemy.pool import StaticPool
from app.config import Settings, get_settings from app.config import Settings, get_settings
from app.db import Base, get_db from app.db import Base, get_db
from app.main import app from app.main import app
from app.models import CrawlRun, CrawlRunEmployeeChange, Employee from app.models import CrawlRun, CrawlRunEmployeeChange, Employee, EmployeePublication
from app.security import SESSION_COOKIE, sign_session from app.security import SESSION_COOKIE, sign_session
@@ -20,7 +20,7 @@ def test_health_returns_versions():
response = client.get("/api/health") response = client.get("/api/health")
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["backend_version"] == "0.6.1" assert response.json()["backend_version"] == "0.6.2"
def test_mcp_lists_tools_without_auth_and_ignores_auth_header(): def test_mcp_lists_tools_without_auth_and_ignores_auth_header():
@@ -154,13 +154,115 @@ def test_mcp_service_info_returns_tools_and_dataset_hash():
assert response.status_code == 200 assert response.status_code == 200
payload = json.loads(response.json()["result"]["content"][0]["text"]) payload = json.loads(response.json()["result"]["content"][0]["text"])
assert payload["service_name"] == "miem-employees" assert payload["service_name"] == "miem-employees"
assert payload["backend_version"] == "0.6.1" assert payload["backend_version"] == "0.6.2"
assert payload["dataset"]["hash"] assert payload["dataset"]["hash"]
assert any(tool["name"] == "sync_employees" for tool in payload["tools"]) assert any(tool["name"] == "sync_employees" for tool in payload["tools"])
app.dependency_overrides.clear() app.dependency_overrides.clear()
def test_mcp_list_employee_publications_prefers_stored_publications_with_fallback():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
stored_employee = Employee(
profile_key="staff:stored",
profile_type="staff",
profile_id="stored",
canonical_url="https://www.hse.ru/staff/stored",
full_name="Stored Person",
status="active",
current_data={
"sections": [
{
"type": "publications",
"publications": [{"title": "Old JSON Publication", "url": "https://example.test/old"}],
}
]
},
)
fallback_employee = Employee(
profile_key="staff:fallback",
profile_type="staff",
profile_id="fallback",
canonical_url="https://www.hse.ru/staff/fallback",
full_name="Fallback Person",
status="active",
current_data={
"sections": [
{
"type": "publications",
"publications": [{"title": "Fallback Publication", "url": "https://example.test/fallback"}],
}
]
},
)
session.add_all([stored_employee, fallback_employee])
session.commit()
session.add(
EmployeePublication(
employee_id=stored_employee.id,
publication_id="pub-1",
title="Stored Publication",
year=2024,
publication_type="ARTICLE",
url="https://publications.hse.ru/view/pub-1",
doi_url="https://doi.org/10.1/test",
citation_text="Stored Citation",
annotation={"ru": "Аннотация", "en": "Abstract"},
description={"main": "Stored Citation"},
authors=[{"id": "1", "title_ru": "Автор", "is_current_employee": True}],
source_hash="a" * 64,
)
)
session.commit()
session.close()
def override_db():
db = Session()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
stored_response = client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "list_employee_publications", "arguments": {"profile_id_or_url": "stored"}},
},
)
fallback_response = client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": "list_employee_publications", "arguments": {"profile_id_or_url": "fallback"}},
},
)
stored_payload = json.loads(stored_response.json()["result"]["content"][0]["text"])
fallback_payload = json.loads(fallback_response.json()["result"]["content"][0]["text"])
assert stored_payload["items"][0]["title"] == "Stored Publication"
assert stored_payload["items"][0]["doi_url"] == "https://doi.org/10.1/test"
assert stored_payload["items"][0]["annotation"] == {"ru": "Аннотация", "en": "Abstract"}
assert stored_payload["items"][0]["authors"] == [{"id": "1", "title_ru": "Автор", "is_current_employee": True}]
assert fallback_payload["items"][0]["title"] == "Fallback Publication"
app.dependency_overrides.clear()
def test_mcp_sync_employees_full_empty_and_unknown_hash_modes(): def test_mcp_sync_employees_full_empty_and_unknown_hash_modes():
engine = create_engine( engine = create_engine(
"sqlite:///:memory:", "sqlite:///:memory:",

View File

@@ -1,7 +1,7 @@
import gzip import gzip
from datetime import datetime, timezone from datetime import datetime, timezone
from app.models import CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParseResourceCache from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeePublication, EmployeeSnapshot, ParseResourceCache
from app.services.crawler import _checksum, _mark_dismissed, _upsert_employee from app.services.crawler import _checksum, _mark_dismissed, _upsert_employee
from app.services.resource_cache import ResourceCache from app.services.resource_cache import ResourceCache
@@ -191,6 +191,68 @@ def test_upsert_employee_skips_snapshot_when_checksum_is_unchanged(db_session):
assert db_session.query(EmployeeSnapshot).count() == 1 assert db_session.query(EmployeeSnapshot).count() == 1
def test_upsert_employee_saves_publications_and_reuses_existing_rows(db_session):
first_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
second_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
db_session.add_all([first_run, second_run])
db_session.commit()
parsed = _parsed_employee("published")
parsed["sections"] = [
{
"type": "publications",
"publications": [
{
"id": "888959076",
"publication_id": "888959076",
"title": "Detailed Publication",
"year": 2023,
"publication_type": "ARTICLE",
"language": "ru",
"status": 1,
"url": "https://publications.hse.ru/view/888959076",
"doi_url": "https://doi.org/10.1/test",
"citation_text": "Detailed citation",
"annotation": {"ru": "Аннотация"},
"description": {"main": "Detailed citation"},
"authors": [{"id": "1", "title_ru": "Автор"}],
"raw_data": {"id": "888959076", "title": "Detailed Publication"},
}
],
}
]
employee, _ = _upsert_employee(db_session, first_run, parsed)
db_session.commit()
_upsert_employee(db_session, second_run, _parsed_employee_with_publication("published"))
db_session.commit()
publications = db_session.query(EmployeePublication).filter_by(employee_id=employee.id).all()
assert len(publications) == 1
assert publications[0].doi_url == "https://doi.org/10.1/test"
assert publications[0].authors == [{"id": "1", "title_ru": "Автор"}]
def test_upsert_employee_records_publication_errors_without_failing_employee(monkeypatch, db_session):
run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
db_session.add(run)
db_session.commit()
def broken_sync(*_args, **_kwargs):
raise RuntimeError("boom")
monkeypatch.setattr("app.services.crawler._sync_employee_publications", broken_sync)
employee, changed = _upsert_employee(db_session, run, _parsed_employee_with_publication("error-safe"))
db_session.commit()
assert changed is True
assert employee.full_name == "Same Person"
assert db_session.query(Employee).filter_by(profile_key="staff:error-safe").one()
error = db_session.query(CrawlError).one()
assert "публикации" in error.message.lower()
def test_checksum_changes_when_widget_data_changes(): def test_checksum_changes_when_widget_data_changes():
base = _parsed_employee("widgets") base = _parsed_employee("widgets")
changed = _parsed_employee("widgets") changed = _parsed_employee("widgets")
@@ -224,3 +286,31 @@ def _parsed_employee(profile_id: str) -> dict:
"parser_version": "0.6.0", "parser_version": "0.6.0",
"_html": "<html></html>", "_html": "<html></html>",
} }
def _parsed_employee_with_publication(profile_id: str) -> dict:
parsed = _parsed_employee(profile_id)
parsed["sections"] = [
{
"type": "publications",
"publications": [
{
"id": "888959076",
"publication_id": "888959076",
"title": "Detailed Publication",
"year": 2023,
"publication_type": "ARTICLE",
"language": "ru",
"status": 1,
"url": "https://publications.hse.ru/view/888959076",
"doi_url": "https://doi.org/10.1/test",
"citation_text": "Detailed citation",
"annotation": {"ru": "Аннотация"},
"description": {"main": "Detailed citation"},
"authors": [{"id": "1", "title_ru": "Автор"}],
"raw_data": {"id": "888959076", "title": "Detailed Publication"},
}
],
}
]
return parsed

View File

@@ -25,3 +25,47 @@ def test_runtime_schema_adds_skipped_count_to_existing_crawl_runs_table(monkeypa
columns = {column["name"] for column in inspect(engine).get_columns("crawl_runs")} columns = {column["name"] for column in inspect(engine).get_columns("crawl_runs")}
assert "skipped_count" in columns assert "skipped_count" in columns
def test_runtime_schema_creates_employee_publications_table_when_employees_exist(monkeypatch):
engine = create_engine("sqlite:///:memory:")
with engine.begin() as connection:
connection.execute(
text(
"""
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
profile_key VARCHAR(255) NOT NULL UNIQUE,
canonical_url TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'active',
first_seen_at DATETIME NOT NULL,
last_seen_at DATETIME NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
)
"""
)
)
connection.execute(
text(
"""
CREATE TABLE crawl_runs (
id INTEGER PRIMARY KEY,
source_url TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'running',
found_count INTEGER NOT NULL DEFAULT 0,
parsed_count INTEGER NOT NULL DEFAULT 0,
skipped_count INTEGER NOT NULL DEFAULT 0
)
"""
)
)
monkeypatch.setattr("app.db.engine", engine)
_ensure_runtime_schema()
_ensure_runtime_schema()
inspector = inspect(engine)
assert "employee_publications" in inspector.get_table_names()
columns = {column["name"] for column in inspector.get_columns("employee_publications")}
assert {"employee_id", "publication_id", "doi_url", "authors", "raw_data", "source_hash"}.issubset(columns)

View File

@@ -34,7 +34,21 @@ class FakeSession:
"type": "ARTICLE", "type": "ARTICLE",
"title": "Дублирование пакетов", "title": "Дублирование пакетов",
"year": 2023, "year": 2023,
"language": {"name": "ru"},
"status": 1,
"authorsByType": {
"author": [
{
"id": "568398853",
"href": "/org/persons/568398853",
"title": {"ru": "Левицкий И. А.", "en": ""},
"reverseTitle": {"ru": "И. А. Левицкий", "en": ""},
}
]
},
"description": {"short": {"ru": "Информационные процессы. 2023."}}, "description": {"short": {"ru": "Информационные процессы. 2023."}},
"annotation": {"ru": "<p>Русская аннотация</p>"},
"documents": {"DOI": {"href": "https://doi.org/10.1/test"}},
} }
], ],
}, },
@@ -153,6 +167,9 @@ def test_enrich_sections_from_hse_widgets_loads_publications_and_vkr():
assert publications["publications_count"] == 1 assert publications["publications_count"] == 1
assert publications["publications"][0]["url"] == "https://publications.hse.ru/view/888959076" assert publications["publications"][0]["url"] == "https://publications.hse.ru/view/888959076"
assert publications["publications"][0]["doi_url"] == "https://doi.org/10.1/test"
assert publications["publications"][0]["annotation"] == {"ru": "Русская аннотация"}
assert publications["publications"][0]["authors"][0]["is_current_employee"] is True
assert theses["theses_count"] == 1 assert theses["theses_count"] == 1
assert theses["theses"][0]["student"] == "Лесняк Владислав Евгеньевич" assert theses["theses"][0]["student"] == "Лесняк Владислав Евгеньевич"
assert theses["theses"][0]["project_url"] == "https://www.hse.ru/edu/vkr/1045750164" assert theses["theses"][0]["project_url"] == "https://www.hse.ru/edu/vkr/1045750164"