Files
miem_workers/app/parser/profile.py

970 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import hashlib
import json
import re
from datetime import datetime, timezone
from urllib.parse import urljoin
from bs4 import BeautifulSoup, NavigableString, Tag
from requests import Session
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
from app.version import BACKEND_VERSION
# Captures the four-digit year that follows a "Начал(а/и) работать ... ВШЭ"
# phrase; matching is case-insensitive and non-greedy between the parts.
_YEAR_PATTERN = re.compile(r"Начал[аи]?\s+работать.*?ВШЭ.*?(\d{4})", re.IGNORECASE)
# Captures an e-mail address anywhere in free text.
_EMAIL_PATTERN = re.compile(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
# Captures the run of digits/punctuation (8+ chars) after a "Телефон:" or
# "Phone:" label.
_PHONE_PATTERN = re.compile(r"(?:Телефон|Phone)\s*:\s*([+()\d\-\s]{8,})", re.IGNORECASE)
def normalize_ws(value: object) -> str:
    """Collapse every whitespace run to one space and trim both ends.

    Falsy input (``None``, ``""``, ``0``) yields ``""``. A truthy value that
    is not a string (for example an integer id decoded from JSON) is
    converted with ``str()`` first instead of raising ``TypeError`` inside
    ``re.sub``.
    """
    if not value:
        return ""
    text = value if isinstance(value, str) else str(value)
    return re.sub(r"\s+", " ", text).strip()
def extract_person_tabs(soup: BeautifulSoup, source_url: str) -> list[dict[str, str | None]]:
    """Return the profile's navigation tabs.

    The menu selectors are tried from most to least specific; the first menu
    that yields at least one link with both a title and an href wins. Each
    tab is a dict with ``data_index``, ``title`` and ``href`` (resolved
    against ``source_url``); duplicates are removed. Returns ``[]`` when no
    menu produces a usable link.
    """
    menu_selectors = (
        "div.person-menu.is-desktop.small.person-menu-addition",
        ".person-menu",
    )
    for menu_selector in menu_selectors:
        menu_node = soup.select_one(menu_selector)
        if not menu_node:
            continue
        found = []
        for link in menu_node.select("a[href]"):
            caption = normalize_ws(link.get_text(" ", strip=True))
            target = link.get("href", "").strip()
            if not (caption and target):
                continue
            found.append(
                {
                    "data_index": link.get("data-index"),
                    "title": caption,
                    "href": urljoin(source_url, target),
                }
            )
        if found:
            return _dedupe_tabs(found)
    return []
def extract_person_header(soup: BeautifulSoup, source_url: str) -> dict:
    """Collect the headline facts of a person page.

    Returns a dict with ``source_url``, ``full_name`` (text of
    ``h1.person-caption`` or the first ``h1``, else ``None``), ``positions``
    (employment list items), ``hse_start_year`` (year captured by
    ``_YEAR_PATTERN``, else ``None``), ``contacts`` and ``external_ids``.
    """
    page_text = normalize_ws(soup.get_text(" ", strip=True))

    # Contacts are mined from the flattened text of the whole page,
    # keeping first-seen order and dropping repeats.
    emails = []
    for candidate in _EMAIL_PATTERN.findall(page_text):
        if candidate not in emails:
            emails.append(candidate)

    phones = []
    for candidate in _PHONE_PATTERN.findall(page_text):
        phone = normalize_ws(candidate)
        if phone and phone not in phones:
            phones.append(phone)

    address = None
    address_hit = re.search(
        r"(Адрес[:\s].{0,220}?)(?:\s+Время|\s+Расписание|\s+SPIN|\s+ORCID|$)",
        page_text,
        flags=re.IGNORECASE,
    )
    if address_hit:
        address = normalize_ws(address_hit.group(1)).rstrip(",")

    contacts = {"phones": phones, "emails": emails, "address": address, "items": []}

    position_nodes = soup.select("ul.g-ul.g-list.small.employment-add li, ul.employment-add li")
    position_texts = (normalize_ws(li.get_text(" ", strip=True)) for li in position_nodes)
    positions = [text for text in position_texts if text]

    # (system label, substring that identifies the service in a link href)
    known_systems = (
        ("ORCID", "orcid.org"),
        ("Scopus AuthorID", "scopus.com"),
        ("ResearcherID", "webofscience.com"),
        ("Google Scholar", "scholar.google."),
        ("SPIN РИНЦ", "elibrary.ru"),
    )
    external_ids = []
    for link in soup.select("a[href]"):
        target = link.get("href", "").strip()
        # Only the first matching system is recorded for a link.
        system = next((name for name, marker in known_systems if marker in target), None)
        if system is None:
            continue
        caption = normalize_ws(link.get_text(" ", strip=True))
        external_ids.append({"system": system, "value": caption or system, "url": target})

    name_node = soup.select_one("h1.person-caption") or soup.find("h1")
    full_name = normalize_ws(name_node.get_text(" ", strip=True)) if name_node else None
    year_hit = _YEAR_PATTERN.search(page_text)
    start_year = int(year_hit.group(1)) if year_hit else None

    return {
        "source_url": source_url,
        "full_name": full_name,
        "positions": positions,
        "hse_start_year": start_year,
        "contacts": contacts,
        "external_ids": _dedupe_dicts(external_ids),
    }
def extract_sections(soup: BeautifulSoup, source_url: str) -> list[dict]:
    """Split the page into sections, one per ``h2`` heading.

    Headings inside a ``.post`` element or inside the news tab are skipped,
    as are empty titles and timetable headings. Each section carries
    ``title``, ``slug``, ``type``, ``raw_text``, ``paragraphs``, ``items``
    and ``links``, plus type-specific keys. News posts are appended last as
    one synthetic "В новостях" section when any are found.
    """
    collected = []
    for heading in soup.select("h2"):
        if heading.find_parent(class_="post") or heading.find_parent(attrs={"data-tab": "press_links_news"}):
            continue
        title = normalize_ws(heading.get_text(" ", strip=True))
        lowered_title = title.lower()
        if not title or "расписание занятий" in lowered_title:
            continue

        body = _collect_between_h2(heading)
        kind = _infer_section_type(title, body)
        section = {
            "title": title,
            "slug": _slugify(title),
            "type": kind,
            "raw_text": _nodes_raw_text(body),
            "paragraphs": _nodes_paragraphs(body),
            "items": _nodes_list_items(body),
            "links": [
                link
                for node in body
                if isinstance(node, Tag)
                for link in _extract_links(node, source_url)
            ],
        }

        if kind == "publications":
            declared_count, publications = _parse_publications(title, body, source_url)
            section["publications_count"] = declared_count
            section["publications"] = publications
            section["items"] = [entry["text"] for entry in publications if entry.get("text")]
        elif kind == "courses_by_year":
            academic_year, courses = _parse_courses(title, body, source_url)
            section["academic_year"] = academic_year
            section["courses"] = courses
            # Course sections expose only the structured course list.
            section.pop("items", None)
            section.pop("links", None)
        elif kind == "table":
            section["table"] = _parse_table(body, source_url)
        elif "выпускные квалификационные работы студентов ниу вшэ" in lowered_title:
            section["items"] = _parse_vkr_items(body)

        year_entries = _parse_year_entries(body, source_url)
        if year_entries:
            section["year_entries"] = year_entries
            if kind in {"generic", "paragraphs"}:
                section["type"] = "year_blocks"
        collected.append(section)

    news_links = _parse_news_links(soup, source_url)
    if not news_links:
        return collected

    collected.append(
        {
            "title": "В новостях",
            "slug": "v_novostyah",
            "type": "news",
            "raw_text": "",
            "paragraphs": [],
            "items": [entry["title"] for entry in news_links if entry.get("title")],
            "links": [
                {"text": entry["title"], "url": entry["url"]}
                for entry in news_links
                if entry.get("title") and entry.get("url")
            ],
            "news_count": len(news_links),
            "news_links": news_links,
        }
    )
    return collected
def parse_person_profile(
    session: Session,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    use_playwright: bool = False,
    resource_cache=None,
) -> dict | None:
    """Fetch a person page and parse it into one profile dict.

    Returns ``None`` when ``normalize_profile_url`` rejects ``source_url``.
    Otherwise the main HTML is fetched (optionally re-rendered through
    Playwright), parsed into header, tabs and sections, and the sections are
    enriched from the publication / thesis widgets. The result also carries
    the HTML under ``_html`` and the fetch log under ``_resource_manifest``.
    Errors raised by the main HTML fetch are not caught here.
    """
    canonical_url = normalize_profile_url(source_url)
    if not canonical_url:
        return None

    profile_type, profile_id = parse_profile_identity(canonical_url)
    cache_key = f"{profile_type}:{profile_id}"
    manifest = []

    html = _fetch_text(
        session,
        canonical_url,
        headers,
        timeout,
        resource_cache=resource_cache,
        profile_key=cache_key,
        resource_key="main-html",
        resource_manifest=manifest,
    )
    if use_playwright:
        html = _render_with_playwright(canonical_url, html)

    soup = BeautifulSoup(html, "html.parser")
    header = extract_person_header(soup, canonical_url)
    tabs = extract_person_tabs(soup, canonical_url)
    static_sections = extract_sections(soup, canonical_url)
    sections = enrich_sections_from_hse_widgets(
        session,
        soup,
        canonical_url,
        headers,
        timeout,
        static_sections,
        resource_cache=resource_cache,
        profile_key=cache_key,
        resource_manifest=manifest,
    )

    profile = {
        "source_url": canonical_url,
        "profile_type": profile_type,
        "profile_id": profile_id,
        "full_name": header.get("full_name"),
        "positions": header.get("positions") or [],
        "hse_start_year": header.get("hse_start_year"),
        "contacts": header.get("contacts") or {},
        "external_ids": header.get("external_ids") or [],
        "tabs": tabs,
        "sections": sections,
        "employee_internal_links": [tab["href"] for tab in tabs if tab.get("href")],
        "parser_version": BACKEND_VERSION,
        "_html": html,
        "_resource_manifest": manifest,
    }
    return profile
def enrich_sections_from_hse_widgets(
    session: Session,
    soup: BeautifulSoup,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    sections: list[dict],
    resource_cache=None,
    profile_key: str | None = None,
    resource_manifest: list[dict] | None = None,
) -> list[dict]:
    """Merge widget-loaded publications and theses into ``sections``.

    Works on a shallow copy of the list; the caller's list object is not
    modified. A widget that returns nothing leaves the sections as they are.
    """
    # Both loaders take the same caching/bookkeeping keyword arguments.
    fetch_options = {
        "resource_cache": resource_cache,
        "profile_key": profile_key,
        "resource_manifest": resource_manifest,
    }
    result = list(sections)

    publications = _load_widget_publications(session, soup, headers, timeout, **fetch_options)
    if publications:
        result = _upsert_publications_section(result, publications)

    theses = _load_widget_graduation_theses(session, soup, source_url, headers, timeout, **fetch_options)
    if theses:
        result = _upsert_graduation_theses_section(result, theses)

    return result
def _render_with_playwright(source_url: str, fallback_html: str) -> str:
try:
from playwright.sync_api import sync_playwright
except Exception:
return fallback_html
try:
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=True)
page = browser.new_page()
page.goto(source_url, wait_until="domcontentloaded", timeout=45000)
for index in range(page.locator(".person-menu a").count()):
try:
page.locator(".person-menu a").nth(index).click(timeout=2500, force=True)
page.wait_for_timeout(450)
except Exception:
continue
html = page.content()
browser.close()
return html
except Exception:
return fallback_html
def _load_widget_publications(
    session: Session,
    soup: BeautifulSoup,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_manifest: list[dict] | None = None,
) -> list[dict]:
    """Load the author's publications from the publications search API.

    The author id is read from the ``AuthorSearch`` widget ``script`` tag;
    without that tag or id the result is ``[]``. Pages of 100 items are
    requested (at most 20 pages) until a page is empty or the response says
    there is nothing more. When a cache and profile key are given, requests
    go through ``_fetch_text`` (which also records them in
    ``resource_manifest``); otherwise the session is used directly.

    Loading is best effort: a failed request or undecodable body stops the
    paging, and whatever was collected so far is returned. The returned list
    is always deduplicated, including on that error path.
    """
    script = soup.select_one('script[data-widget-name="AuthorSearch"][data-author]')
    if not script:
        return []
    author_id = normalize_ws(script.get("data-author"))
    if not author_id:
        return []

    api_url = "https://publications.hse.ru/api/searchPubs"
    per_page = 100
    max_pages = 20
    publications = []
    page_id = 1
    while page_id <= max_pages:
        payload = {
            "type": "ANY",
            "filterParams": (
                f'"acceptLanguage":"ru"|"fullTextPublicEnabled": 1|'
                f'"pubsAuthor": {author_id}|"widgetName": "AuthorSearch"'
            ),
            "paginationParams": {
                "publsSort": ["TITLE_ASC"],
                "publsCount": per_page,
                "pageId": page_id,
            },
        }
        try:
            if resource_cache and profile_key:
                text = _fetch_text(
                    session,
                    api_url,
                    headers,
                    timeout,
                    resource_cache=resource_cache,
                    profile_key=profile_key,
                    resource_key=f"publications-page-{page_id}",
                    resource_manifest=resource_manifest,
                    method="POST",
                    json_payload=payload,
                )
                data = json.loads(text)
            else:
                response = session.post(
                    api_url,
                    json=payload,
                    headers=headers,
                    timeout=timeout,
                )
                response.raise_for_status()
                data = response.json()
        except Exception:
            # Keep the pages fetched so far; leaving the loop (instead of
            # returning here) sends them through the same dedupe as a
            # normal exit.
            break

        result = data.get("result") if isinstance(data, dict) else None
        items = _extract_publication_items(result)
        if not items:
            break
        publications.extend(_normalize_publication_item(item, author_id) for item in items)

        # A missing or malformed "total" is treated as 0 rather than raising.
        total = _int_or_none(result.get("total")) or 0
        if not result.get("more") and len(publications) >= total:
            break
        page_id += 1
    return _dedupe_publications(publications)
