Compare commits

...

13 Commits

Author SHA1 Message Date
Anton
6724b3f369 feat: adds crawl resource cache 2026-05-14 12:21:44 +03:00
5180b89b81 Merge pull request 'feat: add dataset checkpoint sync for MCP' (#23) from feature/dataset-version-sync into main
Reviewed-on: #23
2026-05-14 08:01:26 +00:00
Anton
29451ccee1 feat: add dataset checkpoint sync for MCP 2026-05-14 11:00:46 +03:00
a3ff9c6e9c Merge pull request 'fix: separate news from publications and add employee refresh' (#22) from fix/publications-news-refresh into main
Reviewed-on: #22
2026-05-13 13:12:06 +00:00
Anton
8e19dc9f35 fix: separate news from publications and add employee refresh 2026-05-13 16:11:13 +03:00
5b9d71426d Merge pull request 'fix: support grouped HSE publication API responses' (#21) from fix/grouped-publications-parser into main
Reviewed-on: #21
2026-05-13 09:46:48 +00:00
Anton
efa7192e45 fix: support grouped HSE publication API responses 2026-05-13 12:46:07 +03:00
b27d613143 Merge pull request 'fix: remove mcp-auth from yml-file' (#20) from fix/remove-mcp-auth-compose into main
Reviewed-on: #20
2026-05-08 09:33:17 +00:00
Anton
a1ab1c0319 fix: remove mcp-auth from yml-file 2026-05-08 12:32:40 +03:00
0b4e04544d Merge pull request 'fix: remove MCP application-level authorization' (#19) from fix/remove-mcp-auth into main
Reviewed-on: #19
2026-05-08 09:15:18 +00:00
Anton
7593a460c7 fix: remove MCP application-level authorization 2026-05-08 12:14:19 +03:00
a4e7388bcf Merge pull request 'fix: use direct onclick handlers for run rows' (#18) from fix/direct-run-row-click-handler into main
Reviewed-on: #18
2026-05-07 15:25:26 +00:00
Anton
ac319b3ee5 fix: use direct onclick handlers for run rows 2026-05-07 18:23:14 +03:00
36 changed files with 2057 additions and 462 deletions


@@ -14,13 +14,5 @@ PARSER_USE_PLAYWRIGHT=false
ADMIN_USERNAME=admin
ADMIN_PASSWORD=change-me
SESSION_SECRET=change-me-session-secret
MCP_TOKEN=change-me-mcp-token
MCP_AUTH_MODE=oauth
MCP_RESOURCE_URL=http://localhost:8001/mcp
MCP_OAUTH_ISSUER=
MCP_OAUTH_AUDIENCE=
MCP_OAUTH_JWKS_URL=
MCP_OAUTH_REQUIRED_SCOPE=mcp:tools
API_PORT=8000
MCP_PORT=8001

592
MCP_DESCRIPTION.md Normal file

@@ -0,0 +1,592 @@
# MCP: how it works, structure, and tools
This document describes the MCP endpoint of the `miem-employees` service as currently implemented in `app/mcp.py`.
## Where MCP lives
- FastAPI router: `app.mcp.router`
- Mounted into the application in: `app/main.py`
- HTTP endpoint: `POST /mcp`
- Locally, when the API is started the usual way: `http://localhost:8000/mcp`
- In Docker Compose, through the separate `mcp` service: `http://localhost:8001/mcp`
- Application-level authorization: none. The `Authorization` header is not checked and has no effect on the response.
If access to MCP must be restricted, do it in an outer layer: bind to localhost, VPN, firewall, reverse proxy, or a dedicated network policy.
## Protocol
The endpoint accepts JSON-RPC 2.0 over HTTP.
General request format:
```json
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
```
General format of a successful response:
```json
{
"jsonrpc": "2.0",
"id": 1,
"result": {}
}
```
General error format:
```json
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32601,
"message": "Method not found"
}
}
```
Supported MCP protocol version:
```text
2024-11-05
```
Service name:
```text
miem-employees
```
The server version is taken from `app.version.BACKEND_VERSION`.
## Supported JSON-RPC methods
### initialize
Returns MCP server metadata and capabilities.
Request:
```json
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
}
```
Response:
```json
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": "miem-employees",
"version": "0.5.0"
},
"capabilities": {
"tools": {}
}
}
}
```
### tools/list
Returns the list of available tools with a JSON Schema for their arguments.
Request:
```json
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
```
The response contains the `result.tools` array.
### tools/call
Calls a single tool by name.
Request:
```json
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "search_employees",
"arguments": {
"query": "Сергеев",
"limit": 20
}
}
}
```
The tool response is always wrapped in an MCP content array:
```json
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "{\"items\":[]}"
}
]
}
}
```
The `text` field contains JSON serialized with `ensure_ascii=False`. A client that needs the structured payload must parse this field as JSON.
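The round trip described above can be sketched in Python as follows. This is a minimal illustration that assumes only the request and response shapes shown in this section; the helper names `build_tool_call` and `unwrap_tool_result` are hypothetical and are not part of the service.

```python
import json


def build_tool_call(request_id: int, name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request body (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


def unwrap_tool_result(response: dict) -> object:
    """Parse the JSON string carried in result.content[0].text."""
    text = response["result"]["content"][0]["text"]
    return json.loads(text)


request = build_tool_call(1, "search_employees", {"query": "Сергеев", "limit": 20})

# Sample response copied from the example above.
sample_response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "{\"items\":[]}"}]},
}
payload = unwrap_tool_result(sample_response)
```

The sketch deliberately skips error handling; a response that carries `error` instead of `result` needs a separate branch.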
## Errors
- Unknown JSON-RPC method: `code = -32601`, `message = "Method not found"`.
- Exceptions raised while a tool runs: `code = -32000`; `message` carries the exception text.
- When an entity is not found inside individual tools, both the HTTP response and the JSON-RPC response still succeed, and the payload contains `{"error": "not_found"}`.
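A client may want to tell these three cases apart. The sketch below is one possible way to do it, assuming the response shapes documented here; `classify_response` and its string labels are hypothetical names chosen for illustration.

```python
from __future__ import annotations

import json

METHOD_NOT_FOUND = -32601  # unknown JSON-RPC method
TOOL_EXCEPTION = -32000    # exception raised while a tool was running


def classify_response(response: dict) -> tuple[str, object]:
    """Return a (label, detail) pair describing the outcome of one call."""
    error = response.get("error")
    if error is not None:
        if error.get("code") == METHOD_NOT_FOUND:
            return "method_not_found", error.get("message")
        if error.get("code") == TOOL_EXCEPTION:
            return "tool_exception", error.get("message")
        return "rpc_error", error
    # A successful JSON-RPC response can still carry a not_found payload.
    payload = json.loads(response["result"]["content"][0]["text"])
    if isinstance(payload, dict) and payload.get("error") == "not_found":
        return "not_found", payload
    return "ok", payload


unknown_method = {"jsonrpc": "2.0", "id": 1, "error": {"code": -32601, "message": "Method not found"}}
missing_entity = {
    "jsonrpc": "2.0",
    "id": 2,
    "result": {"content": [{"type": "text", "text": "{\"error\": \"not_found\"}"}]},
}
```

The key point is that `not_found` has to be detected in the parsed payload, because neither the HTTP status nor the JSON-RPC envelope signals it.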
## Data sources
MCP reads data from the main database through the SQLAlchemy session from `app.db.get_db`.
Main tables and models:
- `employees`: the current employee card, status, profile, `current_data`, checksum.
- `crawl_runs`: history of crawl runs.
- `crawl_run_employee_changes`: detailed employee changes within a run.
- `crawl_errors`: parsing errors within a run.
- `dataset_versions`: versions of the full employee set.
- `dataset_version_items`: the contents of a specific version of the employee set.
## Common employee payload structure
Short employee card:
```json
{
"profile_key": "staff:avsergeev",
"profile_id": "avsergeev",
"full_name": "Сергеев Алексей Викторович",
"status": "active",
"canonical_url": "https://www.hse.ru/staff/avsergeev",
"last_seen_at": "2026-05-14T10:00:00+00:00",
"dismissed_at": null
}
```
The sync payload additionally includes `checksum`.
The full card additionally contains:
```json
{
"data": {
"contacts": {},
"sections": []
}
}
```
`data` corresponds to the parsed JSON of the employee profile. `sections` may contain sections with publications, courses, graduation theses (ВКР), tables, links, and free-form text blocks.
## Tools
### get_service_info
Purpose: return service metadata, the list of tools, and the current version of the employee set.
Arguments: none.
Returns:
```json
{
"service_name": "miem-employees",
"backend_version": "0.5.0",
"protocolVersion": "2024-11-05",
"tools": [],
"dataset": {
"hash": "sha256",
"previous_hash": "sha256 or null",
"created_at": "2026-05-14T10:00:00+00:00",
"crawl_run_id": 123,
"employee_count": 100,
"active_count": 95,
"dismissed_count": 5
}
}
```
Note: before responding, the service creates an up-to-date `dataset_version` if the current employee set does not yet have one.
### sync_employees
Purpose: synchronize the client's employee cache by dataset hash.
Arguments:
```json
{
"client_hash": "sha256 or null",
"include_data": true
}
```
- `client_hash`: hash of the version the client already has. If omitted, a full snapshot is returned.
- `include_data`: controls whether the full `data` is included in employee cards. Defaults to `true`.
Full response without `client_hash`:
```json
{
"mode": "full",
"from_hash": null,
"to_hash": "current-sha256",
"dataset": {},
"items": []
}
```
If the client hash matches the current one:
```json
{
"mode": "delta",
"from_hash": "current-sha256",
"to_hash": "current-sha256",
"dataset": {},
"changes": {
"added": [],
"updated": [],
"dismissed": [],
"removed": []
}
}
```
If `client_hash` is unknown to the server:
```json
{
"mode": "full",
"from_hash": "missing",
"to_hash": "current-sha256",
"dataset": {},
"items": [],
"reason": "unknown_client_hash"
}
```
If `client_hash` is found and differs from the current one:
```json
{
"mode": "delta",
"from_hash": "old-sha256",
"to_hash": "current-sha256",
"dataset": {},
"changes": {
"added": [],
"updated": [],
"dismissed": [],
"removed": []
}
}
```
Delta logic:
- `added`: the employee appeared in the new version.
- `updated`: the checksum or status changed, and the employee is active.
- `dismissed`: the employee is present in the new version but now has status `dismissed`.
- `removed`: the `profile_key` was in the old version but is absent from the new one.
The dataset hash is computed over the sorted list of `{profile_key, status, checksum}`.
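The delta rules and the hash input above can be sketched in Python. This is an illustration under stated assumptions, not the service's code: the exact serialization used for hashing is not documented here, so compact JSON plus SHA-256 is an assumption, as is the order in which the rules are checked. `dataset_hash` and `compute_delta` are hypothetical names.

```python
from __future__ import annotations

import hashlib
import json


def dataset_hash(items: list[dict]) -> str:
    """Hash the sorted list of {profile_key, status, checksum} rows.

    Assumption: rows are sorted by profile_key and encoded as compact JSON.
    """
    rows = sorted(
        (
            {"profile_key": i["profile_key"], "status": i["status"], "checksum": i["checksum"]}
            for i in items
        ),
        key=lambda row: row["profile_key"],
    )
    encoded = json.dumps(rows, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def compute_delta(old_items: list[dict], new_items: list[dict]) -> dict[str, list[str]]:
    """Classify profile keys into added / updated / dismissed / removed."""
    old = {i["profile_key"]: i for i in old_items}
    new = {i["profile_key"]: i for i in new_items}
    changes: dict[str, list[str]] = {"added": [], "updated": [], "dismissed": [], "removed": []}
    for key in sorted(new):
        item, before = new[key], old.get(key)
        if before is None:
            changes["added"].append(key)
        elif item["status"] == "dismissed" and before["status"] != "dismissed":
            changes["dismissed"].append(key)
        elif item["status"] == "active" and (
            item["checksum"] != before["checksum"] or item["status"] != before["status"]
        ):
            changes["updated"].append(key)
    changes["removed"] = sorted(set(old) - set(new))
    return changes


old_version = [
    {"profile_key": "staff:a", "status": "active", "checksum": "c1"},
    {"profile_key": "staff:b", "status": "active", "checksum": "c2"},
    {"profile_key": "staff:c", "status": "active", "checksum": "c3"},
]
new_version = [
    {"profile_key": "staff:a", "status": "active", "checksum": "c1-changed"},
    {"profile_key": "staff:b", "status": "dismissed", "checksum": "c2"},
    {"profile_key": "staff:d", "status": "active", "checksum": "c4"},
]
delta = compute_delta(old_version, new_version)
```

Sorting before hashing makes the hash independent of row order, so two sets with the same employees, statuses, and checksums always get the same version hash.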
### search_employees
Purpose: find employees by full name or canonical URL.
Arguments:
```json
{
"query": "Сергеев",
"status": "active",
"limit": 20
}
```
- `query`: required by the schema, but in the code an empty string means a search without a text filter.
- `status`: optional; only `active` or `dismissed`.
- `limit`: maximum 100, default 20.
Returns an array of short employee payloads without `data`:
```json
[
{
"profile_key": "staff:avsergeev",
"profile_id": "avsergeev",
"full_name": "Сергеев Алексей Викторович",
"status": "active",
"canonical_url": "https://www.hse.ru/staff/avsergeev",
"last_seen_at": "2026-05-14T10:00:00+00:00",
"dismissed_at": null
}
]
```
### get_employee
Purpose: fetch a single employee card.
Arguments:
```json
{
"profile_id_or_url": "avsergeev"
}
```
The lookup matches on:
- `profile_key`
- `profile_id`
- exact `canonical_url`
- partial match on `canonical_url`
Returns the full employee payload with `data`.
If the employee is not found:
```json
{
"error": "not_found"
}
```
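The lookup used by `get_employee` can be pictured with an in-memory sketch. The real implementation queries the database; trying the four matchers in the listed order is an assumption made here for illustration, and `find_employee`, the second sample record, and its `profile_key` are hypothetical.

```python
from __future__ import annotations


def find_employee(employees: list[dict], needle: str) -> dict | None:
    """Return the first employee matched by key, id, exact URL, or partial URL."""
    if not needle:
        return None
    matchers = (
        lambda e: e["profile_key"] == needle,
        lambda e: e["profile_id"] == needle,
        lambda e: e["canonical_url"] == needle,
        lambda e: needle in e["canonical_url"],  # partial URL match
    )
    for matches in matchers:
        for employee in employees:
            if matches(employee):
                return employee
    return None


employees = [
    {
        "profile_key": "staff:avsergeev",
        "profile_id": "avsergeev",
        "canonical_url": "https://www.hse.ru/staff/avsergeev",
    },
    {
        "profile_key": "persons:133709486",  # hypothetical key format
        "profile_id": "133709486",
        "canonical_url": "https://www.hse.ru/org/persons/133709486",
    },
]

result = find_employee(employees, "avsergeev")
not_found = find_employee(employees, "nobody") or {"error": "not_found"}
```

Because the last matcher is a substring test, short inputs can match more than one profile; the sketch simply returns the first hit.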
### list_employee_publications
Purpose: return the employee's publications from the parsed profile sections.
Arguments:
```json
{
"profile_id_or_url": "avsergeev"
}
```
The service looks for sections in `current_data.sections` with `type = "publications"` and merges their `publications` arrays.
Response:
```json
{
"employee": {},
"items": [
{
"title": "Publication title",
"text": "Full description",
"url": "https://..."
}
]
}
```
If the employee or the profile data is missing:
```json
{
"items": []
}
```
### list_employee_courses
Purpose: return the courses the employee teaches, from the parsed profile sections.
Arguments:
```json
{
"profile_id_or_url": "avsergeev"
}
```
The service looks for sections in `current_data.sections` with `type = "courses_by_year"` and merges their `courses` arrays.
Response:
```json
{
"employee": {},
"items": [
{
"title": "Course title",
"url": "https://..."
}
]
}
```
If the employee or the profile data is missing:
```json
{
"items": []
}
```
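Both `list_employee_publications` and `list_employee_courses` follow the same pattern: filter sections by `type`, then merge one array from each match. A minimal sketch of that pattern, assuming `current_data` has the shape described earlier; `collect_section_items` is a hypothetical helper name.

```python
from __future__ import annotations


def collect_section_items(current_data: dict | None, section_type: str, items_key: str) -> list[dict]:
    """Merge the `items_key` arrays of every section whose type matches."""
    if not isinstance(current_data, dict):
        return []  # missing profile data yields an empty list
    collected: list[dict] = []
    for section in current_data.get("sections") or []:
        if isinstance(section, dict) and section.get("type") == section_type:
            collected.extend(section.get(items_key) or [])
    return collected


current_data = {
    "sections": [
        {"type": "publications", "publications": [{"title": "Paper A"}]},
        {"type": "courses_by_year", "courses": [{"title": "Course X"}]},
        {"type": "publications", "publications": [{"title": "Paper B"}]},
    ]
}

publications = collect_section_items(current_data, "publications", "publications")
courses = collect_section_items(current_data, "courses_by_year", "courses")
```

Returning an empty list for missing data mirrors the `{"items": []}` responses documented for both tools.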
### get_crawl_status
Purpose: return the latest crawl run.
Arguments: none.
Response:
```json
{
"id": 123,
"status": "completed",
"source_url": "https://miem.hse.ru/persons",
"started_at": "2026-05-14T10:00:00+00:00",
"finished_at": "2026-05-14T10:10:00+00:00",
"found_count": 100,
"parsed_count": 98,
"error_count": 2,
"dismissed_count": 1
}
```
If there have been no runs yet:
```json
{
"status": "never_run"
}
```
### get_crawl_run_details
Purpose: return detailed information about a specific crawl run: summary, employee changes, and errors.
Arguments:
```json
{
"run_id": 123
}
```
Response:
```json
{
"id": 123,
"source_url": "https://miem.hse.ru/persons",
"status": "completed",
"status_display": "Завершен",
"started_at": "2026-05-14T10:00:00+00:00",
"finished_at": "2026-05-14T10:10:00+00:00",
"started_display": "14.05.2026 13:00",
"finished_display": "14.05.2026 13:10",
"found_count": 100,
"parsed_count": 98,
"new_count": 3,
"error_count": 2,
"dismissed_count": 1,
"processed_count": 100,
"progress_percent": 100.0,
"message": null,
"changes_detail_available": true,
"changes": {
"new": [],
"missing_from_source": [],
"dismissed": []
},
"errors": []
}
```
If the run is not found:
```json
{
"error": "not_found"
}
```
## curl examples
List tools:
```bash
curl http://localhost:8001/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}'
```
Search for an employee:
```bash
curl http://localhost:8001/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"search_employees","arguments":{"query":"Сергеев","limit":5}}}'
```
Full synchronization:
```bash
curl http://localhost:8001/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"sync_employees","arguments":{"include_data":false}}}'
```
Delta synchronization:
```bash
curl http://localhost:8001/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"sync_employees","arguments":{"client_hash":"known-sha256","include_data":true}}}'
```
## How a client uses MCP
1. The client calls `initialize` and checks `protocolVersion`.
2. The client calls `tools/list` to get the current list of tools and their input schemas.
3. For search and point lookups, the client calls `tools/call` with `search_employees`, `get_employee`, `list_employee_publications`, `list_employee_courses`, `get_crawl_status`, or `get_crawl_run_details`.
4. For a local cache, the client calls `get_service_info` or `sync_employees`.
5. The client stores the latest `dataset.hash`.
6. On the next synchronization, the client passes that hash as `client_hash`.
7. The server returns an empty delta, a delta with changes, or a full snapshot if the hash is unknown.
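Steps 4 to 7 amount to a small cache-update routine on the client. The sketch below shows one way to apply a parsed `sync_employees` payload to a local cache. The examples in this document only show empty arrays in `changes`, so the assumption that each entry is an employee payload carrying `profile_key` (or, for `removed`, possibly a bare key) is mine; `apply_sync` is a hypothetical name.

```python
def apply_sync(cache: dict, response: dict) -> dict:
    """Return a new cache {"hash", "employees"} after applying a sync payload."""
    employees = dict(cache.get("employees", {}))
    if response["mode"] == "full":
        # Full snapshot: replace the cached set entirely.
        employees = {item["profile_key"]: item for item in response["items"]}
    else:
        changes = response["changes"]
        for bucket in ("added", "updated", "dismissed"):
            for item in changes[bucket]:
                employees[item["profile_key"]] = item
        for item in changes["removed"]:
            key = item["profile_key"] if isinstance(item, dict) else item
            employees.pop(key, None)
    # Store to_hash so it can be sent as client_hash next time.
    return {"hash": response["to_hash"], "employees": employees}


cache = {"hash": None, "employees": {}}
full = {
    "mode": "full", "from_hash": None, "to_hash": "h1", "dataset": {},
    "items": [
        {"profile_key": "staff:a", "status": "active"},
        {"profile_key": "staff:b", "status": "active"},
    ],
}
delta = {
    "mode": "delta", "from_hash": "h1", "to_hash": "h2", "dataset": {},
    "changes": {
        "added": [{"profile_key": "staff:c", "status": "active"}],
        "updated": [],
        "dismissed": [{"profile_key": "staff:b", "status": "dismissed"}],
        "removed": [{"profile_key": "staff:a"}],
    },
}
cache = apply_sync(cache, full)
cache = apply_sync(cache, delta)
```

An empty delta leaves the employees untouched and keeps the same hash, so calling the routine on every poll is harmless.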
## Important implementation notes
- The MCP endpoint is read-only: tools do not start crawls and do not modify employees directly.
- `get_service_info` and `sync_employees` may create a new `dataset_versions` record if the employee state has changed and no new version exists yet.
- All tool payloads are returned as a JSON string inside `content[0].text`.
- `search_employees` searches with `ilike` on `full_name` and `canonical_url`.
- `get_employee` accepts a partial URL, so the string `133709486` can find `https://www.hse.ru/org/persons/133709486`.
- Timestamps are serialized with `isoformat()`; display fields for admin payloads are formatted in the `Europe/Moscow` time zone.
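The last note can be illustrated with the timestamps from the `get_crawl_run_details` example. This sketch assumes the display format `%d.%m.%Y %H:%M`, inferred from the sample value `14.05.2026 13:00`; `display_time` is a hypothetical helper, and the fixed UTC+3 fallback is only there for systems without a time zone database.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

try:
    from zoneinfo import ZoneInfo

    MOSCOW = ZoneInfo("Europe/Moscow")
except Exception:
    # Fallback assumption: Moscow time is a fixed UTC+3 offset.
    MOSCOW = timezone(timedelta(hours=3))


def display_time(value: datetime | None) -> str | None:
    """Format an aware datetime for display in Moscow time."""
    if value is None:
        return None
    return value.astimezone(MOSCOW).strftime("%d.%m.%Y %H:%M")


started_at = datetime(2026, 5, 14, 10, 0, tzinfo=timezone.utc)
serialized = started_at.isoformat()   # machine-readable field
shown = display_time(started_at)      # human-readable display field
```

Keeping the ISO value in UTC and converting only for display avoids ambiguity when clients run in other time zones.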


@@ -6,7 +6,7 @@
- `api`: FastAPI, REST API, HTML admin panel, healthcheck.
- `worker`: weekly scheduler that starts a crawl according to `CRAWL_CRON`.
- `mcp`: HTTP MCP endpoint with an OAuth/OIDC access token for external agents or a legacy static token for local mode.
- `mcp`: open HTTP MCP endpoint for AI agents.
- `postgres`: the main database.
The parser uses a fixed employee source, by default `https://miem.hse.ru/persons`. For each card it stores the full name, positions, start year of employment, contacts, identifiers, profile tabs, sections, publications, courses, graduation theses (ВКР), a JSON snapshot, and a compressed HTML snapshot. Links are followed only from the employee's own profile menu (`person-menu`), for example `#sci`, `#teaching`, `#main`.
@@ -27,13 +27,6 @@ cp .env.example .env
- `CRAWL_LIMIT`: optional limit on the number of profiles for a test run.
- `ADMIN_USERNAME`, `ADMIN_PASSWORD`: admin panel login and password.
- `SESSION_SECRET`: cookie signing secret.
- `MCP_TOKEN`: static bearer token for the legacy/local mode `MCP_AUTH_MODE=token`.
- `MCP_AUTH_MODE`: MCP authorization mode: `oauth` for external agents or `token` for local debugging.
- `MCP_RESOURCE_URL`: public URL of the MCP endpoint, for example `https://example.com/mcp`.
- `MCP_OAUTH_ISSUER`: issuer of the external OIDC provider.
- `MCP_OAUTH_AUDIENCE`: expected `aud` in the OAuth access token.
- `MCP_OAUTH_JWKS_URL`: JWKS endpoint; if not set, `<issuer>/.well-known/jwks.json` is used.
- `MCP_OAUTH_REQUIRED_SCOPE`: scope required to access MCP tools, default `mcp:tools`.
- `PARSER_USE_PLAYWRIGHT`: enables Playwright rendering of dynamic tabs.
## Local run
@@ -51,7 +44,6 @@ uvicorn app.main:app --reload
- `Dashboard`: overall statistics, the most recently added employee, progress of the current or latest crawl, and manual start.
- `Directory`: configurable employee table with filters, sorting, pagination, and column selection.
- `Employees`: simple legacy employee table.
- `Runs`: run history, errors, and a progress bar.
## Docker Compose
@@ -82,45 +74,37 @@ curl -X POST http://localhost:8000/api/crawl-runs --cookie "miem_admin_session=.
- new employees are added to `employees`;
- the number of new employees per run is stored in `crawl_runs.new_count`;
- active employees that disappeared from the source's current list get status `dismissed` and a `dismissed_at` value;
- every successful parse stores a record in `employee_snapshots`.
- every successful parse of a new or changed profile stores a record in `employee_snapshots`;
- unchanged profiles are counted in `crawl_runs.skipped_count` and do not get a new snapshot.
While a crawl is running, `found_count`, `parsed_count`, and `error_count` are updated in the database. The admin panel polls `/api/crawl-runs/latest` and shows progress as `parsed_count + error_count / found_count`.
While a crawl is running, `found_count`, `parsed_count`, `skipped_count`, and `error_count` are updated in the database. The admin panel polls `/api/crawl-runs/latest` and shows progress as `(parsed_count + skipped_count + error_count) / found_count`.
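The updated progress formula can be written out as a small function. This is a sketch rather than the admin panel's actual code: the zero guard, the clamp to 100, and the rounding to one decimal are assumptions, and `progress_percent` is a hypothetical name.

```python
def progress_percent(found_count: int, parsed_count: int, skipped_count: int, error_count: int) -> float:
    """Progress as (parsed + skipped + error) / found, expressed in percent."""
    if found_count <= 0:
        return 0.0  # nothing discovered yet, avoid division by zero
    processed = parsed_count + skipped_count + error_count
    return round(min(processed / found_count, 1.0) * 100, 1)


halfway = progress_percent(found_count=100, parsed_count=40, skipped_count=10, error_count=0)
finished = progress_percent(found_count=100, parsed_count=98, skipped_count=0, error_count=2)
```

The parentheses matter: without them, division binds tighter than addition, so only `error_count` would be divided by `found_count`.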
## MCP
Endpoint: `POST /mcp`, authorization via `Authorization: Bearer <token>`.
For external AI agents use `MCP_AUTH_MODE=oauth`. In this mode the static `MCP_TOKEN` is not accepted: the client must pass an OAuth/OIDC access token with the required scope.
Endpoint: `POST /mcp`, with no application-level authorization.
Supported tools:
- `get_service_info()`
- `sync_employees(client_hash?, include_data?)`
- `search_employees(query, status?, limit?)`
- `get_employee(profile_id_or_url)`
- `list_employee_publications(profile_id_or_url)`
- `list_employee_courses(profile_id_or_url)`
- `get_crawl_status()`
- `get_crawl_run_details(run_id)`
Example of local legacy mode with a static token:
`get_service_info` returns service metadata, the list of tools, and the current version of the employee set. `sync_employees` returns a full snapshot or a delta based on `client_hash`; the dataset checksum is built from the employees, their statuses, and their current checksums. Tool responses are returned as a JSON string inside MCP `content[0].text`.
Example of a local request for the list of tools:
```bash
curl http://localhost:8001/mcp \
-H "Authorization: Bearer change-me-mcp-token" \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}'
```
For production OAuth/OIDC, configure an external authorization server and enable the `oauth` mode:
```env
MCP_AUTH_MODE=oauth
MCP_RESOURCE_URL=https://example.com/mcp
MCP_OAUTH_ISSUER=https://auth.example.com
MCP_OAUTH_AUDIENCE=miem-mcp
MCP_OAUTH_JWKS_URL=https://auth.example.com/.well-known/jwks.json
MCP_OAUTH_REQUIRED_SCOPE=mcp:tools
```
The MCP server acts as an OAuth protected resource: it does not issue tokens; it validates the JWT access token against the JWKS, `issuer`, `audience`, expiry, and scope. Metadata for MCP clients is available at `GET /.well-known/oauth-protected-resource`.
If MCP needs to be restricted, do it at the network level: localhost binding, VPN, firewall, reverse proxy, or another external access layer.
## Maintenance
@@ -131,4 +115,4 @@ docker compose exec postgres pg_dump -U miem miem_workers > backup.sql
docker compose down
```
Service version: `0.4.0`. The admin panel always shows the backend and frontend versions in the footer.
Service version: `0.6.0`. The admin panel always shows the backend and frontend versions in the footer.


@@ -17,6 +17,7 @@ from app.services.admin_data import (
stats_payload,
)
from app.services.crawl_control import get_running_run, run_crawl_if_idle
from app.services.crawler import refresh_employee
from app.version import BACKEND_VERSION, FRONTEND_VERSION
router = APIRouter(prefix="/admin")
@@ -144,10 +145,31 @@ def employee_detail(
return _render(
request,
"employee_detail.html",
{"employee": employee, "employee_view": employee_detail_payload(employee), "snapshots": snapshots},
{
"employee": employee,
"employee_view": employee_detail_payload(employee),
"snapshots": snapshots,
"refresh_status": request.query_params.get("refresh_status"),
},
)
@router.post("/employees/{employee_id}/refresh")
def refresh_employee_detail(
employee_id: int,
request: Request,
db: Session = Depends(get_db),
settings: Settings = Depends(get_settings),
):
require_admin(request, settings)
employee = db.get(Employee, employee_id)
if not employee:
return RedirectResponse("/admin/directory", status_code=303)
run = refresh_employee(db, employee, settings)
status = "success" if run.status == "completed" else "error"
return RedirectResponse(f"/admin/employees/{employee_id}?refresh_status={status}", status_code=303)
@router.get("/runs", response_class=HTMLResponse)
def runs(request: Request, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
require_admin(request, settings)


@@ -1,6 +1,4 @@
from functools import lru_cache
from typing import Literal
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -19,13 +17,6 @@ class Settings(BaseSettings):
admin_username: str = "admin"
admin_password: str = "admin"
session_secret: str = Field(default="dev-session-secret", min_length=8)
mcp_token: str = "dev-mcp-token"
mcp_auth_mode: Literal["token", "oauth"] = "oauth"
mcp_resource_url: str = "http://localhost:8001/mcp"
mcp_oauth_issuer: str = ""
mcp_oauth_audience: str = ""
mcp_oauth_jwks_url: str = ""
mcp_oauth_required_scope: str = "mcp:tools"
@field_validator("crawl_limit", mode="before")
@classmethod
@@ -34,15 +25,6 @@ class Settings(BaseSettings):
return None
return value
def oauth_jwks_url(self) -> str:
if self.mcp_oauth_jwks_url:
return self.mcp_oauth_jwks_url
issuer = self.mcp_oauth_issuer.rstrip("/")
if not issuer:
return ""
return f"{issuer}/.well-known/jwks.json"
@lru_cache
def get_settings() -> Settings:
return Settings()


@@ -4,7 +4,6 @@ from fastapi.staticfiles import StaticFiles
from app.admin import router as admin_router
from app.api import router as api_router
from app.db import init_db
from app.mcp import metadata_router as mcp_metadata_router
from app.mcp import router as mcp_router
from app.version import BACKEND_VERSION
@@ -13,7 +12,6 @@ app.mount("/static", StaticFiles(directory="app/static"), name="static")
app.include_router(api_router)
app.include_router(admin_router)
app.include_router(mcp_router)
app.include_router(mcp_metadata_router)
@app.on_event("startup")


@@ -4,18 +4,34 @@ from fastapi import APIRouter, Depends, Request
from sqlalchemy import desc, or_, select
from sqlalchemy.orm import Session
from app.config import Settings, get_settings
from app.db import get_db
from app.models import CrawlRun, Employee
from app.security import mcp_protected_resource_metadata, require_mcp_auth
from app.services.admin_data import run_detail_payload
from app.services.dataset_versions import service_info_payload, sync_employees_payload
from app.version import BACKEND_VERSION
router = APIRouter(prefix="/mcp")
metadata_router = APIRouter()
PROTOCOL_VERSION = "2024-11-05"
SERVICE_NAME = "miem-employees"
TOOLS = [
{
"name": "get_service_info",
"description": "Return service metadata, supported tools, and current dataset version.",
"inputSchema": {"type": "object", "properties": {}},
},
{
"name": "sync_employees",
"description": "Synchronize employees by dataset hash. Returns a full snapshot or a delta from client_hash.",
"inputSchema": {
"type": "object",
"properties": {
"client_hash": {"type": "string"},
"include_data": {"type": "boolean", "default": True},
},
},
},
{
"name": "search_employees",
"description": "Search MIEM employees by name or profile URL.",
@@ -65,9 +81,7 @@ TOOLS = [
async def mcp_http(
request: Request,
db: Session = Depends(get_db),
settings: Settings = Depends(get_settings),
) -> dict:
require_mcp_auth(request, settings)
payload = await request.json()
method = payload.get("method")
request_id = payload.get("id")
@@ -76,8 +90,8 @@ async def mcp_http(
try:
if method == "initialize":
result = {
"protocolVersion": "2024-11-05",
"serverInfo": {"name": "miem-employees", "version": BACKEND_VERSION},
"protocolVersion": PROTOCOL_VERSION,
"serverInfo": {"name": SERVICE_NAME, "version": BACKEND_VERSION},
"capabilities": {"tools": {}},
}
elif method == "tools/list":
@@ -92,6 +106,24 @@ async def mcp_http(
def _call_tool(db: Session, name: str, arguments: dict) -> dict:
if name == "get_service_info":
return _tool_response(
service_info_payload(
db,
tools=TOOLS,
service_name=SERVICE_NAME,
backend_version=BACKEND_VERSION,
protocol_version=PROTOCOL_VERSION,
)
)
if name == "sync_employees":
return _tool_response(
sync_employees_payload(
db,
client_hash=arguments.get("client_hash"),
include_data=bool(arguments.get("include_data", True)),
)
)
if name == "search_employees":
return _tool_response(_search_employees(db, arguments))
if name == "get_employee":
@@ -176,6 +208,7 @@ def _run_payload(run: CrawlRun) -> dict:
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
"found_count": run.found_count,
"parsed_count": run.parsed_count,
"skipped_count": run.skipped_count,
"error_count": run.error_count,
"dismissed_count": run.dismissed_count,
}
@@ -183,8 +216,3 @@ def _run_payload(run: CrawlRun) -> dict:
def _tool_response(data: object) -> dict:
return {"content": [{"type": "text", "text": json.dumps(data, ensure_ascii=False, default=str)}]}
@metadata_router.get("/.well-known/oauth-protected-resource")
def oauth_protected_resource(settings: Settings = Depends(get_settings)) -> dict:
return mcp_protected_resource_metadata(settings)


@@ -70,12 +70,14 @@ class CrawlRun(Base):
finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
found_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
parsed_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
skipped_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
new_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
error_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
dismissed_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
message: Mapped[str | None] = mapped_column(Text)
employee_changes: Mapped[list["CrawlRunEmployeeChange"]] = relationship(back_populates="crawl_run")
dataset_versions: Mapped[list["DatasetVersion"]] = relationship(back_populates="crawl_run")
class CrawlRunEmployeeChange(Base):
@@ -134,3 +136,63 @@ class ParserSource(Base):
source_url: Mapped[str] = mapped_column(Text, nullable=False)
enabled: Mapped[bool] = mapped_column(default=True, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
class ParseResourceCache(Base):
__tablename__ = "parse_resource_cache"
__table_args__ = (
UniqueConstraint("profile_key", "resource_key", "request_fingerprint", name="uq_parse_resource_cache_resource"),
Index("ix_parse_resource_cache_profile_key", "profile_key"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
profile_key: Mapped[str] = mapped_column(String(255), nullable=False)
resource_key: Mapped[str] = mapped_column(String(255), nullable=False)
method: Mapped[str] = mapped_column(String(16), nullable=False)
url: Mapped[str] = mapped_column(Text, nullable=False)
request_fingerprint: Mapped[str] = mapped_column(String(64), nullable=False)
etag: Mapped[str | None] = mapped_column(Text)
last_modified: Mapped[str | None] = mapped_column(Text)
body_hash: Mapped[str] = mapped_column(String(64), nullable=False)
body_snapshot: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
parser_version: Mapped[str | None] = mapped_column(String(32))
fetched_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
class DatasetVersion(Base):
__tablename__ = "dataset_versions"
__table_args__ = (
UniqueConstraint("hash", name="uq_dataset_versions_hash"),
Index("ix_dataset_versions_created_at", "created_at"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
hash: Mapped[str] = mapped_column(String(64), nullable=False)
previous_hash: Mapped[str | None] = mapped_column(String(64))
crawl_run_id: Mapped[int | None] = mapped_column(ForeignKey("crawl_runs.id"))
employee_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
active_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
dismissed_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
crawl_run: Mapped[CrawlRun | None] = relationship(back_populates="dataset_versions")
items: Mapped[list["DatasetVersionItem"]] = relationship(back_populates="dataset_version", cascade="all, delete-orphan")
class DatasetVersionItem(Base):
__tablename__ = "dataset_version_items"
__table_args__ = (
UniqueConstraint("dataset_version_id", "profile_key", name="uq_dataset_version_items_version_profile"),
Index("ix_dataset_version_items_hash", "dataset_version_id"),
Index("ix_dataset_version_items_profile_key", "profile_key"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
dataset_version_id: Mapped[int] = mapped_column(ForeignKey("dataset_versions.id"), nullable=False)
profile_key: Mapped[str] = mapped_column(String(255), nullable=False)
employee_id: Mapped[int | None] = mapped_column(ForeignKey("employees.id"))
status: Mapped[str] = mapped_column(String(32), nullable=False)
checksum: Mapped[str] = mapped_column(String(64), nullable=False)
dataset_version: Mapped[DatasetVersion] = relationship(back_populates="items")
employee: Mapped[Employee | None] = relationship()


@@ -1,3 +1,5 @@
import hashlib
import json
import re
from urllib.parse import urljoin
@@ -149,22 +151,42 @@ def parse_person_profile(
headers: dict[str, str],
timeout: int,
use_playwright: bool = False,
resource_cache=None,
) -> dict | None:
normalized_url = normalize_profile_url(source_url)
if not normalized_url:
return None
response = session.get(normalized_url, headers=headers, timeout=timeout)
response.raise_for_status()
html = response.text
profile_type, profile_id = parse_profile_identity(normalized_url)
cache_profile_key = f"{profile_type}:{profile_id}"
resource_manifest = []
html = _fetch_text(
session,
normalized_url,
headers,
timeout,
resource_cache=resource_cache,
profile_key=cache_profile_key,
resource_key="main-html",
resource_manifest=resource_manifest,
)
if use_playwright:
html = _render_with_playwright(normalized_url, html)
soup = BeautifulSoup(html, "html.parser")
profile_type, profile_id = parse_profile_identity(normalized_url)
header = extract_person_header(soup, normalized_url)
tabs = extract_person_tabs(soup, normalized_url)
sections = extract_sections(soup, normalized_url)
sections = enrich_sections_from_hse_widgets(session, soup, normalized_url, headers, timeout, sections)
sections = enrich_sections_from_hse_widgets(
session,
soup,
normalized_url,
headers,
timeout,
sections,
resource_cache=resource_cache,
profile_key=cache_profile_key,
resource_manifest=resource_manifest,
)
internal_links = [tab["href"] for tab in tabs if tab.get("href")]
return {
@@ -181,6 +203,7 @@ def parse_person_profile(
"employee_internal_links": internal_links,
"parser_version": BACKEND_VERSION,
"_html": html,
"_resource_manifest": resource_manifest,
}
@@ -191,13 +214,33 @@ def enrich_sections_from_hse_widgets(
headers: dict[str, str],
timeout: int,
sections: list[dict],
resource_cache=None,
profile_key: str | None = None,
resource_manifest: list[dict] | None = None,
) -> list[dict]:
enriched = list(sections)
publications = _load_widget_publications(session, soup, headers, timeout)
publications = _load_widget_publications(
session,
soup,
headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_manifest=resource_manifest,
)
if publications:
enriched = _upsert_publications_section(enriched, publications)
theses = _load_widget_graduation_theses(session, soup, source_url, headers, timeout)
theses = _load_widget_graduation_theses(
session,
soup,
source_url,
headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_manifest=resource_manifest,
)
if theses:
enriched = _upsert_graduation_theses_section(enriched, theses)
return enriched
@@ -226,7 +269,16 @@ def _render_with_playwright(source_url: str, fallback_html: str) -> str:
return fallback_html
def _load_widget_publications(session: Session, soup: BeautifulSoup, headers: dict[str, str], timeout: int) -> list[dict]:
def _load_widget_publications(
session: Session,
soup: BeautifulSoup,
headers: dict[str, str],
timeout: int,
*,
resource_cache=None,
profile_key: str | None = None,
resource_manifest: list[dict] | None = None,
) -> list[dict]:
script = soup.select_one('script[data-widget-name="AuthorSearch"][data-author]')
if not script:
return []
@@ -251,22 +303,37 @@ def _load_widget_publications(session: Session, soup: BeautifulSoup, headers: di
},
}
try:
response = session.post(
"https://publications.hse.ru/api/searchPubs",
json=payload,
headers=headers,
timeout=timeout,
)
response.raise_for_status()
data = response.json()
if resource_cache and profile_key:
text = _fetch_text(
session,
"https://publications.hse.ru/api/searchPubs",
headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_key=f"publications-page-{page_id}",
resource_manifest=resource_manifest,
method="POST",
json_payload=payload,
)
data = json.loads(text)
else:
response = session.post(
"https://publications.hse.ru/api/searchPubs",
json=payload,
headers=headers,
timeout=timeout,
)
response.raise_for_status()
data = response.json()
except Exception:
return publications
result = data.get("result") if isinstance(data, dict) else {}
items = result.get("items") if isinstance(result, dict) else []
if not isinstance(items, list) or not items:
items = _extract_publication_items(result)
if not items:
break
publications.extend(_normalize_publication_item(item) for item in items if isinstance(item, dict))
publications.extend(_normalize_publication_item(item) for item in items)
total = int(result.get("total") or 0)
if not result.get("more") and len(publications) >= total:
@@ -275,12 +342,44 @@ def _load_widget_publications(session: Session, soup: BeautifulSoup, headers: di
return _dedupe_publications(publications)


def _extract_publication_items(result: object) -> list[dict]:
    if not isinstance(result, dict):
        return []
    return _flatten_publication_items(result.get("items"))


def _flatten_publication_items(value: object) -> list[dict]:
    if isinstance(value, list):
        return [item for item in value if _is_publication_item(item)]
    if not isinstance(value, dict):
        return []
    nested_items = value.get("items")
    if isinstance(nested_items, list):
        return [item for item in nested_items if _is_publication_item(item)]
    if isinstance(nested_items, dict):
        return _flatten_publication_items(nested_items)
    publications = []
    for child in value.values():
        publications.extend(_flatten_publication_items(child))
    return publications


def _is_publication_item(value: object) -> bool:
    return isinstance(value, dict) and ("id" in value or "title" in value)


def _load_widget_graduation_theses(
session: Session,
soup: BeautifulSoup,
source_url: str,
headers: dict[str, str],
timeout: int,
*,
resource_cache=None,
profile_key: str | None = None,
resource_manifest: list[dict] | None = None,
) -> list[dict]:
script = soup.select_one('script[src*="/n/stat/vkr/app.js"][data-person-id]')
if not script:
@@ -292,14 +391,30 @@ def _load_widget_graduation_theses(
request_headers = {**headers, "x-portal-language": "ru"}
try:
response = session.get(
urljoin(source_url, api_url),
params={"supervisorId": person_id},
headers=request_headers,
timeout=timeout,
)
response.raise_for_status()
data = response.json()
url = urljoin(source_url, api_url)
params = {"supervisorId": person_id}
if resource_cache and profile_key:
text = _fetch_text(
session,
url,
request_headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_key="graduation-theses",
resource_manifest=resource_manifest,
params=params,
)
data = json.loads(text)
else:
response = session.get(
url,
params=params,
headers=request_headers,
timeout=timeout,
)
response.raise_for_status()
data = response.json()
except Exception:
return []
@@ -359,7 +474,7 @@ def _infer_section_type(title: str, nodes: list) -> str:
lowered = title.lower()
if _has_table(nodes):
return "table"
if "публикац" in lowered:
if _is_publications_title(lowered):
return "publications"
if "учебные курсы" in lowered:
return "courses_by_year"
@@ -370,6 +485,10 @@ def _infer_section_type(title: str, nodes: list) -> str:
return "generic"
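def _is_publications_title(lowered_title: str) -> bool:
    # "публикац" is the stem of the Russian word for "publications";
    # only titles that start with it count as a publications section.
    return lowered_title.startswith("публикац")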
def _has_table(nodes: list) -> bool:
return any(isinstance(node, Tag) and (node.name == "table" or node.find("table")) for node in nodes)
@@ -597,3 +716,62 @@ def _dedupe_dicts(items: list[dict]) -> list[dict]:
seen.add(key)
unique.append(item)
return unique


def _fetch_text(
    session: Session,
    url: str,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_key: str,
    resource_manifest: list[dict] | None,
    method: str = "GET",
    json_payload: object | None = None,
    params: dict | None = None,
) -> str:
    if resource_cache and profile_key:
        cached = resource_cache.fetch_text(
            session,
            profile_key=profile_key,
            resource_key=resource_key,
            method=method,
            url=url,
            headers=headers,
            timeout=timeout,
            json_payload=json_payload,
            params=params,
        )
        if resource_manifest is not None:
            resource_manifest.append(
                {
                    "resource_key": resource_key,
                    "method": method,
                    "url": url,
                    "body_hash": cached.body_hash,
                    "from_cache": cached.from_cache,
                    "status_code": cached.status_code,
                }
            )
        return cached.text

    if method.upper() == "POST":
        response = session.post(url, json=json_payload, headers=headers, timeout=timeout, params=params)
    else:
        response = session.get(url, headers=headers, timeout=timeout, params=params)
    response.raise_for_status()
    text = response.text
    if resource_manifest is not None:
        resource_manifest.append(
            {
                "resource_key": resource_key,
                "method": method,
                "url": url,
                "body_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
                "from_cache": False,
                "status_code": response.status_code,
            }
        )
    return text
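
Review note: the grouped-response handling added in `_extract_publication_items` and `_flatten_publication_items` above can be exercised in isolation. The following is a minimal sketch that copies the flattening logic under shortened names; the response shapes and the group keys (`articles`, `books`, `misc`) are made up for illustration and are not taken from the real HSE publications API.

```python
def is_publication_item(value: object) -> bool:
    # A publication is any dict that carries an "id" or a "title".
    return isinstance(value, dict) and ("id" in value or "title" in value)


def flatten_publication_items(value: object) -> list[dict]:
    # Flat response: "items" is already a list of publications.
    if isinstance(value, list):
        return [item for item in value if is_publication_item(item)]
    if not isinstance(value, dict):
        return []
    nested_items = value.get("items")
    if isinstance(nested_items, list):
        return [item for item in nested_items if is_publication_item(item)]
    if isinstance(nested_items, dict):
        return flatten_publication_items(nested_items)
    # Grouped response: walk every group and collect its publications.
    publications: list[dict] = []
    for child in value.values():
        publications.extend(flatten_publication_items(child))
    return publications


# Hypothetical grouped payload; the group names are illustrative only.
grouped = {
    "articles": {"items": [{"id": 1, "title": "A"}]},
    "books": [{"id": 2}, "noise"],
    "misc": {"nested": {"items": [{"title": "C"}]}},
}
flat = [{"id": 1}]

print(flatten_publication_items(grouped))
print(flatten_publication_items(flat))
```

Non-dict entries such as the string `"noise"` are dropped by the `is_publication_item` filter, which is why the caller no longer needs its own `isinstance(item, dict)` check.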


@@ -3,10 +3,7 @@ import hashlib
import hmac
import json
import time
from functools import lru_cache
import jwt
from jwt import PyJWKClient, PyJWTError
from fastapi import HTTPException, Request, status
from app.config import Settings
@@ -47,93 +44,3 @@ def require_admin(request: Request, settings: Settings) -> str:
if not username:
raise HTTPException(status_code=status.HTTP_303_SEE_OTHER, headers={"Location": "/admin/login"})
return username
def require_mcp_auth(request: Request, settings: Settings) -> None:
auth = request.headers.get("authorization", "")
if not auth.startswith("Bearer "):
raise _mcp_unauthorized(settings, "Missing bearer token")
token = auth.removeprefix("Bearer ").strip()
if _mcp_static_token_allowed(settings) and hmac.compare_digest(token, settings.mcp_token):
return
if _mcp_oauth_allowed(settings):
_validate_mcp_oauth_token(token, settings)
return
raise _mcp_unauthorized(settings, "Invalid MCP token")
def require_mcp_token(request: Request, settings: Settings) -> None:
require_mcp_auth(request, settings)
def mcp_protected_resource_metadata(settings: Settings) -> dict:
authorization_servers = [settings.mcp_oauth_issuer.rstrip("/")] if settings.mcp_oauth_issuer else []
return {
"resource": settings.mcp_resource_url,
"authorization_servers": authorization_servers,
"bearer_methods_supported": ["header"],
"scopes_supported": [settings.mcp_oauth_required_scope],
"resource_documentation": settings.mcp_resource_url,
}
def _mcp_static_token_allowed(settings: Settings) -> bool:
return settings.mcp_auth_mode == "token"
def _mcp_oauth_allowed(settings: Settings) -> bool:
return settings.mcp_auth_mode == "oauth"
def _validate_mcp_oauth_token(token: str, settings: Settings) -> None:
if not settings.mcp_oauth_issuer or not settings.mcp_oauth_audience or not settings.oauth_jwks_url():
raise _mcp_unauthorized(settings, "MCP OAuth is not configured")
try:
signing_key = _get_mcp_oauth_signing_key(token, settings).key
claims = jwt.decode(
token,
signing_key,
algorithms=["RS256", "RS384", "RS512", "ES256", "ES384", "ES512"],
audience=settings.mcp_oauth_audience,
issuer=settings.mcp_oauth_issuer.rstrip("/"),
)
except PyJWTError as exc:
raise _mcp_unauthorized(settings, "Invalid OAuth access token") from exc
if not _claims_have_scope(claims, settings.mcp_oauth_required_scope):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing required MCP OAuth scope")
def _claims_have_scope(claims: dict, required_scope: str) -> bool:
scopes: set[str] = set()
scope = claims.get("scope")
if isinstance(scope, str):
scopes.update(scope.split())
scp = claims.get("scp")
if isinstance(scp, str):
scopes.update(scp.split())
elif isinstance(scp, list):
scopes.update(str(item) for item in scp)
return required_scope in scopes
@lru_cache(maxsize=16)
def _get_jwk_client(jwks_url: str) -> PyJWKClient:
return PyJWKClient(jwks_url)
def _get_mcp_oauth_signing_key(token: str, settings: Settings):
return _get_jwk_client(settings.oauth_jwks_url()).get_signing_key_from_jwt(token)
def _mcp_unauthorized(settings: Settings, detail: str) -> HTTPException:
headers = {}
if _mcp_oauth_allowed(settings):
headers["WWW-Authenticate"] = f'Bearer resource_metadata="{_mcp_metadata_url(settings)}"'
return HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail, headers=headers)
def _mcp_metadata_url(settings: Settings) -> str:
resource_url = settings.mcp_resource_url.rstrip("/")
base_url = resource_url[: -len("/mcp")] if resource_url.endswith("/mcp") else resource_url
return f"{base_url}/.well-known/oauth-protected-resource"


@@ -153,7 +153,7 @@ def stats_payload(db: Session) -> dict[str, Any]:
def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
if not run:
return None
processed = run.parsed_count + run.error_count
processed = run.parsed_count + run.skipped_count + run.error_count
percent = round((processed / run.found_count) * 100, 1) if run.found_count else 0
return {
"id": run.id,
@@ -166,6 +166,7 @@ def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
"finished_display": format_admin_datetime(run.finished_at),
"found_count": run.found_count,
"parsed_count": run.parsed_count,
"skipped_count": run.skipped_count,
"new_count": run.new_count,
"error_count": run.error_count,
"dismissed_count": run.dismissed_count,
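
Review note: the progress arithmetic changed in `run_payload` can be checked with a small standalone sketch. The function name `progress_percent` and the sample numbers are illustrative assumptions; only the formula comes from the hunk above.

```python
def progress_percent(found: int, parsed: int, skipped: int, errors: int) -> float:
    # Unchanged ("skipped") profiles count as processed,
    # otherwise a run with few changes would never reach 100 %.
    processed = parsed + skipped + errors
    return round((processed / found) * 100, 1) if found else 0


# A finished run where most profiles were unchanged.
print(progress_percent(found=200, parsed=30, skipped=150, errors=20))
# The same run measured with the old formula (skipped profiles ignored).
print(progress_percent(found=200, parsed=30, skipped=0, errors=20))
```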


@@ -1,6 +1,7 @@
import gzip
import hashlib
import json
import re
import time
from datetime import datetime, timezone
@@ -13,6 +14,8 @@ from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, E
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key
from app.services.dataset_versions import get_or_create_current_version
from app.services.resource_cache import ResourceCache
HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)"
@@ -28,8 +31,10 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
found_keys: set[str] = set()
parsed_count = 0
skipped_count = 0
try:
with requests.Session() as session:
resource_cache = ResourceCache(db)
urls = collect_profile_links(session, source.source_url, HEADERS, settings.request_timeout)
if settings.crawl_limit:
urls = urls[: settings.crawl_limit]
@@ -47,12 +52,17 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
HEADERS,
settings.request_timeout,
settings.parser_use_playwright,
resource_cache=resource_cache,
)
if not parsed:
continue
_upsert_employee(db, run, parsed)
parsed_count += 1
_, changed = _upsert_employee(db, run, parsed)
if changed:
parsed_count += 1
else:
skipped_count += 1
run.parsed_count = parsed_count
run.skipped_count = skipped_count
db.commit()
except Exception as exc:
run.error_count += 1
@@ -68,8 +78,9 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
finally:
time.sleep(settings.request_delay_seconds)
run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
run.status = "completed"
get_or_create_current_version(db, crawl_run_id=run.id)
except Exception as exc:
run.status = "failed"
run.message = str(exc)
@@ -80,6 +91,54 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
return run
def refresh_employee(db: Session, employee: Employee, settings: Settings) -> CrawlRun:
run = CrawlRun(source_url=employee.canonical_url, status="running", found_count=1)
db.add(run)
db.commit()
db.refresh(run)
try:
with requests.Session() as session:
resource_cache = ResourceCache(db)
parsed = parse_person_profile(
session,
employee.canonical_url,
HEADERS,
settings.request_timeout,
settings.parser_use_playwright,
resource_cache=resource_cache,
)
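if not parsed:
raise ValueError("Профиль не удалось распарсить.")  # "The profile could not be parsed."
if _parsed_profile_key(parsed) != employee.profile_key:
raise ValueError("Распарсенный профиль не совпадает с обновляемым сотрудником.")  # "The parsed profile does not match the employee being refreshed."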
_, changed = _upsert_employee(db, run, parsed)
if changed:
run.parsed_count = 1
else:
run.skipped_count = 1
run.status = "completed"
get_or_create_current_version(db, crawl_run_id=run.id)
except Exception as exc:
run.status = "failed"
run.error_count = 1
run.message = str(exc)
db.add(
CrawlError(
crawl_run_id=run.id,
profile_url=employee.canonical_url,
error_type=type(exc).__name__,
message=str(exc),
)
)
finally:
run.finished_at = datetime.now(timezone.utc)
db.commit()
db.refresh(run)
return run
def _ensure_source(db: Session, source_url: str) -> ParserSource:
source = db.scalar(select(ParserSource).where(ParserSource.source_url == source_url))
if source:
@@ -91,10 +150,15 @@ def _ensure_source(db: Session, source_url: str) -> ParserSource:
return source
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
def _parsed_profile_key(parsed: dict) -> str:
return f"{parsed.get('profile_type')}:{parsed.get('profile_id')}"
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee, bool]:
html = parsed.pop("_html", None)
parsed.pop("_resource_manifest", None)
checksum = _checksum(parsed)
key = f"{parsed.get('profile_type')}:{parsed.get('profile_id')}"
key = _parsed_profile_key(parsed)
employee = db.scalar(select(Employee).where(Employee.profile_key == key))
now = datetime.now(timezone.utc)
if not employee:
@@ -111,12 +175,15 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
else:
is_new = False
parser_version = parsed.get("parser_version")
changed = is_new or employee.current_checksum != checksum or employee.parser_version != parser_version
employee.full_name = parsed.get("full_name")
employee.status = "active"
employee.last_seen_at = now
employee.dismissed_at = None
employee.parser_version = parsed.get("parser_version")
employee.current_data = parsed
employee.parser_version = parser_version
if changed:
employee.current_data = parsed
employee.current_checksum = checksum
db.flush()
@@ -130,28 +197,29 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
message="Сотрудник впервые найден в источнике.",  # "Employee found in the source for the first time."
)
db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
for tab in parsed.get("tabs") or []:
if changed:
db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
for tab in parsed.get("tabs") or []:
db.add(
ProfileTab(
employee_id=employee.id,
title=tab.get("title") or "",
href=tab.get("href") or "",
data_index=tab.get("data_index"),
)
)
db.add(
ProfileTab(
EmployeeSnapshot(
employee_id=employee.id,
title=tab.get("title") or "",
href=tab.get("href") or "",
data_index=tab.get("data_index"),
crawl_run_id=run.id,
parsed_data=parsed,
html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
checksum=checksum,
parser_version=parser_version,
)
)
db.add(
EmployeeSnapshot(
employee_id=employee.id,
crawl_run_id=run.id,
parsed_data=parsed,
html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
checksum=checksum,
parser_version=parsed.get("parser_version"),
)
)
return employee
return employee, changed
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
@@ -219,5 +287,23 @@ def _record_employee_change(
def _checksum(data: dict) -> str:
payload = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
payload = json.dumps(_stable_checksum_payload(data), ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def _stable_checksum_payload(value):
    if isinstance(value, dict):
        return {key: _stable_checksum_payload(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_stable_checksum_payload(item) for item in value]
    if isinstance(value, str):
        return _normalize_date_dependent_experience(value)
    return value


def _normalize_date_dependent_experience(value: str) -> str:
    # Replace the year count in phrases such as "Стаж работы в НИУ ВШЭ: 5 лет"
    # ("Work experience at HSE University: 5 years") with a fixed placeholder,
    # so the checksum does not change merely because time has passed.
    return re.sub(
        r"(?i)(стаж(?:\s+работы)?(?:\s+в\s+ниу\s+вшэ|\s+в\s+вшэ)?\s*:?\s*)\d+\s*(?:год(?:а|ов)?|lет)".replace("lет", "лет"),
        r"\1<experience-years>",
        value,
    )
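
Review note: the effect of the checksum normalization can be demonstrated with a standalone sketch. It reuses the regular expression from `_normalize_date_dependent_experience`; the helper names `normalize` and `checksum` and the sample profile dicts are assumptions made for illustration.

```python
import hashlib
import json
import re

# Same pattern as in the hunk above, with the flag passed explicitly.
EXPERIENCE_RE = re.compile(
    r"(стаж(?:\s+работы)?(?:\s+в\s+ниу\s+вшэ|\s+в\s+вшэ)?\s*:?\s*)\d+\s*(?:год(?:а|ов)?|лет)",
    re.IGNORECASE,
)


def normalize(value):
    # Walk nested dicts and lists; rewrite only string leaves.
    if isinstance(value, dict):
        return {key: normalize(item) for key, item in value.items()}
    if isinstance(value, list):
        return [normalize(item) for item in value]
    if isinstance(value, str):
        return EXPERIENCE_RE.sub(r"\1<experience-years>", value)
    return value


def checksum(data: dict) -> str:
    payload = json.dumps(normalize(data), ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Hypothetical profiles that differ only in the experience counter.
last_year = {"full_name": "X", "sections": [{"text": "Стаж работы в НИУ ВШЭ: 5 лет"}]}
this_year = {"full_name": "X", "sections": [{"text": "Стаж работы в НИУ ВШЭ: 6 лет"}]}
renamed = {"full_name": "Y", "sections": [{"text": "Стаж работы в НИУ ВШЭ: 6 лет"}]}

print(checksum(last_year) == checksum(this_year))
print(checksum(this_year) == checksum(renamed))
```

With this normalization a profile whose only difference is the experience counter is reported as unchanged and lands in `skipped_count`, while a real edit such as a changed name still produces a new checksum.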


@@ -0,0 +1,227 @@
import hashlib
import json
from dataclasses import dataclass

from sqlalchemy import desc, select
from sqlalchemy.orm import Session

from app.models import DatasetVersion, DatasetVersionItem, Employee


@dataclass(frozen=True)
class EmployeeMarker:
    profile_key: str
    employee_id: int | None
    status: str
    checksum: str


def get_or_create_current_version(db: Session, *, crawl_run_id: int | None = None) -> DatasetVersion:
    employees = db.scalars(select(Employee).order_by(Employee.profile_key)).all()
    markers = [_employee_marker(employee) for employee in employees]
    dataset_hash = _dataset_hash(markers)
    latest = get_latest_version(db)
    if latest and latest.hash == dataset_hash:
        return latest

    active_count = sum(1 for marker in markers if marker.status == "active")
    dismissed_count = sum(1 for marker in markers if marker.status == "dismissed")
    version = DatasetVersion(
        hash=dataset_hash,
        previous_hash=latest.hash if latest else None,
        crawl_run_id=crawl_run_id,
        employee_count=len(markers),
        active_count=active_count,
        dismissed_count=dismissed_count,
    )
    db.add(version)
    db.flush()
    for marker in markers:
        db.add(
            DatasetVersionItem(
                dataset_version_id=version.id,
                profile_key=marker.profile_key,
                employee_id=marker.employee_id,
                status=marker.status,
                checksum=marker.checksum,
            )
        )
    db.flush()
    return version


def get_latest_version(db: Session) -> DatasetVersion | None:
    return db.scalar(select(DatasetVersion).order_by(desc(DatasetVersion.created_at), desc(DatasetVersion.id)).limit(1))


def get_version_by_hash(db: Session, dataset_hash: str | None) -> DatasetVersion | None:
    if not dataset_hash:
        return None
    return db.scalar(select(DatasetVersion).where(DatasetVersion.hash == dataset_hash).limit(1))


def service_info_payload(db: Session, *, tools: list[dict], service_name: str, backend_version: str, protocol_version: str) -> dict:
    version = get_or_create_current_version(db)
    db.commit()
    return {
        "service_name": service_name,
        "backend_version": backend_version,
        "protocolVersion": protocol_version,
        "tools": tools,
        "dataset": _version_payload(version),
    }


def sync_employees_payload(db: Session, *, client_hash: str | None = None, include_data: bool = True) -> dict:
    current = get_or_create_current_version(db)
    db.commit()
    if not client_hash:
        return _full_sync_payload(db, current, include_data=include_data, reason=None)
    if client_hash == current.hash:
        return {
            "mode": "delta",
            "from_hash": client_hash,
            "to_hash": current.hash,
            "dataset": _version_payload(current),
            "changes": {"added": [], "updated": [], "dismissed": [], "removed": []},
        }
    previous = get_version_by_hash(db, client_hash)
    if not previous:
        return _full_sync_payload(db, current, include_data=include_data, reason="unknown_client_hash", from_hash=client_hash)
    return _delta_sync_payload(db, previous, current, include_data=include_data)


def _full_sync_payload(
    db: Session,
    current: DatasetVersion,
    *,
    include_data: bool,
    reason: str | None,
    from_hash: str | None = None,
) -> dict:
    employees = db.scalars(select(Employee).order_by(Employee.profile_key)).all()
    payload = {
        "mode": "full",
        "from_hash": from_hash,
        "to_hash": current.hash,
        "dataset": _version_payload(current),
        "items": [_employee_payload(employee, include_data=include_data) for employee in employees],
    }
    if reason:
        payload["reason"] = reason
    return payload


def _delta_sync_payload(db: Session, previous: DatasetVersion, current: DatasetVersion, *, include_data: bool) -> dict:
    previous_items = _items_by_profile_key(previous)
    current_items = _items_by_profile_key(current)
    employees = {employee.profile_key: employee for employee in db.scalars(select(Employee)).all()}
    added = []
    updated = []
    dismissed = []
    removed = []

    for profile_key, current_item in sorted(current_items.items()):
        previous_item = previous_items.get(profile_key)
        employee = employees.get(profile_key)
        if not previous_item:
            if employee:
                added.append(_employee_payload(employee, include_data=include_data))
            continue
        if previous_item.checksum == current_item.checksum and previous_item.status == current_item.status:
            continue
        if current_item.status == "dismissed":
            dismissed.append(_tombstone(profile_key, current_item.status, employee))
        elif employee:
            updated.append(_employee_payload(employee, include_data=include_data))

    for profile_key, previous_item in sorted(previous_items.items()):
        if profile_key not in current_items:
            removed.append(_tombstone(profile_key, "removed", employees.get(profile_key), checksum=previous_item.checksum))

    return {
        "mode": "delta",
        "from_hash": previous.hash,
        "to_hash": current.hash,
        "dataset": _version_payload(current),
        "changes": {
            "added": added,
            "updated": updated,
            "dismissed": dismissed,
            "removed": removed,
        },
    }


def _items_by_profile_key(version: DatasetVersion) -> dict[str, DatasetVersionItem]:
    return {item.profile_key: item for item in version.items}


def _version_payload(version: DatasetVersion) -> dict:
    return {
        "hash": version.hash,
        "previous_hash": version.previous_hash,
        "created_at": version.created_at.isoformat() if version.created_at else None,
        "crawl_run_id": version.crawl_run_id,
        "employee_count": version.employee_count,
        "active_count": version.active_count,
        "dismissed_count": version.dismissed_count,
    }


def _employee_marker(employee: Employee) -> EmployeeMarker:
    return EmployeeMarker(
        profile_key=employee.profile_key,
        employee_id=employee.id,
        status=employee.status,
        checksum=employee.current_checksum or _payload_hash(employee.current_data or {}),
    )


def _dataset_hash(markers: list[EmployeeMarker]) -> str:
    payload = [
        {"profile_key": marker.profile_key, "status": marker.status, "checksum": marker.checksum}
        for marker in sorted(markers, key=lambda item: item.profile_key)
    ]
    return _payload_hash(payload)


def _payload_hash(value: object) -> str:
    payload = json.dumps(value, ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def _employee_payload(employee: Employee, *, include_data: bool) -> dict:
    payload = {
        "profile_key": employee.profile_key,
        "profile_id": employee.profile_id,
        "full_name": employee.full_name,
        "status": employee.status,
        "canonical_url": employee.canonical_url,
        "last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None,
        "dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
        "checksum": employee.current_checksum or _payload_hash(employee.current_data or {}),
    }
    if include_data:
        payload["data"] = employee.current_data
    return payload


def _tombstone(profile_key: str, status: str, employee: Employee | None, *, checksum: str | None = None) -> dict:
    payload = {
        "profile_key": profile_key,
        "status": status,
        "checksum": checksum or (employee.current_checksum if employee else None),
    }
    if employee:
        payload.update(
            {
                "profile_id": employee.profile_id,
                "full_name": employee.full_name,
                "canonical_url": employee.canonical_url,
                "dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
            }
        )
    return payload
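
Review note: the checkpoint scheme in this file (one hash over all per-employee markers, then a per-key comparison between two versions) can be sketched without a database. This is a simplified illustration, not the module itself: markers are plain `(status, checksum)` tuples, the delta lists contain only profile keys instead of full payloads and tombstones, and the keys such as `persons:1` are hypothetical.

```python
import hashlib
import json

Markers = dict[str, tuple[str, str]]  # profile_key -> (status, checksum)


def dataset_hash(markers: Markers) -> str:
    # Sort by profile key so the hash does not depend on insertion order.
    payload = [
        {"profile_key": key, "status": status, "checksum": checksum}
        for key, (status, checksum) in sorted(markers.items())
    ]
    encoded = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()


def delta(previous: Markers, current: Markers) -> dict[str, list[str]]:
    changes: dict[str, list[str]] = {"added": [], "updated": [], "dismissed": [], "removed": []}
    for key, marker in sorted(current.items()):
        if key not in previous:
            changes["added"].append(key)
        elif previous[key] == marker:
            continue  # same status and checksum: nothing to send
        elif marker[0] == "dismissed":
            changes["dismissed"].append(key)
        else:
            changes["updated"].append(key)
    changes["removed"] = sorted(key for key in previous if key not in current)
    return changes


previous = {
    "persons:1": ("active", "aaa"),
    "persons:2": ("active", "bbb"),
    "persons:3": ("active", "ccc"),
}
current = {
    "persons:1": ("active", "aaa"),
    "persons:2": ("active", "bbb2"),
    "persons:3": ("dismissed", "ccc"),
    "persons:4": ("active", "ddd"),
}

print(dataset_hash(previous) != dataset_hash(current))
print(delta(previous, current))
```

A client that sends the hash of `previous` would receive only the three changed keys; a client that sends an unknown hash falls back to a full sync, as `sync_employees_payload` does with `reason="unknown_client_hash"`.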


@@ -0,0 +1,147 @@
from __future__ import annotations

import gzip
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

import requests
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.models import ParseResourceCache
from app.version import BACKEND_VERSION


@dataclass(frozen=True)
class CachedResource:
    text: str
    body_hash: str
    from_cache: bool
    status_code: int


class ResourceCache:
    def __init__(self, db: Session):
        self.db = db

    def fetch_text(
        self,
        session: requests.Session,
        *,
        profile_key: str,
        resource_key: str,
        method: str,
        url: str,
        headers: dict[str, str],
        timeout: int,
        json_payload: Any | None = None,
        params: dict[str, Any] | None = None,
    ) -> CachedResource:
        method = method.upper()
        fingerprint = _request_fingerprint(method=method, url=url, json_payload=json_payload, params=params)
        cached = self.db.scalar(
            select(ParseResourceCache).where(
                ParseResourceCache.profile_key == profile_key,
                ParseResourceCache.resource_key == resource_key,
                ParseResourceCache.request_fingerprint == fingerprint,
            )
        )
        request_headers = dict(headers)
        if cached:
            if cached.etag:
                request_headers["If-None-Match"] = cached.etag
            if cached.last_modified:
                request_headers["If-Modified-Since"] = cached.last_modified

        response = _send(
            session,
            method=method,
            url=url,
            headers=request_headers,
            timeout=timeout,
            json_payload=json_payload,
            params=params,
        )
        if response.status_code == 304 and cached:
            cached.fetched_at = datetime.now(timezone.utc)
            self.db.flush()
            return CachedResource(
                text=gzip.decompress(cached.body_snapshot).decode("utf-8"),
                body_hash=cached.body_hash,
                from_cache=True,
                status_code=response.status_code,
            )

        response.raise_for_status()
        text = response.text
        body_hash = _body_hash(text)
        etag = response.headers.get("ETag") if hasattr(response, "headers") else None
        last_modified = response.headers.get("Last-Modified") if hasattr(response, "headers") else None
        if cached:
            cached.method = method
            cached.url = url
            cached.etag = etag
            cached.last_modified = last_modified
            cached.body_hash = body_hash
            cached.body_snapshot = gzip.compress(text.encode("utf-8"))
            cached.parser_version = BACKEND_VERSION
            cached.fetched_at = datetime.now(timezone.utc)
        else:
            self.db.add(
                ParseResourceCache(
                    profile_key=profile_key,
                    resource_key=resource_key,
                    method=method,
                    url=url,
                    request_fingerprint=fingerprint,
                    etag=etag,
                    last_modified=last_modified,
                    body_hash=body_hash,
                    body_snapshot=gzip.compress(text.encode("utf-8")),
                    parser_version=BACKEND_VERSION,
                    fetched_at=datetime.now(timezone.utc),
                )
            )
        self.db.flush()
        return CachedResource(text=text, body_hash=body_hash, from_cache=False, status_code=response.status_code)


def _send(
    session: requests.Session,
    *,
    method: str,
    url: str,
    headers: dict[str, str],
    timeout: int,
    json_payload: Any | None,
    params: dict[str, Any] | None,
) -> requests.Response:
    if method == "POST":
        return session.post(url, json=json_payload, headers=headers, timeout=timeout, params=params)
    return session.get(url, headers=headers, timeout=timeout, params=params)


def _request_fingerprint(
    *,
    method: str,
    url: str,
    json_payload: Any | None,
    params: dict[str, Any] | None,
) -> str:
    payload = {
        "method": method,
        "url": url,
        "json": json_payload,
        "params": params,
    }
    encoded = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()


def _body_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


@@ -171,6 +171,10 @@
background: transparent;
}
.button--compact {
padding: 8px 12px;
}
.code {
overflow-x: auto;
padding: 14px;
@@ -201,11 +205,34 @@
gap: 10px;
}
.employee-card__actions {
display: grid;
justify-items: end;
gap: 10px;
}
.employee-card__title {
margin: 0;
font-size: 24px;
}
.employee-card__notice {
margin: 0;
padding: 12px 14px;
border-radius: 8px;
font-weight: 700;
}
.employee-card__notice--success {
color: #065f46;
background: #d1fae5;
}
.employee-card__notice--error {
color: #991b1b;
background: #fee2e2;
}
.employee-card__section {
padding: 20px;
background: #ffffff;


@@ -89,12 +89,14 @@
const status = document.querySelector("[data-progress-status]");
const processed = document.querySelector("[data-progress-processed]");
const found = document.querySelector("[data-progress-found]");
const skipped = document.querySelector("[data-progress-skipped]");
const errors = document.querySelector("[data-progress-errors]");
const fill = document.querySelector("[data-progress-fill]");
const percent = document.querySelector("[data-progress-percent]");
if (status) status.textContent = run.status_display || run.status;
if (processed) processed.textContent = run.processed_count;
if (found) found.textContent = run.found_count;
if (skipped) skipped.textContent = run.skipped_count;
if (errors) errors.textContent = run.error_count;
if (fill) fill.style.width = `${run.progress_percent}%`;
if (percent) percent.textContent = run.progress_percent;


@@ -37,6 +37,7 @@
<div class="progress-panel__meta">
<span data-progress-status>{{ run.status_display if run else "Ожидание" }}</span>
<span>обработано: <span data-progress-processed>{{ run.processed_count if run else 0 }}</span> / <span data-progress-found>{{ run.found_count if run else 0 }}</span></span>
<span>без изменений: <span data-progress-skipped>{{ run.skipped_count if run else 0 }}</span></span>
<span>ошибок: <span data-progress-errors>{{ run.error_count if run else 0 }}</span></span>
</div>
<div class="progress-bar" aria-label="Parsing progress">
@@ -48,10 +49,10 @@
<section class="panel">
<h2 class="panel__title">Последние запуски</h2>
<table class="table">
<thead><tr><th class="table__head">ID</th><th class="table__head">Статус</th><th class="table__head">Обработано</th><th class="table__head">Ошибки</th><th class="table__head">Старт</th></tr></thead>
<thead><tr><th class="table__head">ID</th><th class="table__head">Статус</th><th class="table__head">Обработано</th><th class="table__head">Без изменений</th><th class="table__head">Ошибки</th><th class="table__head">Старт</th></tr></thead>
<tbody>
{% for run in runs %}
<tr class="table__row" data-row-href="/admin/runs/{{ run.id }}" role="link" tabindex="0"><td class="table__cell">{{ run.id }}</td><td class="table__cell">{{ run.status_display }}</td><td class="table__cell">{{ run.parsed_count }}</td><td class="table__cell">{{ run.error_count }}</td><td class="table__cell">{{ run.started_display }}</td></tr>
<tr class="table__row" onclick="window.location.href='/admin/runs/{{ run.id }}'" onkeydown="if (event.key === 'Enter' || event.key === ' ') { event.preventDefault(); window.location.href='/admin/runs/{{ run.id }}'; }" role="link" tabindex="0"><td class="table__cell">{{ run.id }}</td><td class="table__cell">{{ run.status_display }}</td><td class="table__cell">{{ run.parsed_count }}</td><td class="table__cell">{{ run.skipped_count }}</td><td class="table__cell">{{ run.error_count }}</td><td class="table__cell">{{ run.started_display }}</td></tr>
{% endfor %}
</tbody>
</table>


@@ -7,8 +7,18 @@
<h2 class="employee-card__title">{{ employee_view.full_name or employee.profile_key }}</h2>
<span class="badge {% if employee_view.status == "dismissed" %}badge--dismissed{% endif %}">{{ employee_view.status_display }}</span>
</div>
<a class="admin__link" href="{{ employee_view.canonical_url }}">{{ employee_view.canonical_url }}</a>
<div class="employee-card__actions">
<form method="post" action="/admin/employees/{{ employee.id }}/refresh">
<button class="button button--compact" type="submit">Обновить данные</button>
</form>
<a class="admin__link" href="{{ employee_view.canonical_url }}">{{ employee_view.canonical_url }}</a>
</div>
</div>
{% if refresh_status == "success" %}
<p class="employee-card__notice employee-card__notice--success">Данные сотрудника обновлены.</p>
{% elif refresh_status == "error" %}
<p class="employee-card__notice employee-card__notice--error">Не удалось обновить данные сотрудника.</p>
{% endif %}
<section class="employee-card__section">
<h3 class="employee-section__title">Основная информация</h3>


@@ -12,6 +12,7 @@
<div class="stats-strip">
<div class="stats-strip__item"><span class="stats-strip__label">Найдено</span><span class="stats-strip__value">{{ run.found_count }}</span></div>
<div class="stats-strip__item"><span class="stats-strip__label">Обработано</span><span class="stats-strip__value">{{ run.parsed_count }}</span></div>
<div class="stats-strip__item"><span class="stats-strip__label">Без изменений</span><span class="stats-strip__value">{{ run.skipped_count }}</span></div>
<div class="stats-strip__item"><span class="stats-strip__label">Новые</span><span class="stats-strip__value">{{ run.new_count }}</span></div>
<div class="stats-strip__item"><span class="stats-strip__label">Потеряшки</span><span class="stats-strip__value">{{ run.changes.missing_from_source | length }}</span></div>
<div class="stats-strip__item"><span class="stats-strip__label">Уволены</span><span class="stats-strip__value">{{ run.dismissed_count }}</span></div>


@@ -8,12 +8,13 @@
</div>
{% set run = runs[0] if runs else none %}
{% if run %}
{% set processed = run.parsed_count + run.error_count %}
{% set processed = run.parsed_count + run.skipped_count + run.error_count %}
{% set percent = ((processed / run.found_count) * 100) | round(1) if run.found_count else 0 %}
<div class="progress-panel" data-progress-panel>
<div class="progress-panel__meta">
<span data-progress-status>{{ run.status_display }}</span>
<span>обработано: <span data-progress-processed>{{ processed }}</span> / <span data-progress-found>{{ run.found_count }}</span></span>
<span>без изменений: <span data-progress-skipped>{{ run.skipped_count }}</span></span>
<span>ошибок: <span data-progress-errors>{{ run.error_count }}</span></span>
</div>
<div class="progress-bar" aria-label="Parsing progress">
@@ -26,6 +27,7 @@
<div class="progress-panel__meta">
<span data-progress-status>Ожидание</span>
<span>обработано: <span data-progress-processed>0</span> / <span data-progress-found>0</span></span>
<span>без изменений: <span data-progress-skipped>0</span></span>
<span>ошибок: <span data-progress-errors>0</span></span>
</div>
<div class="progress-bar" aria-label="Parsing progress">
@@ -35,10 +37,10 @@
</div>
{% endif %}
<table class="table">
<thead><tr><th class="table__head">ID</th><th class="table__head">Статус</th><th class="table__head">Найдено</th><th class="table__head">Обработано</th><th class="table__head">Новые</th><th class="table__head">Ошибки</th><th class="table__head">Уволены</th><th class="table__head">Старт</th></tr></thead>
<thead><tr><th class="table__head">ID</th><th class="table__head">Статус</th><th class="table__head">Найдено</th><th class="table__head">Обработано</th><th class="table__head">Без изменений</th><th class="table__head">Новые</th><th class="table__head">Ошибки</th><th class="table__head">Уволены</th><th class="table__head">Старт</th></tr></thead>
<tbody>
{% for run in runs %}
<tr class="table__row" data-row-href="/admin/runs/{{ run.id }}" role="link" tabindex="0"><td class="table__cell">{{ run.id }}</td><td class="table__cell">{{ run.status_display }}</td><td class="table__cell">{{ run.found_count }}</td><td class="table__cell">{{ run.parsed_count }}</td><td class="table__cell">{{ run.new_count }}</td><td class="table__cell">{{ run.error_count }}</td><td class="table__cell">{{ run.dismissed_count }}</td><td class="table__cell">{{ run.started_display }}</td></tr>
<tr class="table__row" onclick="window.location.href='/admin/runs/{{ run.id }}'" onkeydown="if (event.key === 'Enter' || event.key === ' ') { event.preventDefault(); window.location.href='/admin/runs/{{ run.id }}'; }" role="link" tabindex="0"><td class="table__cell">{{ run.id }}</td><td class="table__cell">{{ run.status_display }}</td><td class="table__cell">{{ run.found_count }}</td><td class="table__cell">{{ run.parsed_count }}</td><td class="table__cell">{{ run.skipped_count }}</td><td class="table__cell">{{ run.new_count }}</td><td class="table__cell">{{ run.error_count }}</td><td class="table__cell">{{ run.dismissed_count }}</td><td class="table__cell">{{ run.started_display }}</td></tr>
{% endfor %}
</tbody>
</table>
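
The template change above counts unchanged (skipped) profiles as processed work when computing the progress bar. A minimal Python sketch of the same arithmetic follows; the helper name `progress` and its tuple return shape are assumptions for illustration, not code from the repository.

```python
def progress(found: int, parsed: int, skipped: int, errors: int) -> tuple[int, float]:
    """Return (processed_count, progress_percent) using the template's formula."""
    processed = parsed + skipped + errors
    # Guard against division by zero when nothing has been discovered yet.
    percent = round(processed / found * 100, 1) if found else 0.0
    return processed, percent


print(progress(found=10, parsed=4, skipped=2, errors=1))
```

With the figures used in `test_run_payload_calculates_progress` (10 found, 4 parsed, 2 skipped, 1 error), seven profiles count as processed.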


@@ -1,3 +1,3 @@
APP_VERSION = "0.4.3"
FRONTEND_VERSION = "0.4.3"
BACKEND_VERSION = "0.4.3"
APP_VERSION = "0.6.0"
FRONTEND_VERSION = "0.6.0"
BACKEND_VERSION = "0.6.0"


@@ -17,7 +17,14 @@ def crawl_once() -> None:
settings = get_settings()
with SessionLocal() as db:
run = run_crawl(db, settings)
logger.info("crawl finished: id=%s status=%s parsed=%s errors=%s", run.id, run.status, run.parsed_count, run.error_count)
logger.info(
"crawl finished: id=%s status=%s parsed=%s skipped=%s errors=%s",
run.id,
run.status,
run.parsed_count,
run.skipped_count,
run.error_count,
)
def main() -> None:


@@ -20,7 +20,7 @@ services:
environment:
DATABASE_URL: postgresql+psycopg://${POSTGRES_USER:-miem}:${POSTGRES_PASSWORD:-miem_password}@postgres:5432/${POSTGRES_DB:-miem_workers}
ports:
- "127.0.0.1:8000:8000"
- "127.0.0.1:${API_PORT:-8000}:8000"
depends_on:
postgres:
condition: service_healthy
@@ -42,33 +42,7 @@ services:
environment:
DATABASE_URL: postgresql+psycopg://${POSTGRES_USER:-miem}:${POSTGRES_PASSWORD:-miem_password}@postgres:5432/${POSTGRES_DB:-miem_workers}
ports:
- "127.0.0.1:8001:8000"
depends_on:
postgres:
condition: service_healthy
keycloak:
image: quay.io/keycloak/keycloak:latest
container_name: keycloak
restart: unless-stopped
environment:
KC_DB: postgres
KC_DB_URL: jdbc:postgresql://postgres:5432/${KEYCLOAK_DB_NAME}
KC_DB_USERNAME: ${KEYCLOAK_DB_USER}
KC_DB_PASSWORD: ${KEYCLOAK_DB_PASSWORD}
KEYCLOAK_ADMIN: ${KEYCLOAK_ADMIN}
KEYCLOAK_ADMIN_PASSWORD: ${KEYCLOAK_ADMIN_PASSWORD}
KC_HTTP_ENABLED: true
KC_PROXY_HEADERS: xforwarded
KC_HOSTNAME: ${KEYCLOAK_HOSTNAME}
KC_HEALTH_ENABLED: true
KC_METRICS_ENABLED: true
command: start
ports:
- "127.0.0.1:8080:8080"
- "127.0.0.1:${MCP_PORT:-8001}:8000"
depends_on:
postgres:
condition: service_healthy


@@ -13,6 +13,7 @@ CREATE TABLE IF NOT EXISTS crawl_runs (
finished_at TIMESTAMPTZ,
found_count INTEGER NOT NULL DEFAULT 0,
parsed_count INTEGER NOT NULL DEFAULT 0,
skipped_count INTEGER NOT NULL DEFAULT 0,
new_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
dismissed_count INTEGER NOT NULL DEFAULT 0,
@@ -73,3 +74,22 @@ CREATE TABLE IF NOT EXISTS profile_tabs (
);
CREATE INDEX IF NOT EXISTS ix_profile_tabs_employee_id ON profile_tabs (employee_id);
CREATE TABLE IF NOT EXISTS parse_resource_cache (
id SERIAL PRIMARY KEY,
profile_key VARCHAR(255) NOT NULL,
resource_key VARCHAR(255) NOT NULL,
method VARCHAR(16) NOT NULL,
url TEXT NOT NULL,
request_fingerprint VARCHAR(64) NOT NULL,
etag TEXT,
last_modified TEXT,
body_hash VARCHAR(64) NOT NULL,
body_snapshot BYTEA NOT NULL,
parser_version VARCHAR(32),
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT uq_parse_resource_cache_resource UNIQUE (profile_key, resource_key, request_fingerprint)
);
CREATE INDEX IF NOT EXISTS ix_parse_resource_cache_profile_key
ON parse_resource_cache (profile_key);


@@ -0,0 +1,29 @@
CREATE TABLE IF NOT EXISTS dataset_versions (
id SERIAL PRIMARY KEY,
hash VARCHAR(64) NOT NULL UNIQUE,
previous_hash VARCHAR(64),
crawl_run_id INTEGER REFERENCES crawl_runs(id),
employee_count INTEGER NOT NULL DEFAULT 0,
active_count INTEGER NOT NULL DEFAULT 0,
dismissed_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_dataset_versions_created_at
ON dataset_versions (created_at);
CREATE TABLE IF NOT EXISTS dataset_version_items (
id SERIAL PRIMARY KEY,
dataset_version_id INTEGER NOT NULL REFERENCES dataset_versions(id),
profile_key VARCHAR(255) NOT NULL,
employee_id INTEGER REFERENCES employees(id),
status VARCHAR(32) NOT NULL,
checksum VARCHAR(64) NOT NULL,
CONSTRAINT uq_dataset_version_items_version_profile UNIQUE (dataset_version_id, profile_key)
);
CREATE INDEX IF NOT EXISTS ix_dataset_version_items_hash
ON dataset_version_items (dataset_version_id);
CREATE INDEX IF NOT EXISTS ix_dataset_version_items_profile_key
ON dataset_version_items (profile_key);
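
The `hash` column above is 64 characters wide, which fits a hex SHA-256 digest, and each version item stores a `profile_key`, `status`, and `checksum`. One way a version hash could be derived from those items is sketched below. The function name `dataset_hash`, the field separator, and the sort order are assumptions; the repository's `get_or_create_current_version` may compute it differently.

```python
import hashlib


def dataset_hash(items: list[tuple[str, str, str]]) -> str:
    """Hash (profile_key, status, checksum) triples in a stable order."""
    digest = hashlib.sha256()
    for profile_key, status, checksum in sorted(items):
        # A unit-separator byte keeps adjacent fields from running together.
        digest.update(f"{profile_key}\x1f{status}\x1f{checksum}\n".encode("utf-8"))
    return digest.hexdigest()
```

Sorting before hashing makes the result independent of row order, so the same employee state should always map to the same version hash.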


@@ -0,0 +1,21 @@
ALTER TABLE crawl_runs
ADD COLUMN IF NOT EXISTS skipped_count INTEGER NOT NULL DEFAULT 0;
CREATE TABLE IF NOT EXISTS parse_resource_cache (
id SERIAL PRIMARY KEY,
profile_key VARCHAR(255) NOT NULL,
resource_key VARCHAR(255) NOT NULL,
method VARCHAR(16) NOT NULL,
url TEXT NOT NULL,
request_fingerprint VARCHAR(64) NOT NULL,
etag TEXT,
last_modified TEXT,
body_hash VARCHAR(64) NOT NULL,
body_snapshot BYTEA NOT NULL,
parser_version VARCHAR(32),
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT uq_parse_resource_cache_resource UNIQUE (profile_key, resource_key, request_fingerprint)
);
CREATE INDEX IF NOT EXISTS ix_parse_resource_cache_profile_key
ON parse_resource_cache (profile_key);
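
The cache table above keeps an `etag` and a compressed `body_snapshot` per resource, which is what a conditional HTTP GET needs. The sketch below shows the general pattern under stated assumptions: `CacheEntry`, `FetchResult`, and `fetch_text` are hypothetical names, `session` is any object with a `requests`-style `get` method, and the snapshot is assumed to be gzip-compressed UTF-8. It is not the repository's `ResourceCache` implementation.

```python
import gzip
from dataclasses import dataclass
from typing import Optional


@dataclass
class CacheEntry:
    etag: Optional[str]
    body_snapshot: bytes  # gzip-compressed UTF-8 body, like the BYTEA column


@dataclass
class FetchResult:
    text: str
    from_cache: bool


def fetch_text(session, url: str, entry: Optional[CacheEntry], timeout: float = 10) -> FetchResult:
    """Send a conditional GET and fall back to the snapshot on 304 Not Modified."""
    headers = {}
    if entry is not None and entry.etag:
        headers["If-None-Match"] = entry.etag
    response = session.get(url, headers=headers, timeout=timeout)
    if response.status_code == 304 and entry is not None:
        # The server confirmed the cached copy is current; the response body is not read.
        return FetchResult(gzip.decompress(entry.body_snapshot).decode("utf-8"), True)
    response.raise_for_status()
    return FetchResult(response.text, False)
```

On a 304 response the stored snapshot is returned without touching `response.text`, so an unchanged profile costs only a small round trip.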


@@ -1,6 +1,6 @@
[project]
name = "miem-workers"
version = "0.4.0"
version = "0.6.0"
description = "MIEM employees parser, admin API, and MCP server"
requires-python = ">=3.11"
dependencies = [
@@ -12,7 +12,6 @@ dependencies = [
"lxml>=5.2.0",
"psycopg[binary]>=3.2.0",
"pydantic-settings>=2.4.0",
"PyJWT[crypto]>=2.9.0",
"python-multipart>=0.0.9",
"requests>=2.32.0",
"sqlalchemy>=2.0.32",


@@ -6,7 +6,6 @@ jinja2>=3.1.4
lxml>=5.2.0
psycopg[binary]>=3.2.0
pydantic-settings>=2.4.0
PyJWT[crypto]>=2.9.0
python-multipart>=0.0.9
requests>=2.32.0
sqlalchemy>=2.0.32


@@ -200,13 +200,14 @@ def test_run_payload_calculates_progress():
status="running",
found_count=10,
parsed_count=4,
skipped_count=2,
error_count=1,
)
payload = run_payload(run)
assert payload["processed_count"] == 5
assert payload["progress_percent"] == 50.0
assert payload["processed_count"] == 7
assert payload["progress_percent"] == 70.0
assert payload["status_display"] == "Выполняется"


@@ -45,9 +45,11 @@ def test_dashboard_limits_latest_runs_to_five():
def test_runs_template_links_to_run_detail():
template = Path("app/templates/runs.html").read_text(encoding="utf-8")
assert 'data-row-href="/admin/runs/{{ run.id }}"' in template
assert 'onclick="window.location.href=\'/admin/runs/{{ run.id }}\'"' in template
assert "onkeydown=\"if (event.key === 'Enter' || event.key === ' ')" in template
assert 'role="link"' in template
assert 'tabindex="0"' in template
assert 'data-row-href="/admin/runs/{{ run.id }}"' not in template
assert '<a class="admin__link" href="/admin/runs/{{ run.id }}">' not in template
@@ -75,9 +77,11 @@ def test_dashboard_metric_cards_link_to_admin_targets():
def test_dashboard_latest_run_rows_link_to_run_detail():
template = Path("app/templates/dashboard.html").read_text(encoding="utf-8")
assert 'data-row-href="/admin/runs/{{ run.id }}"' in template
assert 'onclick="window.location.href=\'/admin/runs/{{ run.id }}\'"' in template
assert "onkeydown=\"if (event.key === 'Enter' || event.key === ' ')" in template
assert 'role="link"' in template
assert 'tabindex="0"' in template
assert 'data-row-href="/admin/runs/{{ run.id }}"' not in template
assert '<a class="admin__link" href="/admin/runs/{{ run.id }}">' not in template


@@ -1,15 +1,12 @@
import time
import json
from datetime import datetime, timezone
from types import SimpleNamespace
import jwt
from fastapi.testclient import TestClient
from cryptography.hazmat.primitives.asymmetric import rsa
from sqlalchemy import create_engine
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import app.security as security
from app.config import Settings, get_settings
from app.db import Base, get_db
from app.main import app
@@ -23,10 +20,10 @@ def test_health_returns_versions():
response = client.get("/api/health")
assert response.status_code == 200
assert response.json()["backend_version"] == "0.4.3"
assert response.json()["backend_version"] == "0.6.0"
def test_mcp_requires_token_and_lists_tools():
def test_mcp_lists_tools_without_auth_and_ignores_auth_header():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
@@ -43,22 +40,23 @@ def test_mcp_requires_token_and_lists_tools():
session.close()
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: Settings(
mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
)
client = TestClient(app)
unauthorized = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}})
authorized = client.post(
without_auth = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}})
with_auth = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
headers={"Authorization": "Bearer anything"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert unauthorized.status_code == 401
assert authorized.status_code == 200
assert authorized.json()["result"]["tools"][0]["name"] == "search_employees"
assert any(tool["name"] == "get_crawl_run_details" for tool in authorized.json()["result"]["tools"])
assert without_auth.status_code == 200
assert with_auth.status_code == 200
tool_names = {tool["name"] for tool in without_auth.json()["result"]["tools"]}
assert "search_employees" in tool_names
assert "get_service_info" in tool_names
assert "sync_employees" in tool_names
assert any(tool["name"] == "get_crawl_run_details" for tool in without_auth.json()["result"]["tools"])
assert with_auth.json()["result"]["tools"] == without_auth.json()["result"]["tools"]
app.dependency_overrides.clear()
@@ -96,14 +94,10 @@ def test_mcp_search_employees_returns_matching_employee():
db.close()
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: Settings(
mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
)
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
json={
"jsonrpc": "2.0",
"id": 1,
@@ -118,6 +112,128 @@ def test_mcp_search_employees_returns_matching_employee():
app.dependency_overrides.clear()
def test_mcp_service_info_returns_tools_and_dataset_hash():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
session.add(
Employee(
profile_key="staff:alpha",
profile_type="staff",
profile_id="alpha",
canonical_url="https://www.hse.ru/staff/alpha",
full_name="Alpha Person",
status="active",
current_checksum="a" * 64,
current_data={"sections": []},
)
)
session.commit()
session.close()
def override_db():
db = Session()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
response = client.post(
"/mcp",
json={"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": "get_service_info", "arguments": {}}},
)
assert response.status_code == 200
payload = json.loads(response.json()["result"]["content"][0]["text"])
assert payload["service_name"] == "miem-employees"
assert payload["backend_version"] == "0.6.0"
assert payload["dataset"]["hash"]
assert any(tool["name"] == "sync_employees" for tool in payload["tools"])
app.dependency_overrides.clear()
def test_mcp_sync_employees_full_empty_and_unknown_hash_modes():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
session.add(
Employee(
profile_key="staff:alpha",
profile_type="staff",
profile_id="alpha",
canonical_url="https://www.hse.ru/staff/alpha",
full_name="Alpha Person",
status="active",
current_checksum="a" * 64,
current_data={"sections": [{"type": "paragraphs"}]},
)
)
session.commit()
session.close()
def override_db():
db = Session()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_db
client = TestClient(app)
full_response = client.post(
"/mcp",
json={"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": "sync_employees", "arguments": {}}},
)
full_payload = json.loads(full_response.json()["result"]["content"][0]["text"])
current_hash = full_payload["to_hash"]
empty_response = client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": "sync_employees", "arguments": {"client_hash": current_hash}},
},
)
empty_payload = json.loads(empty_response.json()["result"]["content"][0]["text"])
unknown_response = client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": "sync_employees", "arguments": {"client_hash": "missing"}},
},
)
unknown_payload = json.loads(unknown_response.json()["result"]["content"][0]["text"])
assert full_payload["mode"] == "full"
assert full_payload["items"][0]["data"] == {"sections": [{"type": "paragraphs"}]}
assert empty_payload["mode"] == "delta"
assert empty_payload["changes"] == {"added": [], "updated": [], "dismissed": [], "removed": []}
assert unknown_payload["mode"] == "full"
assert unknown_payload["reason"] == "unknown_client_hash"
app.dependency_overrides.clear()
def test_mcp_get_crawl_run_details_returns_changes():
engine = create_engine(
"sqlite:///:memory:",
@@ -164,14 +280,10 @@ def test_mcp_get_crawl_run_details_returns_changes():
db.close()
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: Settings(
mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
)
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
json={
"jsonrpc": "2.0",
"id": 1,
@@ -188,146 +300,12 @@ def test_mcp_get_crawl_run_details_returns_changes():
app.dependency_overrides.clear()
def test_mcp_oauth_rejects_static_token():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
def override_db():
session = Session()
try:
yield session
finally:
session.close()
settings = Settings(
mcp_auth_mode="oauth",
mcp_token="secret",
session_secret="session-secret",
mcp_oauth_issuer="https://auth.example.com",
mcp_oauth_audience="miem-mcp",
mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
)
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: settings
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 401
assert response.headers["www-authenticate"] == (
'Bearer resource_metadata="http://localhost:8001/.well-known/oauth-protected-resource"'
)
app.dependency_overrides.clear()
def test_mcp_oauth_missing_auth_returns_metadata_challenge():
settings = Settings(
mcp_auth_mode="oauth",
mcp_resource_url="https://api.example.com/mcp",
mcp_oauth_issuer="https://auth.example.com",
mcp_oauth_audience="miem-mcp",
mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
)
app.dependency_overrides[get_settings] = lambda: settings
client = TestClient(app)
response = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}})
assert response.status_code == 401
assert response.headers["www-authenticate"] == (
'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"'
)
app.dependency_overrides.clear()
def test_mcp_accepts_valid_oauth_jwt(monkeypatch):
public_key, token = _oauth_key_and_token()
monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key))
app.dependency_overrides[get_settings] = lambda: _oauth_settings()
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": f"Bearer {token}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 200
assert response.json()["result"]["tools"][0]["name"] == "search_employees"
app.dependency_overrides.clear()
def test_mcp_rejects_invalid_oauth_jwts(monkeypatch):
public_key, expired_token = _oauth_key_and_token(exp=int(time.time()) - 60)
_, wrong_issuer_token = _oauth_key_and_token(issuer="https://other.example.com")
_, wrong_audience_token = _oauth_key_and_token(audience="other-audience")
_, bad_signature_token = _oauth_key_and_token(public_key=public_key)
monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key))
app.dependency_overrides[get_settings] = lambda: _oauth_settings()
client = TestClient(app)
for token in [expired_token, wrong_issuer_token, wrong_audience_token, bad_signature_token]:
response = client.post(
"/mcp",
headers={"Authorization": f"Bearer {token}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 401
app.dependency_overrides.clear()
def test_mcp_rejects_oauth_jwt_without_required_scope(monkeypatch):
public_key, token = _oauth_key_and_token(scope="profile")
monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key))
app.dependency_overrides[get_settings] = lambda: _oauth_settings()
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": f"Bearer {token}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 403
app.dependency_overrides.clear()
def test_mcp_protected_resource_metadata_uses_settings():
settings = Settings(
mcp_resource_url="https://api.example.com/mcp",
mcp_oauth_issuer="https://auth.example.com/",
mcp_oauth_required_scope="mcp:tools",
)
app.dependency_overrides[get_settings] = lambda: settings
def test_mcp_protected_resource_metadata_route_is_removed():
client = TestClient(app)
response = client.get("/.well-known/oauth-protected-resource")
assert response.status_code == 200
assert response.json() == {
"resource": "https://api.example.com/mcp",
"authorization_servers": ["https://auth.example.com"],
"bearer_methods_supported": ["header"],
"scopes_supported": ["mcp:tools"],
"resource_documentation": "https://api.example.com/mcp",
}
app.dependency_overrides.clear()
assert response.status_code == 404
def test_api_employees_and_stats_require_admin_session():
@@ -399,33 +377,54 @@ def test_api_employees_and_stats_require_admin_session():
app.dependency_overrides.clear()
def _oauth_settings() -> Settings:
return Settings(
mcp_auth_mode="oauth",
mcp_resource_url="https://api.example.com/mcp",
mcp_oauth_issuer="https://auth.example.com",
mcp_oauth_audience="miem-mcp",
mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
session_secret="session-secret",
def test_admin_refresh_employee_route_updates_only_requested_employee(monkeypatch):
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
db = Session()
db.add(
Employee(
profile_key="org_person:133709486",
profile_type="org_person",
profile_id="133709486",
canonical_url="https://www.hse.ru/org/persons/133709486",
full_name="Будков Юрий Алексеевич",
status="active",
)
)
db.commit()
employee_id = db.scalar(select(Employee.id))
db.close()
settings = Settings(admin_username="admin", admin_password="password", session_secret="session-secret")
def _oauth_key_and_token(
*,
issuer: str = "https://auth.example.com",
audience: str = "miem-mcp",
scope: str = "mcp:tools",
exp: int | None = None,
public_key=None,
):
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
claims = {
"iss": issuer,
"aud": audience,
"scope": scope,
"sub": "mcp-client",
"iat": int(time.time()),
"exp": exp or int(time.time()) + 300,
}
token = jwt.encode(claims, private_key, algorithm="RS256", headers={"kid": "test-key"})
return public_key or private_key.public_key(), token
def override_db():
session = Session()
try:
yield session
finally:
session.close()
calls = []
def fake_refresh_employee(db, refreshed_employee, route_settings):
calls.append((refreshed_employee.id, route_settings))
return SimpleNamespace(status="completed")
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: settings
monkeypatch.setattr("app.admin.refresh_employee", fake_refresh_employee)
client = TestClient(app)
client.cookies.set(SESSION_COOKIE, sign_session("admin", settings))
response = client.post(f"/admin/employees/{employee_id}/refresh", follow_redirects=False)
assert response.status_code == 303
assert response.headers["location"] == f"/admin/employees/{employee_id}?refresh_status=success"
assert calls == [(employee_id, settings)]
app.dependency_overrides.clear()


@@ -1,6 +1,3 @@
import pytest
from pydantic import ValidationError
from app.config import Settings
@@ -14,8 +11,3 @@ def test_numeric_crawl_limit_is_parsed():
settings = Settings(crawl_limit="25")
assert settings.crawl_limit == 25
def test_mcp_auth_mode_rejects_oauth_or_token_fallback():
with pytest.raises(ValidationError):
Settings(mcp_auth_mode="oauth_or_token")


@@ -1,7 +1,9 @@
import gzip
from datetime import datetime, timezone
from app.models import CrawlRun, CrawlRunEmployeeChange, Employee
from app.services.crawler import _mark_dismissed, _upsert_employee
from app.models import CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParseResourceCache
from app.services.crawler import _checksum, _mark_dismissed, _upsert_employee
from app.services.resource_cache import ResourceCache
class FakeResponse:
@@ -17,6 +19,34 @@ class FakeSession:
return FakeResponse(self.statuses[url])
class ConditionalResponse:
def __init__(self, status_code, text="", headers=None):
self.status_code = status_code
self._text = text
self.headers = headers or {}
self.text_read = False
@property
def text(self):
self.text_read = True
return self._text
def raise_for_status(self):
return None
class ConditionalSession:
def __init__(self):
self.requests = []
self.not_modified_response = ConditionalResponse(304)
def get(self, url, **kwargs):
self.requests.append((url, kwargs))
if kwargs["headers"].get("If-None-Match") == '"cached"':
return self.not_modified_response
return ConditionalResponse(200, "fresh", {"ETag": '"fresh"'})
def test_mark_dismissed_records_missing_source_when_profile_is_available(db_session):
run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
db_session.add(run)
@@ -111,3 +141,86 @@ def test_upsert_employee_increments_new_count_and_records_change_for_new_employe
change = db_session.query(CrawlRunEmployeeChange).one()
assert change.change_type == "new"
assert change.full_name == "New Person"
def test_resource_cache_uses_etag_and_reuses_cached_body_on_304(db_session):
db_session.add(
ParseResourceCache(
profile_key="staff:cached",
resource_key="main-html",
method="GET",
url="https://www.hse.ru/staff/cached",
request_fingerprint="020d59db7b358d9023d0f185bcbf5a9c085d3cf2bf91d92d48eee9147e8d0f01",
etag='"cached"',
body_hash="cached-hash",
body_snapshot=gzip.compress("cached body".encode("utf-8")),
parser_version="0.6.0",
)
)
db_session.commit()
session = ConditionalSession()
result = ResourceCache(db_session).fetch_text(
session,
profile_key="staff:cached",
resource_key="main-html",
method="GET",
url="https://www.hse.ru/staff/cached",
headers={"User-Agent": "test"},
timeout=10,
)
assert session.requests[0][1]["headers"]["If-None-Match"] == '"cached"'
assert result.text == "cached body"
assert result.from_cache is True
assert session.not_modified_response.text_read is False
def test_upsert_employee_skips_snapshot_when_checksum_is_unchanged(db_session):
first_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
second_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
db_session.add_all([first_run, second_run])
db_session.commit()
_, first_changed = _upsert_employee(db_session, first_run, _parsed_employee("same"))
_, second_changed = _upsert_employee(db_session, second_run, _parsed_employee("same"))
db_session.commit()
assert first_changed is True
assert second_changed is False
assert db_session.query(EmployeeSnapshot).count() == 1
def test_checksum_changes_when_widget_data_changes():
base = _parsed_employee("widgets")
changed = _parsed_employee("widgets")
changed["sections"] = [
{
"type": "publications",
"publications": [{"id": "1", "title": "New publication"}],
}
]
assert _checksum(base) != _checksum(changed)
def test_checksum_ignores_date_dependent_experience_text():
first = _parsed_employee("experience")
second = _parsed_employee("experience")
first["sections"] = [{"raw_text": "Стаж работы в НИУ ВШЭ: 5 лет"}]
second["sections"] = [{"raw_text": "Стаж работы в НИУ ВШЭ: 6 лет"}]
assert _checksum(first) == _checksum(second)
def _parsed_employee(profile_id: str) -> dict:
return {
"source_url": f"https://www.hse.ru/staff/{profile_id}",
"profile_type": "staff",
"profile_id": profile_id,
"full_name": "Same Person",
"tabs": [],
"sections": [],
"parser_version": "0.6.0",
"_html": "<html></html>",
}
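
The checksum tests above expect two properties: a change in widget data changes the hash, while the tenure phrase ("Стаж работы в НИУ ВШЭ: N лет", roughly "Work experience at HSE University: N years") does not. A minimal sketch of a checksum with those properties follows. The regular expression, the choice to drop keys starting with an underscore (such as `_html`), and the name `stable_checksum` are all assumptions; the repository's `_checksum` may normalize differently.

```python
import hashlib
import json
import re

# Hypothetical pattern: matches the tenure phrase used in the test above.
_EXPERIENCE_RE = re.compile(r"Стаж работы в НИУ ВШЭ: \d+ \S+")


def _normalize(value):
    """Recursively blank out tenure phrases so they do not affect the hash."""
    if isinstance(value, str):
        return _EXPERIENCE_RE.sub("", value)
    if isinstance(value, list):
        return [_normalize(item) for item in value]
    if isinstance(value, dict):
        return {key: _normalize(item) for key, item in value.items()}
    return value


def stable_checksum(parsed: dict) -> str:
    """Hash the parsed profile after removing transient and date-dependent parts."""
    # Assumption: keys starting with "_" carry raw inputs and are left out.
    payload = {k: _normalize(v) for k, v in parsed.items() if not k.startswith("_")}
    encoded = json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()
```

Serializing with `sort_keys=True` keeps the digest independent of dictionary insertion order.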


@@ -0,0 +1,88 @@
from datetime import datetime, timezone
from app.models import Employee
from app.services.dataset_versions import get_or_create_current_version, sync_employees_payload
def _employee(profile_key: str, checksum: str, *, status: str = "active") -> Employee:
return Employee(
profile_key=profile_key,
profile_type=profile_key.split(":", 1)[0],
profile_id=profile_key.split(":", 1)[1],
canonical_url=f"https://www.hse.ru/{profile_key}",
full_name=profile_key,
status=status,
first_seen_at=datetime.now(timezone.utc),
last_seen_at=datetime.now(timezone.utc),
current_data={"profile_key": profile_key},
current_checksum=checksum,
)
def test_dataset_version_hash_is_stable_for_same_employee_state(db_session):
db_session.add(_employee("staff:alpha", "a" * 64))
db_session.commit()
first = get_or_create_current_version(db_session)
db_session.commit()
second = get_or_create_current_version(db_session)
assert second.id == first.id
assert second.hash == first.hash
assert second.employee_count == 1
def test_dataset_version_hash_changes_when_employee_checksum_changes(db_session):
employee = _employee("staff:alpha", "a" * 64)
db_session.add(employee)
db_session.commit()
first = get_or_create_current_version(db_session)
db_session.commit()
employee.current_checksum = "b" * 64
db_session.commit()
second = get_or_create_current_version(db_session)
assert second.hash != first.hash
assert second.previous_hash == first.hash
def test_sync_employees_diff_spans_multiple_intermediate_versions(db_session):
alpha = _employee("staff:alpha", "a" * 64)
db_session.add(alpha)
db_session.commit()
first = get_or_create_current_version(db_session)
db_session.commit()
beta = _employee("staff:beta", "b" * 64)
db_session.add(beta)
db_session.commit()
get_or_create_current_version(db_session)
db_session.commit()
alpha.current_checksum = "c" * 64
alpha.current_data = {"profile_key": "staff:alpha", "changed": True}
db_session.commit()
payload = sync_employees_payload(db_session, client_hash=first.hash, include_data=False)
assert payload["mode"] == "delta"
assert [item["profile_key"] for item in payload["changes"]["added"]] == ["staff:beta"]
assert [item["profile_key"] for item in payload["changes"]["updated"]] == ["staff:alpha"]
assert payload["changes"]["dismissed"] == []
assert payload["changes"]["removed"] == []
def test_sync_employees_reports_dismissed_as_tombstone(db_session):
alpha = _employee("staff:alpha", "a" * 64)
db_session.add(alpha)
db_session.commit()
first = get_or_create_current_version(db_session)
db_session.commit()
alpha.status = "dismissed"
db_session.commit()
payload = sync_employees_payload(db_session, client_hash=first.hash, include_data=False)
assert payload["changes"]["dismissed"][0]["profile_key"] == "staff:alpha"
assert payload["changes"]["dismissed"][0]["status"] == "dismissed"
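
The sync tests above expect a delta that stays correct even when several versions lie between the client's hash and the current state. One way to get that behavior is to compare the client's version directly with the current one instead of replaying intermediate versions. The sketch below assumes each version is reduced to a `{profile_key: (status, checksum)}` map; `diff_versions` is a hypothetical helper, not the repository's `sync_employees_payload`.

```python
def diff_versions(
    old: dict[str, tuple[str, str]],
    new: dict[str, tuple[str, str]],
) -> dict[str, list[str]]:
    """Compare {profile_key: (status, checksum)} maps of two dataset versions."""
    changes: dict[str, list[str]] = {"added": [], "updated": [], "dismissed": [], "removed": []}
    for key in sorted(new):
        status, checksum = new[key]
        if key not in old:
            changes["added"].append(key)
        elif status == "dismissed" and old[key][0] != "dismissed":
            # A newly dismissed profile is reported as a tombstone, not as an update.
            changes["dismissed"].append(key)
        elif (status, checksum) != old[key]:
            changes["updated"].append(key)
    changes["removed"] = sorted(key for key in old if key not in new)
    return changes
```

Because only the two endpoint states are compared, a profile that changed twice between versions still appears once in the result.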


@@ -27,4 +27,6 @@ def test_employee_detail_template_is_human_readable():
    assert "Дата увольнения" in template
    assert "Тип профиля" in template
    assert "ID профиля" in template
    assert "Обновить данные" in template
    assert 'action="/admin/employees/{{ employee.id }}/refresh"' in template
    assert "Снапшоты" in template


@@ -1,6 +1,6 @@
from bs4 import BeautifulSoup
from app.parser.profile import enrich_sections_from_hse_widgets, extract_person_tabs
from app.parser.profile import enrich_sections_from_hse_widgets, extract_person_tabs, extract_sections
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
@@ -64,6 +64,47 @@ class FakeSession:
)


class GroupedPublicationsSession(FakeSession):
    def post(self, url, **kwargs):
        self.posts.append((url, kwargs))
        return FakeResponse(
            {
                "status": "ok",
                "result": {
                    "more": False,
                    "total": 1,
                    "groupType": 2,
                    "items": {
                        "year": {
                            "header": {"ru": "по году", "en": "by year"},
                            "criteria": {"year": []},
                            "items": {
                                "2011": [
                                    {
                                        "id": "146366790",
                                        "type": "ARTICLE",
                                        "title": "Развитие теории самосогласованного поля",
                                        "year": 2011,
                                        "description": {"short": {"ru": "Журнал физической химии 2011."}},
                                    }
                                ],
                                "2012": [
                                    {
                                        "id": "146367323",
                                        "type": "ARTICLE",
                                        "title": "Self-consistent field theory investigation",
                                        "year": 2012,
                                        "description": {"short": {"en": "Russian Journal of Physical Chemistry A 2012."}},
                                    }
                                ],
                            },
                        }
                    },
                },
            }
        )


def test_normalize_profile_url_supports_staff_and_org_persons():
    assert normalize_profile_url("/staff/avsergeev#sci") == "https://www.hse.ru/staff/avsergeev"
    assert normalize_profile_url("https://www.hse.ru/org/persons/123/") == "https://www.hse.ru/org/persons/123"
@@ -117,3 +158,60 @@ def test_enrich_sections_from_hse_widgets_loads_publications_and_vkr():
    assert theses["theses"][0]["project_url"] == "https://www.hse.ru/edu/vkr/1045750164"
    assert session.posts[0][0] == "https://publications.hse.ru/api/searchPubs"
    assert session.gets[0][1]["params"] == {"supervisorId": "803294906"}


def test_enrich_sections_from_hse_widgets_loads_grouped_publications():
    soup = BeautifulSoup(
        """
        <script src="/n/stat/publications/dist-w/publs.js" data-author="133709486" data-widget-name="AuthorSearch"></script>
        """,
        "html.parser",
    )
    session = GroupedPublicationsSession()
    sections = enrich_sections_from_hse_widgets(
        session,
        soup,
        "https://www.hse.ru/org/persons/133709486",
        {"User-Agent": "test"},
        10,
        [],
    )
    publications = next(section for section in sections if section["type"] == "publications")
    assert publications["publications_count"] == 2
    assert [item["id"] for item in publications["publications"]] == ["146366790", "146367323"]
    assert publications["publications"][0]["url"] == "https://publications.hse.ru/view/146366790"
    assert publications["publications"][1]["url"] == "https://publications.hse.ru/view/146367323"
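
The `GroupedPublicationsSession` fixture nests publications under `result["items"][<group>]["items"][<bucket>]` instead of returning a flat list. One way a parser could normalize both shapes is sketched below. This is an illustration only, not the code in `app.parser.profile`: the helper names `flatten_publication_items` and `with_urls` are hypothetical, the flat-list shape is an assumption, and the URL pattern is copied from the assertions in the test above.

```python
from typing import Any, Dict, List

# URL shape taken from the test assertions; treated as an assumption here.
VIEW_URL = "https://publications.hse.ru/view/{id}"


def flatten_publication_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return publications from either a flat list or a grouped mapping."""
    items = result.get("items")
    if isinstance(items, list):
        # Assumed flat response: the items are already a list of publications.
        return [dict(item) for item in items]
    flat: List[Dict[str, Any]] = []
    if isinstance(items, dict):
        for group in items.values():  # e.g. the "year" group
            buckets = group.get("items") if isinstance(group, dict) else None
            if not isinstance(buckets, dict):
                continue
            # Sort bucket keys so "2011" comes before "2012".
            for bucket_key in sorted(buckets):
                flat.extend(dict(item) for item in buckets[bucket_key])
    return flat


def with_urls(publications: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Attach a view URL built from each publication's id."""
    return [{**item, "url": VIEW_URL.format(id=item["id"])} for item in publications]
```

Sorting the bucket keys keeps the output order deterministic regardless of how the API orders the year buckets.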


def test_news_heading_with_publications_word_does_not_absorb_widget_publications():
    soup = BeautifulSoup(
        """
        <h2>Статья профессора МИЭМ вошла в число самых популярных публикаций на портале SpringerLink</h2>
        <div class="post__text">
        <p>Первоначально статья профессора вышла в российском журнале.</p>
        </div>
        <script src="/n/stat/publications/dist-w/publs.js" data-author="133709486" data-widget-name="AuthorSearch"></script>
        """,
        "html.parser",
    )
    session = FakeSession()
    sections = extract_sections(soup, "https://www.hse.ru/org/persons/133709486")
    sections = enrich_sections_from_hse_widgets(
        session,
        soup,
        "https://www.hse.ru/org/persons/133709486",
        {"User-Agent": "test"},
        10,
        sections,
    )
    assert sections[0]["type"] == "paragraphs"
    assert sections[0]["title"].startswith("Статья профессора")
    publications = [section for section in sections if section["type"] == "publications"]
    assert len(publications) == 1
    assert publications[0]["title"] == "Публикации и исследования"
    assert publications[0]["publications_count"] == 1