"""Parser for HSE person profile pages.

Extracts the page header (name, positions, contacts, external IDs), the
navigation tabs, and the ``<h2>``-delimited content sections, then enriches
the sections with data loaded from the publications and graduation-theses
widget APIs referenced by the page.
"""

import hashlib
import json
import re
from urllib.parse import urljoin

from bs4 import BeautifulSoup, NavigableString, Tag
from requests import Session

from app.parser.profile_url import normalize_profile_url, parse_profile_identity
from app.version import BACKEND_VERSION

_YEAR_PATTERN = re.compile(r"Начал[аи]?\s+работать.*?ВШЭ.*?(\d{4})", re.IGNORECASE)
_EMAIL_PATTERN = re.compile(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
_PHONE_PATTERN = re.compile(r"(?:Телефон|Phone)\s*:\s*([+()\d\-\s]{8,})", re.IGNORECASE)

# Pagination limits for the publications search API.
_MAX_PUBLICATION_PAGES = 20
_PUBLICATIONS_PER_PAGE = 100


def normalize_ws(value: object) -> str:
    """Collapse every whitespace run in *value* to one space and strip the ends.

    Falsy input (``None``, ``""``, ``0`` ...) yields ``""``.  Truthy non-string
    values (for example numeric ids coming from JSON payloads) are converted
    with ``str()`` first instead of raising ``TypeError`` inside ``re.sub``.
    """
    if not value:
        return ""
    return re.sub(r"\s+", " ", str(value)).strip()


def extract_person_tabs(soup: BeautifulSoup, source_url: str) -> list[dict[str, str | None]]:
    """Return the de-duplicated navigation tabs of the person menu.

    Selectors are tried from most to least specific; the first menu that
    yields at least one titled link wins.  Each tab is a dict with
    ``data_index``, ``title`` and an absolute ``href``.
    """
    selectors = (
        "div.person-menu.is-desktop.small.person-menu-addition",
        ".person-menu",
    )
    for selector in selectors:
        menu = soup.select_one(selector)
        if not menu:
            continue
        tabs = []
        for anchor in menu.select("a[href]"):
            title = normalize_ws(anchor.get_text(" ", strip=True))
            href = anchor.get("href", "").strip()
            if title and href:
                tabs.append(
                    {
                        "data_index": anchor.get("data-index"),
                        "title": title,
                        "href": urljoin(source_url, href),
                    }
                )
        if tabs:
            return _dedupe_tabs(tabs)
    return []


def extract_person_header(soup: BeautifulSoup, source_url: str) -> dict:
    """Extract name, positions, start year, contacts and external IDs.

    Contacts and the start year are found by running regexes over the
    whitespace-normalized text of the whole page.
    """
    name_node = soup.select_one("h1.person-caption") or soup.find("h1")
    text = normalize_ws(soup.get_text(" ", strip=True))
    year_match = _YEAR_PATTERN.search(text)

    contacts = {"phones": [], "emails": [], "address": None, "items": []}
    for email in _EMAIL_PATTERN.findall(text):
        if email not in contacts["emails"]:
            contacts["emails"].append(email)
    for phone in _PHONE_PATTERN.findall(text):
        normalized_phone = normalize_ws(phone)
        if normalized_phone and normalized_phone not in contacts["phones"]:
            contacts["phones"].append(normalized_phone)
    # The address runs until the next known label (or end of text), capped at 220 chars.
    address_match = re.search(
        r"(Адрес[:\s].{0,220}?)(?:\s+Время|\s+Расписание|\s+SPIN|\s+ORCID|$)",
        text,
        flags=re.IGNORECASE,
    )
    if address_match:
        contacts["address"] = normalize_ws(address_match.group(1)).rstrip(",")

    positions = []
    for li in soup.select("ul.g-ul.g-list.small.employment-add li, ul.employment-add li"):
        value = normalize_ws(li.get_text(" ", strip=True))
        if value:
            positions.append(value)

    external_ids = []
    id_domains = (
        ("ORCID", "orcid.org"),
        ("Scopus AuthorID", "scopus.com"),
        ("ResearcherID", "webofscience.com"),
        ("Google Scholar", "scholar.google."),
        ("SPIN РИНЦ", "elibrary.ru"),
    )
    for anchor in soup.select("a[href]"):
        href = anchor.get("href", "").strip()
        label = normalize_ws(anchor.get_text(" ", strip=True))
        for system, marker in id_domains:
            if marker in href:
                external_ids.append({"system": system, "value": label or system, "url": href})
                break  # one system per link; first matching domain wins

    return {
        "source_url": source_url,
        "full_name": normalize_ws(name_node.get_text(" ", strip=True)) if name_node else None,
        "positions": positions,
        "hse_start_year": int(year_match.group(1)) if year_match else None,
        "contacts": contacts,
        "external_ids": _dedupe_dicts(external_ids),
    }


def extract_sections(soup: BeautifulSoup, source_url: str) -> list[dict]:
    """Split the page into sections, one per ``<h2>`` heading.

    Headings that are empty or mention the class timetable are skipped.  Each
    section carries its title, slug, inferred type, raw text, paragraphs, list
    items and links, plus type-specific keys (``publications``, ``courses``,
    ``table``, ``year_entries``).
    """
    sections = []
    for h2 in soup.select("h2"):
        title = normalize_ws(h2.get_text(" ", strip=True))
        if not title or "расписание занятий" in title.lower():
            continue
        nodes = _collect_between_h2(h2)
        raw_text = _nodes_raw_text(nodes)
        paragraphs = _nodes_paragraphs(nodes)
        items = _nodes_list_items(nodes)
        links = []
        for node in nodes:
            if isinstance(node, Tag):
                links.extend(_extract_links(node, source_url))
        section_type = _infer_section_type(title, nodes)
        section = {
            "title": title,
            "slug": _slugify(title),
            "type": section_type,
            "raw_text": raw_text,
            "paragraphs": paragraphs,
            "items": items,
            "links": links,
        }
        if section_type == "publications":
            section["publications_count"], section["publications"] = _parse_publications(
                title, nodes, source_url
            )
            section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
        elif section_type == "courses_by_year":
            section["academic_year"], section["courses"] = _parse_courses(title, nodes, source_url)
            # Courses replace the generic item/link lists for this section type.
            section.pop("items", None)
            section.pop("links", None)
        elif section_type == "table":
            section["table"] = _parse_table(nodes, source_url)
        elif "выпускные квалификационные работы студентов ниу вшэ" in title.lower():
            section["items"] = _parse_vkr_items(nodes)

        year_entries = _parse_year_entries(nodes, source_url)
        if year_entries:
            section["year_entries"] = year_entries
            if section_type in {"generic", "paragraphs"}:
                section["type"] = "year_blocks"
        sections.append(section)
    return sections


def parse_person_profile(
    session: Session,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    use_playwright: bool = False,
    resource_cache=None,
) -> dict | None:
    """Fetch and parse one person profile.

    Returns ``None`` when *source_url* cannot be normalized to a profile URL.
    Otherwise returns the parsed profile, including the HTML that was parsed
    (``_html``) and a manifest of every resource fetched through
    ``_fetch_text`` (``_resource_manifest``).  When *use_playwright* is true
    the fetched HTML is replaced by a browser-rendered version if rendering
    succeeds.
    """
    normalized_url = normalize_profile_url(source_url)
    if not normalized_url:
        return None
    profile_type, profile_id = parse_profile_identity(normalized_url)
    cache_profile_key = f"{profile_type}:{profile_id}"
    resource_manifest = []
    html = _fetch_text(
        session,
        normalized_url,
        headers,
        timeout,
        resource_cache=resource_cache,
        profile_key=cache_profile_key,
        resource_key="main-html",
        resource_manifest=resource_manifest,
    )
    if use_playwright:
        html = _render_with_playwright(normalized_url, html)
    soup = BeautifulSoup(html, "html.parser")
    header = extract_person_header(soup, normalized_url)
    tabs = extract_person_tabs(soup, normalized_url)
    sections = extract_sections(soup, normalized_url)
    sections = enrich_sections_from_hse_widgets(
        session,
        soup,
        normalized_url,
        headers,
        timeout,
        sections,
        resource_cache=resource_cache,
        profile_key=cache_profile_key,
        resource_manifest=resource_manifest,
    )
    internal_links = [tab["href"] for tab in tabs if tab.get("href")]
    return {
        "source_url": normalized_url,
        "profile_type": profile_type,
        "profile_id": profile_id,
        "full_name": header.get("full_name"),
        "positions": header.get("positions") or [],
        "hse_start_year": header.get("hse_start_year"),
        "contacts": header.get("contacts") or {},
        "external_ids": header.get("external_ids") or [],
        "tabs": tabs,
        "sections": sections,
        "employee_internal_links": internal_links,
        "parser_version": BACKEND_VERSION,
        "_html": html,
        "_resource_manifest": resource_manifest,
    }


def enrich_sections_from_hse_widgets(
    session: Session,
    soup: BeautifulSoup,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    sections: list[dict],
    resource_cache=None,
    profile_key: str | None = None,
    resource_manifest: list[dict] | None = None,
) -> list[dict]:
    """Merge widget-API publications and graduation theses into *sections*.

    Works on a shallow copy of *sections*; the input list is not mutated.
    """
    enriched = list(sections)
    publications = _load_widget_publications(
        session,
        soup,
        headers,
        timeout,
        resource_cache=resource_cache,
        profile_key=profile_key,
        resource_manifest=resource_manifest,
    )
    if publications:
        enriched = _upsert_publications_section(enriched, publications)
    theses = _load_widget_graduation_theses(
        session,
        soup,
        source_url,
        headers,
        timeout,
        resource_cache=resource_cache,
        profile_key=profile_key,
        resource_manifest=resource_manifest,
    )
    if theses:
        enriched = _upsert_graduation_theses_section(enriched, theses)
    return enriched


def _render_with_playwright(source_url: str, fallback_html: str) -> str:
    """Render *source_url* in headless Chromium, clicking every person-menu tab.

    Best effort: returns *fallback_html* when Playwright is unavailable or any
    step of the rendering fails.
    """
    try:
        from playwright.sync_api import sync_playwright
    except Exception:
        return fallback_html
    try:
        with sync_playwright() as playwright:
            browser = playwright.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(source_url, wait_until="domcontentloaded", timeout=45000)
                menu_links = page.locator(".person-menu a")
                for index in range(menu_links.count()):
                    try:
                        menu_links.nth(index).click(timeout=2500, force=True)
                        page.wait_for_timeout(450)
                    except Exception:
                        # A tab that cannot be clicked must not abort the rendering.
                        continue
                return page.content()
            finally:
                # Close the browser even when navigation or content retrieval fails.
                browser.close()
    except Exception:
        return fallback_html


def _load_widget_publications(
    session: Session,
    soup: BeautifulSoup,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_manifest: list[dict] | None = None,
) -> list[dict]:
    """Load the author's publications through the ``AuthorSearch`` widget API.

    Returns ``[]`` when the page carries no widget script or author id.  Pages
    are requested until the API reports no more data, a page comes back empty,
    or the page limit is reached.  A failed request stops pagination; whatever
    was collected so far is still normalized and de-duplicated.
    """
    script = soup.select_one('script[data-widget-name="AuthorSearch"][data-author]')
    if not script:
        return []
    author_id = normalize_ws(script.get("data-author"))
    if not author_id:
        return []

    publications = []
    page_id = 1
    while page_id <= _MAX_PUBLICATION_PAGES:
        payload = {
            "type": "ANY",
            "filterParams": (
                f'"acceptLanguage":"ru"|"fullTextPublicEnabled": 1|'
                f'"pubsAuthor": {author_id}|"widgetName": "AuthorSearch"'
            ),
            "paginationParams": {
                "publsSort": ["TITLE_ASC"],
                "publsCount": _PUBLICATIONS_PER_PAGE,
                "pageId": page_id,
            },
        }
        try:
            if resource_cache and profile_key:
                text = _fetch_text(
                    session,
                    "https://publications.hse.ru/api/searchPubs",
                    headers,
                    timeout,
                    resource_cache=resource_cache,
                    profile_key=profile_key,
                    resource_key=f"publications-page-{page_id}",
                    resource_manifest=resource_manifest,
                    method="POST",
                    json_payload=payload,
                )
                data = json.loads(text)
            else:
                response = session.post(
                    "https://publications.hse.ru/api/searchPubs",
                    json=payload,
                    headers=headers,
                    timeout=timeout,
                )
                response.raise_for_status()
                data = response.json()
        except Exception:
            # Best effort: keep what was already collected.
            break
        result = data.get("result") if isinstance(data, dict) else {}
        items = _extract_publication_items(result)
        if not items:
            break
        publications.extend(_normalize_publication_item(item, author_id) for item in items)
        # A malformed "total" is treated as 0 rather than aborting the whole load.
        total = _int_or_none(result.get("total")) or 0
        if not result.get("more") and len(publications) >= total:
            break
        page_id += 1
    return _dedupe_publications(publications)


def _extract_publication_items(result: object) -> list[dict]:
    """Return the publication dicts found under ``result["items"]``."""
    if not isinstance(result, dict):
        return []
    return _flatten_publication_items(result.get("items"))


def _flatten_publication_items(value: object) -> list[dict]:
    """Recursively collect publication-like dicts from a nested ``items`` payload.

    A list is filtered directly.  A dict is searched through its own ``items``
    key first and, when that key holds neither a list nor a dict, through all
    of its values.
    """
    if isinstance(value, list):
        return [item for item in value if _is_publication_item(item)]
    if not isinstance(value, dict):
        return []
    nested_items = value.get("items")
    if isinstance(nested_items, list):
        return [item for item in nested_items if _is_publication_item(item)]
    if isinstance(nested_items, dict):
        return _flatten_publication_items(nested_items)
    publications = []
    for child in value.values():
        publications.extend(_flatten_publication_items(child))
    return publications


def _is_publication_item(value: object) -> bool:
    """Return True for a dict that has an ``id`` or a ``title`` key."""
    return isinstance(value, dict) and ("id" in value or "title" in value)


def _load_widget_graduation_theses(
    session: Session,
    soup: BeautifulSoup,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_manifest: list[dict] | None = None,
) -> list[dict]:
    """Load the theses supervised by the person through the VKR widget API.

    Returns ``[]`` when the widget script or person id is missing, when the
    request fails, or when the response has no ``data`` list.
    """
    script = soup.select_one('script[src*="/n/stat/vkr/app.js"][data-person-id]')
    if not script:
        return []
    person_id = normalize_ws(script.get("data-person-id"))
    api_url = normalize_ws(script.get("data-api-url")) or "/n/vkr/api/"
    if not person_id:
        return []
    request_headers = {**headers, "x-portal-language": "ru"}
    try:
        url = urljoin(source_url, api_url)
        params = {"supervisorId": person_id}
        if resource_cache and profile_key:
            text = _fetch_text(
                session,
                url,
                request_headers,
                timeout,
                resource_cache=resource_cache,
                profile_key=profile_key,
                resource_key="graduation-theses",
                resource_manifest=resource_manifest,
                params=params,
            )
            data = json.loads(text)
        else:
            response = session.get(
                url,
                params=params,
                headers=request_headers,
                timeout=timeout,
            )
            response.raise_for_status()
            data = response.json()
    except Exception:
        return []
    items = data.get("data") if isinstance(data, dict) else []
    if not isinstance(items, list):
        return []
    return [_normalize_vkr_item(item, source_url) for item in items if isinstance(item, dict)]


def _collect_between_h2(start_h2: Tag) -> list[Tag | NavigableString | str]:
    """Return the siblings after *start_h2* up to (not including) the next ``<h2>``.

    Whitespace-only text nodes are dropped.
    """
    nodes = []
    for sibling in start_h2.next_siblings:
        if isinstance(sibling, Tag) and sibling.name == "h2":
            break
        if isinstance(sibling, NavigableString) and not normalize_ws(str(sibling)):
            continue
        nodes.append(sibling)
    return nodes


def _extract_links(node: Tag, source_url: str) -> list[dict[str, str]]:
    """Return ``{"text", "url"}`` for every titled link in *node*, skipping timetable links."""
    links = []
    for anchor in node.select("a[href]"):
        text = normalize_ws(anchor.get_text(" ", strip=True))
        href = anchor.get("href", "").strip()
        if text and href and "timetable" not in href.lower() and "расписание" not in text.lower():
            links.append({"text": text, "url": urljoin(source_url, href)})
    return links


def _nodes_raw_text(nodes: list) -> str:
    """Join the normalized text of each node, one node per line."""
    chunks = []
    for node in nodes:
        text = normalize_ws(node.get_text(" ", strip=True) if isinstance(node, Tag) else str(node))
        if text:
            chunks.append(text)
    return "\n".join(chunks)


def _nodes_paragraphs(nodes: list) -> list[str]:
    """Return the non-empty text of every ``<p>`` nested inside the tag nodes."""
    paragraphs = []
    for node in nodes:
        if isinstance(node, Tag):
            paragraphs.extend(normalize_ws(p.get_text(" ", strip=True)) for p in node.select("p"))
    return [p for p in paragraphs if p]


def _nodes_list_items(nodes: list) -> list[str]:
    """Return the non-empty text of every nested ``<li>``, skipping timetable entries."""
    items = []
    for node in nodes:
        if isinstance(node, Tag):
            items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
    return [item for item in items if item and "расписание" not in item.lower()]


def _infer_section_type(title: str, nodes: list) -> str:
    """Classify a section; a table wins over the title, the title over the content."""
    lowered = title.lower()
    if _has_table(nodes):
        return "table"
    if _is_publications_title(lowered):
        return "publications"
    if "учебные курсы" in lowered:
        return "courses_by_year"
    if _nodes_list_items(nodes):
        return "list"
    if _nodes_paragraphs(nodes):
        return "paragraphs"
    return "generic"


def _is_publications_title(lowered_title: str) -> bool:
    """Return True when the lower-cased title starts with the publications stem."""
    return lowered_title.startswith("публикац")


def _has_table(nodes: list) -> bool:
    """Return True when any tag node is, or contains, a ``<table>``."""
    return any(isinstance(node, Tag) and (node.name == "table" or node.find("table")) for node in nodes)


def _parse_table(nodes: list, source_url: str) -> dict:
    """Parse the first table found in *nodes* into ``{"headers", "rows"}``.

    Each row holds its cell texts and the absolute URL of its first link, if
    any.  Rows without ``<td>`` cells are skipped.
    """
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        table = node if node.name == "table" else node.find("table")
        if not table:
            continue
        headers = [normalize_ws(th.get_text(" ", strip=True)) for th in table.select("th")]
        rows = []
        for tr in table.select("tr"):
            cells = [normalize_ws(td.get_text(" ", strip=True)) for td in tr.select("td")]
            if cells:
                link = tr.find("a", href=True)
                rows.append({"cells": cells, "link_url": urljoin(source_url, link["href"]) if link else None})
        return {"headers": headers, "rows": rows}
    return {"headers": [], "rows": []}


def _parse_publications(title: str, nodes: list, source_url: str) -> tuple[int | None, list[dict]]:
    """Return ``(count, publications)`` for a publications section.

    The count is the trailing number of the title, if present.  Publications
    come from the ``<li>`` items of the first node that has any; when no node
    has list items, each raw text line becomes one publication.
    """
    count_match = re.search(r"(\d+)\s*$", title)
    publications = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            text = normalize_ws(li.get_text(" ", strip=True))
            anchor = li.find("a", href=True)
            if text:
                publications.append(
                    {
                        "title": normalize_ws(anchor.get_text(" ", strip=True)) if anchor else text,
                        "url": urljoin(source_url, anchor["href"]) if anchor else None,
                        "text": text,
                    }
                )
        if publications:
            break
    if not publications:
        publications = [
            {"title": line, "url": None, "text": line}
            for line in _nodes_raw_text(nodes).split("\n")
            if line
        ]
    return int(count_match.group(1)) if count_match else None, publications


def _parse_courses(title: str, nodes: list, source_url: str) -> tuple[str | None, list[dict]]:
    """Return ``(academic_year, courses)``; the year is the ``YYYY/YYYY`` part of the title."""
    year_match = re.search(r"(\d{4}/\d{4})", title)
    courses = []
    for node in nodes:
        if isinstance(node, Tag):
            for li in node.select("li"):
                anchor = li.find("a", href=True)
                course_title = normalize_ws(
                    anchor.get_text(" ", strip=True) if anchor else li.get_text(" ", strip=True)
                )
                if course_title:
                    courses.append(
                        {"title": course_title, "url": urljoin(source_url, anchor["href"]) if anchor else None}
                    )
    return year_match.group(1) if year_match else None, _dedupe_dicts(courses)


def _parse_year_entries(nodes: list, source_url: str) -> list[dict]:
    """Return one entry per ``.person-list-hangover`` marker found in *nodes*.

    The year is parsed from the marker itself; text and links come from the
    marker's parent element.
    """
    entries = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for year_node in node.select(".person-list-hangover"):
            year_match = re.search(r"(19\d{2}|20\d{2})", year_node.get_text(" ", strip=True))
            parent = year_node.parent
            if parent:
                entries.append(
                    {
                        "year": int(year_match.group(1)) if year_match else None,
                        "text": normalize_ws(parent.get_text(" ", strip=True)),
                        "links": _extract_links(parent, source_url),
                    }
                )
    return entries


def _parse_vkr_items(nodes: list) -> list[str]:
    """Return the unique non-empty ``<li>`` texts, in first-seen order."""
    items = []
    for node in nodes:
        if isinstance(node, Tag):
            items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
    return [item for item in dict.fromkeys(items) if item]


def _normalize_publication_item(item: dict, current_author_id: str | None = None) -> dict:
    """Convert one raw publications-API item into the parser's publication record.

    The untouched API item is kept under ``raw_data``.
    """
    publication_id = str(item.get("id") or "").strip()
    title = _html_to_text(item.get("title"))
    year = _int_or_none(item.get("year"))
    publication_type = str(item.get("type") or "").strip() or None
    description = item.get("description") if isinstance(item.get("description"), dict) else {}
    short_description = _localized_value(description.get("short")) or _localized_value(
        description.get("shortLeft")
    )
    documents = item.get("documents") if isinstance(item.get("documents"), dict) else {}
    language = item.get("language") if isinstance(item.get("language"), dict) else {}
    annotation = _localized_text_map(item.get("annotation"))
    authors = _normalize_publication_authors(item.get("authorsByType"), current_author_id)
    # Prefer the citation supplied by the API; otherwise build one from the parts.
    citation_text = normalize_ws(str(description.get("main") or "")) or _build_publication_citation(
        title, authors, year
    )
    text = normalize_ws(" ".join(part for part in [title, str(year or ""), short_description] if part))
    return {
        "id": publication_id or None,
        "publication_id": publication_id or None,
        "title": title or publication_id,
        "year": year,
        "type": publication_type,
        "publication_type": publication_type,
        "language": normalize_ws(language.get("name")) or None,
        "status": _int_or_none(item.get("status")),
        "url": f"https://publications.hse.ru/view/{publication_id}" if publication_id else None,
        "doi_url": _document_href(documents, "DOI"),
        "other_url": _document_href(documents, "OTHER_URL"),
        "document_url": _document_href(documents, "DOCUMENT"),
        "citation_text": citation_text or None,
        "annotation": annotation,
        "description": description or None,
        "authors": authors,
        "raw_data": item,
        "text": text or title or publication_id,
    }


def _normalize_vkr_item(item: dict, source_url: str) -> dict:
    """Convert one raw thesis item from the VKR API into the parser's thesis record."""
    thesis_id = item.get("id")
    program = item.get("learnProgram") if isinstance(item.get("learnProgram"), dict) else {}
    org_unit = item.get("orgUnit") if isinstance(item.get("orgUnit"), dict) else {}
    supervisors = []
    for supervisor in item.get("supervisors") or []:
        if not isinstance(supervisor, dict):
            continue
        name = normalize_ws(supervisor.get("name"))
        url = normalize_ws(supervisor.get("url"))
        if name or url:
            supervisors.append({"name": name or url, "url": url or None})
    return {
        "id": thesis_id,
        "student": normalize_ws(item.get("student")),
        "title": normalize_ws(item.get("title")),
        "defense_year": item.get("year"),
        "level": normalize_ws(item.get("level")),
        "rating": item.get("rating"),
        "project_url": urljoin(source_url, f"/edu/vkr/{thesis_id}") if thesis_id else None,
        "program": normalize_ws(program.get("title")),
        "program_url": urljoin(source_url, program.get("url")) if program.get("url") else None,
        "org_unit": normalize_ws(org_unit.get("title")),
        "org_unit_url": urljoin(source_url, org_unit.get("url")) if org_unit.get("url") else None,
        "supervisors": supervisors,
        "text": normalize_ws(
            " ".join(str(part) for part in [item.get("student"), item.get("title"), item.get("year")] if part)
        ),
    }


def _upsert_publications_section(sections: list[dict], publications: list[dict]) -> list[dict]:
    """Merge *publications* into every publications section, or append a new one.

    Existing sections are copied rather than mutated; other sections are
    passed through unchanged.
    """
    merged = []
    inserted = False
    for section in sections:
        if section.get("type") != "publications":
            merged.append(section)
            continue
        existing = section.get("publications") or []
        section = {
            **section,
            "publications_count": max(section.get("publications_count") or 0, len(publications)),
            "publications": _dedupe_publications([*existing, *publications]),
        }
        section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
        merged.append(section)
        inserted = True
    if not inserted:
        merged.append(
            {
                "title": "Публикации и исследования",
                "slug": "publikacii_i_issledovaniya",
                "type": "publications",
                "raw_text": "",
                "paragraphs": [],
                "items": [item["text"] for item in publications if item.get("text")],
                "links": [],
                "publications_count": len(publications),
                "publications": publications,
            }
        )
    return merged


def _upsert_graduation_theses_section(sections: list[dict], theses: list[dict]) -> list[dict]:
    """Replace any ``graduation_theses`` section with one built from *theses*, appended last."""
    section = {
        "title": "Выпускные квалификационные работы студентов НИУ ВШЭ",
        "slug": "vypusknye_kvalifikacionnye_raboty_studentov_niu_vshe",
        "type": "graduation_theses",
        "raw_text": "",
        "paragraphs": [],
        "items": [item["text"] for item in theses if item.get("text")],
        "links": [
            {"text": item["title"], "url": item["project_url"]}
            for item in theses
            if item.get("title") and item.get("project_url")
        ],
        "theses_count": len(theses),
        "theses": theses,
    }
    return [item for item in sections if item.get("type") != "graduation_theses"] + [section]


def _dedupe_publications(items: list[dict]) -> list[dict]:
    """Keep the first publication per key (``id``, else ``url``, else ``title``).

    Items with none of the three are dropped.
    """
    seen = set()
    unique = []
    for item in items:
        key = item.get("id") or item.get("url") or item.get("title")
        if key and key not in seen:
            seen.add(key)
            unique.append(item)
    return unique


def _html_to_text(value: object) -> str:
    """Strip HTML markup from *value* and normalize its whitespace."""
    return normalize_ws(BeautifulSoup(str(value or ""), "html.parser").get_text(" ", strip=True))


def _localized_text_map(value: object) -> dict[str, str]:
    """Return the non-empty ``ru`` / ``en`` / ``publ`` texts of a localized dict."""
    if not isinstance(value, dict):
        return {}
    localized = {}
    for key in ("ru", "en", "publ"):
        text = _html_to_text(value.get(key))
        if text:
            localized[key] = text
    return localized


def _localized_value(value: object) -> str:
    """Return one text for a localized value, preferring ``ru``, then ``publ``, then ``en``."""
    if isinstance(value, dict):
        return normalize_ws(value.get("ru") or value.get("publ") or value.get("en"))
    return normalize_ws(str(value or ""))


def _normalize_publication_authors(value: object, current_author_id: str | None) -> list[dict]:
    """Normalize the ``author`` list of an ``authorsByType`` payload.

    ``is_current_employee`` marks the author whose id equals
    *current_author_id*.
    """
    if not isinstance(value, dict):
        return []
    authors = []
    for author in value.get("author") or []:
        if not isinstance(author, dict):
            continue
        title = author.get("title") if isinstance(author.get("title"), dict) else {}
        reverse_title = author.get("reverseTitle") if isinstance(author.get("reverseTitle"), dict) else {}
        # normalize_ws stringifies, so a numeric id still compares equal to
        # the string id read from the page.
        author_id = normalize_ws(author.get("id"))
        href = normalize_ws(author.get("href"))
        authors.append(
            {
                "id": author_id or None,
                "href": urljoin("https://www.hse.ru", href) if href else None,
                "title_ru": _html_to_text(title.get("ru")),
                "title_en": _html_to_text(title.get("en")),
                "reverse_title_ru": _html_to_text(reverse_title.get("ru")),
                "reverse_title_en": _html_to_text(reverse_title.get("en")),
                "alt_name": normalize_ws(author.get("altName")) or None,
                "other_name": normalize_ws(author.get("otherName")) or None,
                "is_current_employee": bool(current_author_id and author_id == current_author_id),
            }
        )
    return authors


def _document_href(documents: dict, key: str) -> str | None:
    """Return the ``href`` of ``documents[key]``, or ``None`` when absent or empty."""
    document = documents.get(key)
    if not isinstance(document, dict):
        return None
    return normalize_ws(document.get("href")) or None


def _build_publication_citation(title: str, authors: list[dict], year: int | None) -> str:
    """Build ``"Authors. Title. Year"`` from whichever parts are available."""
    author_names = [
        author.get("title_ru") or author.get("title_en") or author.get("alt_name") for author in authors
    ]
    return normalize_ws(
        ". ".join(part for part in [", ".join(filter(None, author_names)), title, str(year or "")] if part)
    )


def _int_or_none(value: object) -> int | None:
    """Return ``int(value)``, or ``None`` when the conversion fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _slugify(value: str) -> str:
    """Lower-case *value*, drop punctuation and join words with ``_``.

    Falls back to ``"section"`` when nothing is left.
    """
    cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
    return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"


def _dedupe_tabs(items: list[dict]) -> list[dict]:
    """Keep the first tab for each ``(title, href)`` pair."""
    seen = set()
    unique = []
    for item in items:
        key = (item.get("title"), item.get("href"))
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique


def _dedupe_dicts(items: list[dict]) -> list[dict]:
    """Keep the first of each group of equal dicts; all values must be hashable."""
    seen = set()
    unique = []
    for item in items:
        key = tuple(sorted(item.items()))
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique


def _fetch_text(
    session: Session,
    url: str,
    headers: dict[str, str],
    timeout: int,
    *,
    resource_cache=None,
    profile_key: str | None = None,
    resource_key: str,
    resource_manifest: list[dict] | None,
    method: str = "GET",
    json_payload: object | None = None,
    params: dict | None = None,
) -> str:
    """Fetch *url* as text and record the fetch in *resource_manifest*.

    When both *resource_cache* and *profile_key* are given, the request is
    delegated to ``resource_cache.fetch_text``.  Otherwise it is sent directly
    through *session* and ``raise_for_status`` is applied.  If
    *resource_manifest* is not ``None``, one entry (key, method, URL, body
    hash, cache flag, status code) is appended to it.
    """
    if resource_cache and profile_key:
        cached = resource_cache.fetch_text(
            session,
            profile_key=profile_key,
            resource_key=resource_key,
            method=method,
            url=url,
            headers=headers,
            timeout=timeout,
            json_payload=json_payload,
            params=params,
        )
        if resource_manifest is not None:
            resource_manifest.append(
                {
                    "resource_key": resource_key,
                    "method": method,
                    "url": url,
                    "body_hash": cached.body_hash,
                    "from_cache": cached.from_cache,
                    "status_code": cached.status_code,
                }
            )
        return cached.text

    if method.upper() == "POST":
        response = session.post(url, json=json_payload, headers=headers, timeout=timeout, params=params)
    else:
        response = session.get(url, headers=headers, timeout=timeout, params=params)
    response.raise_for_status()
    text = response.text
    if resource_manifest is not None:
        resource_manifest.append(
            {
                "resource_key": resource_key,
                "method": method,
                "url": url,
                "body_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
                "from_cache": False,
                "status_code": response.status_code,
            }
        )
    return text