def _extract_publication_items(result: object) -> list[dict]:
    """Return the publication dicts found under ``result["items"]``.

    Anything other than a dict yields ``[]``.
    """
    if isinstance(result, dict):
        return _flatten_publication_items(result.get("items"))
    return []
def _flatten_publication_items(value: object) -> list[dict]:
    """Flatten a possibly nested ``items`` structure into publication dicts.

    A list is filtered down to publication-like dicts. For a dict, a nested
    ``"items"`` list or dict takes precedence; otherwise every value of the
    dict is searched recursively. Any other input yields ``[]``.
    """
    if isinstance(value, list):
        return list(filter(_is_publication_item, value))
    if not isinstance(value, dict):
        return []

    inner = value.get("items")
    if isinstance(inner, (list, dict)):
        # The recursive call filters a list directly and descends into a dict.
        return _flatten_publication_items(inner)

    collected = []
    for child in value.values():
        collected += _flatten_publication_items(child)
    return collected
def _is_publication_item(value: object) -> bool:
return isinstance(value, dict) and ("id" in value or "title" in value)
def _load_widget_graduation_theses(
    session: Session,
    soup: BeautifulSoup,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_manifest: list[dict] | None = None,
) -> list[dict]:
    """Load the theses supervised by the person from the VKR widget API.

    The person id and optional API path are read from the widget ``script``
    tag (the path defaults to ``/n/vkr/api/``). Returns ``[]`` when the tag
    or id is missing, when the request or decoding fails, or when the
    response's ``data`` value is not a list.
    """
    widget = soup.select_one('script[src*="/n/stat/vkr/app.js"][data-person-id]')
    if not widget:
        return []
    person_id = normalize_ws(widget.get("data-person-id"))
    api_path = normalize_ws(widget.get("data-api-url")) or "/n/vkr/api/"
    if not person_id:
        return []

    request_headers = dict(headers)
    request_headers["x-portal-language"] = "ru"
    try:
        endpoint = urljoin(source_url, api_path)
        query = {"supervisorId": person_id}
        if resource_cache and profile_key:
            body = _fetch_text(
                session,
                endpoint,
                request_headers,
                timeout,
                resource_cache=resource_cache,
                profile_key=profile_key,
                resource_key="graduation-theses",
                resource_manifest=resource_manifest,
                params=query,
            )
            data = json.loads(body)
        else:
            response = session.get(
                endpoint,
                params=query,
                headers=request_headers,
                timeout=timeout,
            )
            response.raise_for_status()
            data = response.json()
    except Exception:
        # Best effort: any failure means "no theses".
        return []

    records = data.get("data") if isinstance(data, dict) else []
    if not isinstance(records, list):
        return []
    return [_normalize_vkr_item(record, source_url) for record in records if isinstance(record, dict)]
