feat: add detailed employee publications storage and MCP docs

This commit is contained in:
Anton
2026-05-15 17:39:41 +03:00
parent 2819a6c334
commit dbaf3af468
14 changed files with 677 additions and 26 deletions

View File

@@ -6,11 +6,20 @@ import time
from datetime import datetime, timezone
import requests
from sqlalchemy import select
from sqlalchemy import inspect, select
from sqlalchemy.orm import Session
from app.config import Settings
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParserSource, ProfileTab
from app.models import (
CrawlError,
CrawlRun,
CrawlRunEmployeeChange,
Employee,
EmployeePublication,
EmployeeSnapshot,
ParserSource,
ProfileTab,
)
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key
@@ -219,9 +228,127 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee
parser_version=parser_version,
)
)
db.flush()
_try_sync_employee_publications(db, run, employee, parsed)
return employee, changed
def _try_sync_employee_publications(db: Session, run: CrawlRun, employee: Employee, parsed: dict) -> None:
try:
if not _publication_payloads(parsed):
return
if not _employee_publications_table_exists(db):
return
with db.begin_nested():
_sync_employee_publications(db, employee, parsed)
except Exception as exc:
db.add(
CrawlError(
crawl_run_id=run.id,
profile_url=employee.canonical_url,
error_type=type(exc).__name__,
message=f"Не удалось сохранить публикации сотрудника: {exc}",
)
)
def _employee_publications_table_exists(db: Session) -> bool:
return inspect(db.connection()).has_table(EmployeePublication.__tablename__)
def _sync_employee_publications(db: Session, employee: Employee, parsed: dict) -> None:
publications = _publication_payloads(parsed)
seen_hashes = set()
for publication in publications:
source_hash = _publication_hash(publication)
seen_hashes.add(source_hash)
publication_id = _clean_optional(publication.get("publication_id") or publication.get("id"))
existing = None
if publication_id:
existing = db.scalar(
select(EmployeePublication).where(
EmployeePublication.employee_id == employee.id,
EmployeePublication.publication_id == publication_id,
)
)
if not existing:
existing = db.scalar(
select(EmployeePublication).where(
EmployeePublication.employee_id == employee.id,
EmployeePublication.source_hash == source_hash,
)
)
if not existing:
existing = EmployeePublication(employee_id=employee.id, source_hash=source_hash, title=_publication_title(publication))
db.add(existing)
_apply_publication(existing, publication, source_hash)
if seen_hashes:
stale = db.scalars(
select(EmployeePublication).where(
EmployeePublication.employee_id == employee.id,
EmployeePublication.source_hash.not_in(seen_hashes),
)
).all()
for item in stale:
db.delete(item)
def _publication_payloads(parsed: dict) -> list[dict]:
publications = []
for section in parsed.get("sections") or []:
if not isinstance(section, dict) or section.get("type") != "publications":
continue
for publication in section.get("publications") or []:
if isinstance(publication, dict):
publications.append(publication)
return publications
def _apply_publication(target: EmployeePublication, publication: dict, source_hash: str) -> None:
target.publication_id = _clean_optional(publication.get("publication_id") or publication.get("id"))
target.title = _publication_title(publication)
target.year = _int_or_none(publication.get("year"))
target.publication_type = _clean_optional(publication.get("publication_type") or publication.get("type"))
target.language = _clean_optional(publication.get("language"))
target.status = _int_or_none(publication.get("status"))
target.url = _clean_optional(publication.get("url"))
target.doi_url = _clean_optional(publication.get("doi_url"))
target.other_url = _clean_optional(publication.get("other_url"))
target.document_url = _clean_optional(publication.get("document_url"))
target.citation_text = _clean_optional(publication.get("citation_text") or publication.get("text"))
target.annotation = publication.get("annotation") if isinstance(publication.get("annotation"), dict) else None
target.description = publication.get("description") if isinstance(publication.get("description"), dict) else None
target.authors = publication.get("authors") if isinstance(publication.get("authors"), list) else None
target.raw_data = publication.get("raw_data") if isinstance(publication.get("raw_data"), dict) else publication
target.source_hash = source_hash
def _publication_hash(publication: dict) -> str:
return _payload_hash(publication.get("raw_data") if isinstance(publication.get("raw_data"), dict) else publication)
def _payload_hash(value: object) -> str:
payload = json.dumps(_stable_checksum_payload(value), ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _publication_title(publication: dict) -> str:
return _clean_optional(publication.get("title") or publication.get("text") or publication.get("id")) or "Untitled publication"
def _clean_optional(value: object) -> str | None:
text = str(value or "").strip()
return text or None
def _int_or_none(value: object) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
dismissed = 0
active = db.scalars(select(Employee).where(Employee.status == "active")).all()