def _collect_between_h2(start_h2: Tag) -> list[Tag | NavigableString | str]:
    """Return the sibling nodes after ``start_h2`` up to the next ``h2``.

    Whitespace-only strings are dropped. HTML comments are dropped too: they
    are ``NavigableString`` subclasses, so they would otherwise be kept and
    later stringified into the section's raw text, although
    ``Tag.get_text()`` never reports comment content.
    """
    from bs4 import Comment  # local import; bs4 is already a module dependency

    nodes = []
    for sibling in start_h2.next_siblings:
        if isinstance(sibling, Tag):
            if sibling.name == "h2":
                break
        elif isinstance(sibling, Comment):
            continue
        elif isinstance(sibling, NavigableString) and not normalize_ws(str(sibling)):
            continue
        nodes.append(sibling)
    return nodes
def _extract_links(node: Tag, source_url: str) -> list[dict[str, str]]:
    """Return ``{"text", "url"}`` dicts for the anchors inside ``node``.

    Anchors with no text or href are skipped, as are timetable links (href
    containing "timetable" or text containing "расписание"). URLs are
    resolved against ``source_url``.
    """
    found = []
    for anchor in node.select("a[href]"):
        caption = normalize_ws(anchor.get_text(" ", strip=True))
        target = anchor.get("href", "").strip()
        if not caption or not target:
            continue
        if "timetable" in target.lower() or "расписание" in caption.lower():
            continue
        found.append({"text": caption, "url": urljoin(source_url, target)})
    return found
def _nodes_raw_text(nodes: list) -> str:
    """Join the non-empty normalized text of each node, one node per line."""
    lines = []
    for node in nodes:
        if isinstance(node, Tag):
            raw = node.get_text(" ", strip=True)
        else:
            raw = str(node)
        cleaned = normalize_ws(raw)
        if cleaned:
            lines.append(cleaned)
    return "\n".join(lines)
def _nodes_paragraphs(nodes: list) -> list[str]:
    """Return the non-empty text of every ``p`` element among ``nodes``.

    Both a node that is itself a ``p`` and ``p`` elements nested inside a
    node are included. Non-tag nodes are ignored.
    """
    paragraphs = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        # ``select`` only searches descendants, so a paragraph that is a
        # direct sibling of the heading has to be added explicitly.
        candidates = [node] if node.name == "p" else []
        candidates.extend(node.select("p"))
        paragraphs.extend(normalize_ws(p.get_text(" ", strip=True)) for p in candidates)
    return [p for p in paragraphs if p]
def _nodes_list_items(nodes: list) -> list[str]:
    """Return the text of every ``li`` inside the tag nodes.

    Empty items and items mentioning "расписание" are left out.
    """
    kept = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            text = normalize_ws(li.get_text(" ", strip=True))
            if text and "расписание" not in text.lower():
                kept.append(text)
    return kept
def _infer_section_type(title: str, nodes: list) -> str:
    """Classify a section by its content and title.

    Checks are applied in priority order: ``table``, ``publications``,
    ``courses_by_year``, ``list``, ``paragraphs``; ``generic`` is the
    fallback.
    """
    if _has_table(nodes):
        return "table"

    lowered = title.lower()
    if _is_publications_title(lowered):
        return "publications"
    if "учебные курсы" in lowered:
        return "courses_by_year"

    content_probes = (
        (_nodes_list_items, "list"),
        (_nodes_paragraphs, "paragraphs"),
    )
    for probe, label in content_probes:
        if probe(nodes):
            return label
    return "generic"
def _is_publications_title(lowered_title: str) -> bool:
return lowered_title.startswith("публикац")
def _has_table(nodes: list) -> bool:
    """Tell whether any tag node is a ``table`` or contains one."""
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        if node.name == "table" or node.find("table"):
            return True
    return False
def _parse_table(nodes: list, source_url: str) -> dict:
    """Parse the first table found among ``nodes``.

    Returns ``{"headers": [...], "rows": [...]}``. Each row has its ``td``
    texts under ``cells`` and the first link of the row (resolved against
    ``source_url``) under ``link_url``. Rows without ``td`` cells are
    skipped. Without a table both lists are empty.
    """
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        table = node if node.name == "table" else node.find("table")
        if not table:
            continue

        header_cells = [normalize_ws(th.get_text(" ", strip=True)) for th in table.select("th")]
        body_rows = []
        for row in table.select("tr"):
            cell_texts = [normalize_ws(td.get_text(" ", strip=True)) for td in row.select("td")]
            if not cell_texts:
                continue
            first_link = row.find("a", href=True)
            link_url = urljoin(source_url, first_link["href"]) if first_link else None
            body_rows.append({"cells": cell_texts, "link_url": link_url})
        return {"headers": header_cells, "rows": body_rows}
    return {"headers": [], "rows": []}
def _parse_publications(title: str, nodes: list, source_url: str) -> tuple[int | None, list[dict]]:
    """Parse a publications section.

    Returns ``(declared_count, publications)``. ``declared_count`` is the
    number at the end of the title, or ``None``. Publications come from the
    ``li`` elements of the first tag node that has any non-empty ones; when
    no node has list items, each line of the raw text becomes a publication.
    """
    entries = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            full_text = normalize_ws(li.get_text(" ", strip=True))
            if not full_text:
                continue
            link = li.find("a", href=True)
            if link:
                entry_title = normalize_ws(link.get_text(" ", strip=True))
                entry_url = urljoin(source_url, link["href"])
            else:
                entry_title = full_text
                entry_url = None
            entries.append({"title": entry_title, "url": entry_url, "text": full_text})
        if entries:
            # Only the first node that yields items is used.
            break

    if not entries:
        raw_lines = _nodes_raw_text(nodes).split("\n")
        entries = [{"title": line, "url": None, "text": line} for line in raw_lines if line]

    trailing_number = re.search(r"(\d+)\s*$", title)
    declared_count = int(trailing_number.group(1)) if trailing_number else None
    return declared_count, entries
def _parse_courses(title: str, nodes: list, source_url: str) -> tuple[str | None, list[dict]]:
    """Parse a courses section.

    Returns ``(academic_year, courses)``. The year is the first
    ``YYYY/YYYY`` span in the title, or ``None``. Each course has ``title``
    (link text when the item has a link, else the item text) and ``url``;
    identical courses are reported once.
    """
    found = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            link = li.find("a", href=True)
            label_source = link if link else li
            name = normalize_ws(label_source.get_text(" ", strip=True))
            if not name:
                continue
            url = urljoin(source_url, link["href"]) if link else None
            found.append({"title": name, "url": url})

    year_span = re.search(r"(\d{4}/\d{4})", title)
    academic_year = year_span.group(1) if year_span else None
    return academic_year, _dedupe_dicts(found)
def _parse_year_entries(nodes: list, source_url: str) -> list[dict]:
    """Collect year-labelled entries from ``.person-list-hangover`` markers.

    For every marker with a parent, the entry holds ``year`` (a 19xx/20xx
    number found in the marker text, else ``None``), ``text`` (the parent's
    text) and ``links`` (the parent's links).
    """
    entries = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for marker in node.select(".person-list-hangover"):
            container = marker.parent
            if not container:
                continue
            year_hit = re.search(r"(19\d{2}|20\d{2})", marker.get_text(" ", strip=True))
            entry = {
                "year": int(year_hit.group(1)) if year_hit else None,
                "text": normalize_ws(container.get_text(" ", strip=True)),
                "links": _extract_links(container, source_url),
            }
            entries.append(entry)
    return entries
def _parse_vkr_items(nodes: list) -> list[str]:
    """Return the unique non-empty ``li`` texts, in first-seen order."""
    ordered_unique = {}
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            # Dict keys keep insertion order and drop repeats.
            ordered_unique.setdefault(normalize_ws(li.get_text(" ", strip=True)), None)
    return [text for text in ordered_unique if text]
def _parse_news_links(soup: BeautifulSoup, source_url: str) -> list[dict]:
    """Parse the posts of the news tab into link records.

    Each record has ``title`` (falls back to the href), ``url`` (resolved
    against ``source_url``), ``summary``, ``published_at`` (ISO string),
    ``published_year`` and the unresolved values under ``raw_data``. Posts
    with neither a title nor an href are skipped; duplicates are removed.
    """
    collected = []
    for post in soup.select('[data-tab="press_links_news"] .post'):
        if not isinstance(post, Tag):
            continue

        link = post.select_one(".post__content h2 a[href], h2 a[href], a[href]")
        if link:
            title = normalize_ws(link.get_text(" ", strip=True))
            href = normalize_ws(link.get("href"))
        else:
            title = ""
            href = ""
        if not (title or href):
            continue

        summary_node = post.select_one(".post__text")
        summary = normalize_ws(summary_node.get_text(" ", strip=True)) if summary_node else ""

        published_at = _parse_post_date(post)
        if published_at:
            published_iso = published_at.isoformat()
            published_year = published_at.year
        else:
            # No full date: fall back to the bare year label, if any.
            published_iso = None
            published_year = _int_or_none(normalize_ws(_select_text(post, ".post-meta__year")))

        display_title = title or href
        collected.append(
            {
                "title": display_title,
                "url": urljoin(source_url, href) if href else None,
                "summary": summary or None,
                "published_at": published_iso,
                "published_year": published_year,
                "raw_data": {
                    "title": display_title,
                    "url": href or None,
                    "summary": summary or None,
                    "date_text": normalize_ws(_select_text(post, ".post-meta__date")),
                },
            }
        )
    return _dedupe_news_links(collected)
def _select_text(node: Tag, selector: str) -> str:
    """Return the text of the first match of ``selector``, or ``""``."""
    match = node.select_one(selector)
    if not match:
        return ""
    return match.get_text(" ", strip=True)
def _parse_post_date(post: Tag) -> datetime | None:
    """Build a UTC date from the post's day / month / year labels.

    Returns ``None`` when a part is missing or unparsable, or when the parts
    do not form a valid calendar date.
    """

    def label(selector: str) -> str:
        return normalize_ws(_select_text(post, selector))

    day = _int_or_none(label(".post-meta__day"))
    month = _month_number(label(".post-meta__month"))
    year = _int_or_none(label(".post-meta__year"))
    if not (day and month and year):
        return None
    try:
        return datetime(year, month, day, tzinfo=timezone.utc)
    except ValueError:
        return None
def _month_number(value: str) -> int | None:
lowered = value.lower().strip(".")
months = {
"янв": 1,
"январь": 1,
"января": 1,
"фев": 2,
"февр": 2,
"февраль": 2,
"февраля": 2,
"март": 3,
"мар": 3,
"марта": 3,
"апр": 4,
"апрель": 4,
"апреля": 4,
"май": 5,
"мая": 5,
"июнь": 6,
"июня": 6,
"июль": 7,
"июля": 7,
"авг": 8,
"август": 8,
"августа": 8,
"сент": 9,
"сен": 9,
"сентябрь": 9,
"сентября": 9,
"окт": 10,
"октябрь": 10,
"октября": 10,
"нояб": 11,
"ноябрь": 11,
"ноября": 11,
"дек": 12,
"декабрь": 12,
"декабря": 12,
}
return months.get(lowered)
def _normalize_publication_item(item: dict, current_author_id: str | None = None) -> dict:
    """Convert one raw publication record into the parser's flat dict.

    The untouched record is kept under ``raw_data``. ``citation_text`` is
    the record's ``description.main`` when present, otherwise it is built
    from the authors, title and year. ``text`` joins title, year and the
    short description.
    """

    def dict_field(name: str) -> dict:
        # Nested objects are only trusted when they really are dicts.
        candidate = item.get(name)
        return candidate if isinstance(candidate, dict) else {}

    description = dict_field("description")
    documents = dict_field("documents")
    language = dict_field("language")

    publication_id = str(item.get("id") or "").strip()
    title = _html_to_text(item.get("title"))
    year = _int_or_none(item.get("year"))
    publication_type = str(item.get("type") or "").strip() or None
    short_description = _localized_value(description.get("short")) or _localized_value(description.get("shortLeft"))
    annotation = _localized_text_map(item.get("annotation"))
    authors = _normalize_publication_authors(item.get("authorsByType"), current_author_id)

    citation_text = normalize_ws(str(description.get("main") or ""))
    if not citation_text:
        citation_text = _build_publication_citation(title, authors, year)

    text_parts = [part for part in (title, str(year or ""), short_description) if part]
    text = normalize_ws(" ".join(text_parts))

    view_url = f"https://publications.hse.ru/view/{publication_id}" if publication_id else None
    return {
        "id": publication_id or None,
        "publication_id": publication_id or None,
        "title": title or publication_id,
        "year": year,
        "type": publication_type,
        "publication_type": publication_type,
        "language": normalize_ws(language.get("name")) or None,
        "status": _int_or_none(item.get("status")),
        "url": view_url,
        "doi_url": _document_href(documents, "DOI"),
        "other_url": _document_href(documents, "OTHER_URL"),
        "document_url": _document_href(documents, "DOCUMENT"),
        "citation_text": citation_text or None,
        "annotation": annotation,
        "description": description or None,
        "authors": authors,
        "raw_data": item,
        "text": text or title or publication_id,
    }
def _normalize_vkr_item(item: dict, source_url: str) -> dict:
    """Convert one raw thesis record into the parser's flat dict.

    Program, org-unit and project URLs are resolved against ``source_url``;
    supervisor URLs are passed through as received. ``text`` joins the
    student, title and year values that are present.
    """

    def dict_field(name: str) -> dict:
        candidate = item.get(name)
        return candidate if isinstance(candidate, dict) else {}

    program = dict_field("learnProgram")
    org_unit = dict_field("orgUnit")
    thesis_id = item.get("id")

    supervisors = []
    for raw_supervisor in item.get("supervisors") or []:
        if not isinstance(raw_supervisor, dict):
            continue
        name = normalize_ws(raw_supervisor.get("name"))
        url = normalize_ws(raw_supervisor.get("url"))
        if not (name or url):
            continue
        supervisors.append({"name": name or url, "url": url or None})

    program_url = program.get("url")
    org_unit_url = org_unit.get("url")
    text_parts = [str(part) for part in (item.get("student"), item.get("title"), item.get("year")) if part]

    return {
        "id": thesis_id,
        "student": normalize_ws(item.get("student")),
        "title": normalize_ws(item.get("title")),
        "defense_year": item.get("year"),
        "level": normalize_ws(item.get("level")),
        "rating": item.get("rating"),
        "project_url": urljoin(source_url, f"/edu/vkr/{thesis_id}") if thesis_id else None,
        "program": normalize_ws(program.get("title")),
        "program_url": urljoin(source_url, program_url) if program_url else None,
        "org_unit": normalize_ws(org_unit.get("title")),
        "org_unit_url": urljoin(source_url, org_unit_url) if org_unit_url else None,
        "supervisors": supervisors,
        "text": normalize_ws(" ".join(text_parts)),
    }
def _upsert_publications_section(sections: list[dict], publications: list[dict]) -> list[dict]:
    """Merge ``publications`` into every section of type ``publications``.

    Matching sections are replaced by copies whose publication list is the
    deduplicated union of the existing and new entries. When no section
    matches, a new "Публикации и исследования" section is appended. The
    input list and its dicts are not modified.
    """
    result = []
    found_existing = False
    for section in sections:
        if section.get("type") != "publications":
            result.append(section)
            continue

        already_there = section.get("publications") or []
        updated = dict(section)
        updated["publications_count"] = max(section.get("publications_count") or 0, len(publications))
        updated["publications"] = _dedupe_publications([*already_there, *publications])
        updated["items"] = [entry["text"] for entry in updated["publications"] if entry.get("text")]
        result.append(updated)
        found_existing = True

    if found_existing:
        return result

    result.append(
        {
            "title": "Публикации и исследования",
            "slug": "publikacii_i_issledovaniya",
            "type": "publications",
            "raw_text": "",
            "paragraphs": [],
            "items": [entry["text"] for entry in publications if entry.get("text")],
            "links": [],
            "publications_count": len(publications),
            "publications": publications,
        }
    )
    return result
def _upsert_graduation_theses_section(sections: list[dict], theses: list[dict]) -> list[dict]:
section = {
"title": "Выпускные квалификационные работы студентов НИУ ВШЭ",
"slug": "vypusknye_kvalifikacionnye_raboty_studentov_niu_vshe",
"type": "graduation_theses",
"raw_text": "",
"paragraphs": [],
"items": [item["text"] for item in theses if item.get("text")],
"links": [{"text": item["title"], "url": item["project_url"]} for item in theses if item.get("title") and item.get("project_url")],
"theses_count": len(theses),
"theses": theses,
}
return [item for item in sections if item.get("type") != "graduation_theses"] + [section]
def _dedupe_publications(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = item.get("id") or item.get("url") or item.get("title")
if key and key not in seen:
seen.add(key)
unique.append(item)
return unique
def _dedupe_news_links(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = item.get("url") or item.get("title")
if key and key not in seen:
seen.add(key)
unique.append(item)
return unique
def _html_to_text(value: object) -> str:
    """Strip markup from an HTML fragment and normalize its whitespace.

    Falsy input is treated as an empty fragment.
    """
    fragment = str(value or "")
    parsed = BeautifulSoup(fragment, "html.parser")
    plain = parsed.get_text(" ", strip=True)
    return normalize_ws(plain)
def _localized_text_map(value: object) -> dict[str, str]:
    """Return the non-empty ``ru`` / ``en`` / ``publ`` texts of a dict.

    Markup is stripped from each value. Non-dict input yields ``{}``.
    """
    if not isinstance(value, dict):
        return {}
    return {
        language: text
        for language in ("ru", "en", "publ")
        if (text := _html_to_text(value.get(language)))
    }
def _localized_value(value: object) -> str:
    """Pick one display string from a localized value.

    For a dict the first truthy entry among ``ru``, ``publ`` and ``en`` is
    used; any other value is stringified. Whitespace is normalized.
    """
    if not isinstance(value, dict):
        return normalize_ws(str(value or ""))
    preferred = value.get("ru") or value.get("publ") or value.get("en")
    return normalize_ws(preferred)
def _normalize_publication_authors(value: object, current_author_id: str | None) -> list[dict]:
    """Normalize the ``author`` list of an ``authorsByType`` mapping.

    Non-dict input yields ``[]`` and non-dict list entries are skipped.
    ``is_current_employee`` is true when the author's id equals
    ``current_author_id``. Author hrefs are resolved against
    ``https://www.hse.ru``.
    """
    if not isinstance(value, dict):
        return []

    def dict_field(record: dict, name: str) -> dict:
        candidate = record.get(name)
        return candidate if isinstance(candidate, dict) else {}

    normalized = []
    for record in value.get("author") or []:
        if not isinstance(record, dict):
            continue
        names = dict_field(record, "title")
        reversed_names = dict_field(record, "reverseTitle")
        author_id = normalize_ws(record.get("id"))
        href = normalize_ws(record.get("href"))
        is_current = bool(current_author_id and author_id == current_author_id)
        normalized.append(
            {
                "id": author_id or None,
                "href": urljoin("https://www.hse.ru", href) if href else None,
                "title_ru": _html_to_text(names.get("ru")),
                "title_en": _html_to_text(names.get("en")),
                "reverse_title_ru": _html_to_text(reversed_names.get("ru")),
                "reverse_title_en": _html_to_text(reversed_names.get("en")),
                "alt_name": normalize_ws(record.get("altName")) or None,
                "other_name": normalize_ws(record.get("otherName")) or None,
                "is_current_employee": is_current,
            }
        )
    return normalized
def _document_href(documents: dict, key: str) -> str | None:
    """Return the ``href`` of ``documents[key]``.

    Returns ``None`` when the entry is missing, is not a dict, or has an
    empty href.
    """
    entry = documents.get(key)
    if isinstance(entry, dict):
        return normalize_ws(entry.get("href")) or None
    return None
def _build_publication_citation(title: str, authors: list[dict], year: int | None) -> str:
    """Build an "Authors. Title. Year" citation from the parts present.

    Each author contributes the first available of ``title_ru``,
    ``title_en`` and ``alt_name``; authors with none are left out.
    """
    names = []
    for author in authors:
        name = author.get("title_ru") or author.get("title_en") or author.get("alt_name")
        if name:
            names.append(name)

    pieces = [", ".join(names), title, str(year or "")]
    return normalize_ws(". ".join(piece for piece in pieces if piece))
def _int_or_none(value: object) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None
def _slugify(value: str) -> str:
cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"
def _dedupe_tabs(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = (item.get("title"), item.get("href"))
if key not in seen:
seen.add(key)
unique.append(item)
return unique
def _dedupe_dicts(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = tuple(sorted(item.items()))
if key not in seen:
seen.add(key)
unique.append(item)
return unique
def _fetch_text(
    session: Session,
    url: str,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_key: str,
    resource_manifest: list[dict] | None,
    method: str = "GET",
    json_payload: object | None = None,
    params: dict | None = None,
) -> str:
    """Fetch a resource as text and log the fetch in ``resource_manifest``.

    With both ``resource_cache`` and ``profile_key`` given, the request is
    delegated to ``resource_cache.fetch_text``. Otherwise the session is
    called directly (POST when ``method`` is "POST" in any letter case, GET
    for anything else), a non-success status raises via
    ``raise_for_status()``, and the body hash is the SHA-256 of the UTF-8
    encoded text. When ``resource_manifest`` is not ``None``, one entry per
    fetch is appended to it.
    """

    def manifest_entry(body_hash, from_cache, status_code) -> dict:
        return {
            "resource_key": resource_key,
            "method": method,
            "url": url,
            "body_hash": body_hash,
            "from_cache": from_cache,
            "status_code": status_code,
        }

    if resource_cache and profile_key:
        cached = resource_cache.fetch_text(
            session,
            profile_key=profile_key,
            resource_key=resource_key,
            method=method,
            url=url,
            headers=headers,
            timeout=timeout,
            json_payload=json_payload,
            params=params,
        )
        if resource_manifest is not None:
            resource_manifest.append(
                manifest_entry(cached.body_hash, cached.from_cache, cached.status_code)
            )
        return cached.text

    if method.upper() == "POST":
        response = session.post(url, json=json_payload, headers=headers, timeout=timeout, params=params)
    else:
        response = session.get(url, headers=headers, timeout=timeout, params=params)
    response.raise_for_status()

    body = response.text
    if resource_manifest is not None:
        digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
        resource_manifest.append(manifest_entry(digest, False, response.status_code))
    return body