Files
miem_workers/app/parser/profile.py

628 lines
23 KiB
Python

import re
from urllib.parse import urljoin
from bs4 import BeautifulSoup, NavigableString, Tag
from requests import Session
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
from app.version import BACKEND_VERSION
# Captures a four-digit year that follows the Russian phrase
# "Начал/Начала/Начали работать ... ВШЭ" (roughly "started working ... HSE").
_YEAR_PATTERN = re.compile(r"Начал[аи]?\s+работать.*?ВШЭ.*?(\d{4})", re.IGNORECASE)
# Captures e-mail-shaped tokens: local part, "@", domain, and a TLD of 2+ letters.
_EMAIL_PATTERN = re.compile(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
# Captures the run of digits, spaces, "+", "-", "(" and ")" (8+ characters)
# that follows a "Телефон:" or "Phone:" label.
_PHONE_PATTERN = re.compile(r"(?:Телефон|Phone)\s*:\s*([+()\d\-\s]{8,})", re.IGNORECASE)
def normalize_ws(value: str | None) -> str:
return re.sub(r"\s+", " ", value or "").strip()
def extract_person_tabs(soup: BeautifulSoup, source_url: str) -> list[dict[str, str | None]]:
    """Return the navigation tabs found in the person menu.

    The menu selectors are tried in order; the first menu that yields at
    least one usable link wins. Each tab is a dict with ``data_index`` (the
    anchor's ``data-index`` attribute, possibly ``None``), ``title`` and an
    absolute ``href`` resolved against *source_url*. Returns an empty list
    when no menu produces any tab.
    """
    menu_selectors = (
        "div.person-menu.is-desktop.small.person-menu-addition",
        ".person-menu",
    )
    for css in menu_selectors:
        container = soup.select_one(css)
        if container:
            found = []
            for link in container.select("a[href]"):
                caption = normalize_ws(link.get_text(" ", strip=True))
                target = link.get("href", "").strip()
                # Tabs without a caption or without a target are useless.
                if not (caption and target):
                    continue
                found.append(
                    {
                        "data_index": link.get("data-index"),
                        "title": caption,
                        "href": urljoin(source_url, target),
                    }
                )
            if found:
                return _dedupe_tabs(found)
    return []
def extract_person_header(soup: BeautifulSoup, source_url: str) -> dict:
    """Extract the header facts of a person page.

    Returns a dict with:

    * ``source_url`` - the URL passed in;
    * ``full_name`` - text of ``h1.person-caption`` (or the first ``h1``),
      ``None`` when no heading exists;
    * ``positions`` - non-empty texts of the ``employment-add`` list items;
    * ``hse_start_year`` - year captured by ``_YEAR_PATTERN`` or ``None``;
    * ``contacts`` - unique e-mails and phones found in the whole page text,
      an optional address fragment, and an (always empty) ``items`` list;
    * ``external_ids`` - de-duplicated links whose ``href`` contains one of
      the known identifier-service markers.
    """
    heading = soup.select_one("h1.person-caption") or soup.find("h1")
    page_text = normalize_ws(soup.get_text(" ", strip=True))

    # Contacts are searched in the flattened text of the entire page.
    emails = []
    for candidate in _EMAIL_PATTERN.findall(page_text):
        if candidate not in emails:
            emails.append(candidate)

    phones = []
    for raw_phone in _PHONE_PATTERN.findall(page_text):
        cleaned = normalize_ws(raw_phone)
        if cleaned and cleaned not in phones:
            phones.append(cleaned)

    # The address runs from "Адрес" up to the next known label (or the end
    # of the text), limited to 220 characters after the label.
    address = None
    address_hit = re.search(
        r"(Адрес[:\s].{0,220}?)(?:\s+Время|\s+Расписание|\s+SPIN|\s+ORCID|$)",
        page_text,
        flags=re.IGNORECASE,
    )
    if address_hit:
        address = normalize_ws(address_hit.group(1)).rstrip(",")

    contacts = {"phones": phones, "emails": emails, "address": address, "items": []}

    position_texts = (
        normalize_ws(li.get_text(" ", strip=True))
        for li in soup.select("ul.g-ul.g-list.small.employment-add li, ul.employment-add li")
    )
    positions = [position for position in position_texts if position]

    id_domains = (
        ("ORCID", "orcid.org"),
        ("Scopus AuthorID", "scopus.com"),
        ("ResearcherID", "webofscience.com"),
        ("Google Scholar", "scholar.google."),
        ("SPIN РИНЦ", "elibrary.ru"),
    )
    external_ids = []
    for link in soup.select("a[href]"):
        target = link.get("href", "").strip()
        caption = normalize_ws(link.get_text(" ", strip=True))
        # Only the first matching service is recorded for a link.
        system = next((name for name, marker in id_domains if marker in target), None)
        if system is not None:
            external_ids.append({"system": system, "value": caption or system, "url": target})

    year_hit = _YEAR_PATTERN.search(page_text)
    full_name = normalize_ws(heading.get_text(" ", strip=True)) if heading else None
    start_year = int(year_hit.group(1)) if year_hit else None
    return {
        "source_url": source_url,
        "full_name": full_name,
        "positions": positions,
        "hse_start_year": start_year,
        "contacts": contacts,
        "external_ids": _dedupe_dicts(external_ids),
    }
def extract_sections(soup: BeautifulSoup, source_url: str) -> list[dict]:
    """Split the page into one section dict per ``<h2>`` heading.

    Headings with an empty title, or whose title contains
    "расписание занятий", are skipped. Every section carries ``title``,
    ``slug``, ``type``, ``raw_text``, ``paragraphs``, ``items`` and ``links``;
    depending on the inferred type it is extended with publication, course or
    table data. When ``.person-list-hangover`` year entries are present they
    are attached as ``year_entries`` and a "generic"/"paragraphs" section is
    re-typed to "year_blocks".
    """
    vkr_marker = "выпускные квалификационные работы студентов ниу вшэ"
    collected = []
    for heading in soup.select("h2"):
        title = normalize_ws(heading.get_text(" ", strip=True))
        title_lower = title.lower()
        if not title or "расписание занятий" in title_lower:
            continue

        # Everything between this heading and the next <h2> is the body.
        body = _collect_between_h2(heading)
        raw_text = _nodes_raw_text(body)
        paragraphs = _nodes_paragraphs(body)
        items = _nodes_list_items(body)
        links = [
            link
            for node in body
            if isinstance(node, Tag)
            for link in _extract_links(node, source_url)
        ]
        kind = _infer_section_type(title, body)
        section = {
            "title": title,
            "slug": _slugify(title),
            "type": kind,
            "raw_text": raw_text,
            "paragraphs": paragraphs,
            "items": items,
            "links": links,
        }

        if kind == "publications":
            count, publications = _parse_publications(title, body, source_url)
            section["publications_count"] = count
            section["publications"] = publications
            section["items"] = [pub["text"] for pub in publications if pub.get("text")]
        elif kind == "courses_by_year":
            academic_year, courses = _parse_courses(title, body, source_url)
            section["academic_year"] = academic_year
            section["courses"] = courses
            # Course sections expose structured courses instead of raw lists.
            section.pop("items", None)
            section.pop("links", None)
        elif kind == "table":
            section["table"] = _parse_table(body, source_url)
        elif vkr_marker in title_lower:
            section["items"] = _parse_vkr_items(body)

        year_entries = _parse_year_entries(body, source_url)
        if year_entries:
            section["year_entries"] = year_entries
            if kind in {"generic", "paragraphs"}:
                section["type"] = "year_blocks"

        collected.append(section)
    return collected
def parse_person_profile(
    session: Session,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    use_playwright: bool = False,
) -> dict | None:
    """Download a person page and parse it into a profile dict.

    Args:
        session: HTTP session used for the page request and widget requests.
        source_url: Profile URL; it is normalized before use.
        headers: Request headers sent with every request.
        timeout: Timeout passed to the HTTP calls.
        use_playwright: When true, the HTML is re-rendered through
            ``_render_with_playwright`` (which falls back to the fetched HTML).

    Returns:
        ``None`` when the URL cannot be normalized; otherwise a dict holding
        the identity, header fields, tabs, sections, tab links, the parser
        version and the HTML that was parsed (``_html``).

    Raises:
        Whatever ``session.get`` or ``response.raise_for_status()`` raises.
    """
    profile_url = normalize_profile_url(source_url)
    if not profile_url:
        return None

    response = session.get(profile_url, headers=headers, timeout=timeout)
    response.raise_for_status()
    fetched_html = response.text
    html = _render_with_playwright(profile_url, fetched_html) if use_playwright else fetched_html

    soup = BeautifulSoup(html, "html.parser")
    profile_type, profile_id = parse_profile_identity(profile_url)
    header = extract_person_header(soup, profile_url)
    tabs = extract_person_tabs(soup, profile_url)
    sections = enrich_sections_from_hse_widgets(
        session,
        soup,
        profile_url,
        headers,
        timeout,
        extract_sections(soup, profile_url),
    )

    tab_links = []
    for tab in tabs:
        if tab.get("href"):
            tab_links.append(tab["href"])

    profile = {
        "source_url": profile_url,
        "profile_type": profile_type,
        "profile_id": profile_id,
        "full_name": header.get("full_name"),
        "positions": header.get("positions") or [],
        "hse_start_year": header.get("hse_start_year"),
        "contacts": header.get("contacts") or {},
        "external_ids": header.get("external_ids") or [],
        "tabs": tabs,
        "sections": sections,
        "employee_internal_links": tab_links,
        "parser_version": BACKEND_VERSION,
        "_html": html,
    }
    return profile
def enrich_sections_from_hse_widgets(
    session: Session,
    soup: BeautifulSoup,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    sections: list[dict],
) -> list[dict]:
    """Merge data loaded from the page's widgets into *sections*.

    Publications and graduation theses are requested through the widget
    loaders; each non-empty result is merged via the matching ``_upsert_*``
    helper. The input list itself is not modified - a new list is returned.
    """
    result = list(sections)

    widget_publications = _load_widget_publications(session, soup, headers, timeout)
    if widget_publications:
        result = _upsert_publications_section(result, widget_publications)

    widget_theses = _load_widget_graduation_theses(session, soup, source_url, headers, timeout)
    if widget_theses:
        result = _upsert_graduation_theses_section(result, widget_theses)

    return result
def _render_with_playwright(source_url: str, fallback_html: str) -> str:
try:
from playwright.sync_api import sync_playwright
except Exception:
return fallback_html
try:
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=True)
page = browser.new_page()
page.goto(source_url, wait_until="domcontentloaded", timeout=45000)
for index in range(page.locator(".person-menu a").count()):
try:
page.locator(".person-menu a").nth(index).click(timeout=2500, force=True)
page.wait_for_timeout(450)
except Exception:
continue
html = page.content()
browser.close()
return html
except Exception:
return fallback_html
def _load_widget_publications(session: Session, soup: BeautifulSoup, headers: dict[str, str], timeout: int) -> list[dict]:
    """Load the author's publications through the ``AuthorSearch`` widget API.

    The author id is read from the widget ``<script>`` tag's ``data-author``
    attribute; without it an empty list is returned. Up to 20 pages of 100
    items are requested. Paging stops on an empty page, when the response
    reports no more data and the reported total has been reached, or on the
    first failed request. Whatever was collected up to that point is
    normalized and de-duplicated, so a failure midway still yields a
    de-duplicated partial result.
    """
    script = soup.select_one('script[data-widget-name="AuthorSearch"][data-author]')
    if not script:
        return []
    author_id = normalize_ws(script.get("data-author"))
    if not author_id:
        return []

    publications: list[dict] = []
    per_page = 100
    max_pages = 20
    for page_id in range(1, max_pages + 1):
        payload = {
            "type": "ANY",
            "filterParams": (
                f'"acceptLanguage":"ru"|"fullTextPublicEnabled": 1|'
                f'"pubsAuthor": {author_id}|"widgetName": "AuthorSearch"'
            ),
            "paginationParams": {
                "publsSort": ["TITLE_ASC"],
                "publsCount": per_page,
                "pageId": page_id,
            },
        }
        try:
            response = session.post(
                "https://publications.hse.ru/api/searchPubs",
                json=payload,
                headers=headers,
                timeout=timeout,
            )
            response.raise_for_status()
            data = response.json()
        except Exception:
            # Best effort: keep the pages already loaded, but leave through the
            # common exit so they are de-duplicated like a complete result.
            break

        result = data.get("result") if isinstance(data, dict) else {}
        items = _extract_publication_items(result)
        if not items:
            break
        publications.extend(_normalize_publication_item(item) for item in items)

        # ``result`` is a dict here, otherwise no items would have been found.
        try:
            total = int(result.get("total") or 0)
        except (TypeError, ValueError):
            # An unparsable total must not discard the pages already fetched.
            total = 0
        if not result.get("more") and len(publications) >= total:
            break
    return _dedupe_publications(publications)
def _extract_publication_items(result: object) -> list[dict]:
if not isinstance(result, dict):
return []
return _flatten_publication_items(result.get("items"))
def _flatten_publication_items(value: object) -> list[dict]:
if isinstance(value, list):
return [item for item in value if _is_publication_item(item)]
if not isinstance(value, dict):
return []
nested_items = value.get("items")
if isinstance(nested_items, list):
return [item for item in nested_items if _is_publication_item(item)]
if isinstance(nested_items, dict):
return _flatten_publication_items(nested_items)
publications = []
for child in value.values():
publications.extend(_flatten_publication_items(child))
return publications
def _is_publication_item(value: object) -> bool:
return isinstance(value, dict) and ("id" in value or "title" in value)
def _load_widget_graduation_theses(
    session: Session,
    soup: BeautifulSoup,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
) -> list[dict]:
    """Load supervised graduation theses through the VKR widget API.

    The person id and (optionally) the API path come from the VKR widget
    ``<script>`` tag; the path defaults to ``/n/vkr/api/`` and is resolved
    against *source_url*. Returns an empty list when the widget or its
    person id is missing, when the request fails, or when the response does
    not contain a list under ``data``. Otherwise every dict entry is
    normalized with ``_normalize_vkr_item``.
    """
    script = soup.select_one('script[src*="/n/stat/vkr/app.js"][data-person-id]')
    if not script:
        return []
    person_id = normalize_ws(script.get("data-person-id"))
    if not person_id:
        return []
    api_url = normalize_ws(script.get("data-api-url")) or "/n/vkr/api/"

    # Copy the caller's headers and add the language header to the copy.
    request_headers = dict(headers)
    request_headers["x-portal-language"] = "ru"
    try:
        response = session.get(
            urljoin(source_url, api_url),
            params={"supervisorId": person_id},
            headers=request_headers,
            timeout=timeout,
        )
        response.raise_for_status()
        payload = response.json()
    except Exception:
        return []

    entries = payload.get("data") if isinstance(payload, dict) else []
    if not isinstance(entries, list):
        return []
    normalized = []
    for entry in entries:
        if isinstance(entry, dict):
            normalized.append(_normalize_vkr_item(entry, source_url))
    return normalized
def _collect_between_h2(start_h2: Tag) -> list[Tag | NavigableString | str]:
    """Return the sibling nodes that follow *start_h2* up to the next ``<h2>``.

    Whitespace-only strings are dropped, and so are HTML comments. Comments
    are ``NavigableString`` instances, so without the explicit check their
    content would be collected and later emitted as section text.
    """
    # Local import keeps this block self-contained; bs4 is already a dependency.
    from bs4 import Comment

    nodes = []
    for sibling in start_h2.next_siblings:
        if isinstance(sibling, Tag):
            if sibling.name == "h2":
                break
        elif isinstance(sibling, Comment):
            continue
        elif isinstance(sibling, NavigableString) and not normalize_ws(str(sibling)):
            continue
        nodes.append(sibling)
    return nodes
def _extract_links(node: Tag, source_url: str) -> list[dict[str, str]]:
    """Return ``{"text", "url"}`` dicts for the anchors inside *node*.

    Anchors without text or ``href`` are skipped, as are timetable links
    (``href`` containing "timetable" or text containing "расписание").
    URLs are resolved against *source_url*.
    """
    found = []
    for anchor in node.select("a[href]"):
        caption = normalize_ws(anchor.get_text(" ", strip=True))
        target = anchor.get("href", "").strip()
        if not caption or not target:
            continue
        if "timetable" in target.lower() or "расписание" in caption.lower():
            continue
        found.append({"text": caption, "url": urljoin(source_url, target)})
    return found
def _nodes_raw_text(nodes: list) -> str:
chunks = []
for node in nodes:
text = normalize_ws(node.get_text(" ", strip=True) if isinstance(node, Tag) else str(node))
if text:
chunks.append(text)
return "\n".join(chunks)
def _nodes_paragraphs(nodes: list) -> list[str]:
paragraphs = []
for node in nodes:
if isinstance(node, Tag):
paragraphs.extend(normalize_ws(p.get_text(" ", strip=True)) for p in node.select("p"))
return [p for p in paragraphs if p]
def _nodes_list_items(nodes: list) -> list[str]:
items = []
for node in nodes:
if isinstance(node, Tag):
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
return [item for item in items if item and "расписание" not in item.lower()]
def _infer_section_type(title: str, nodes: list) -> str:
    """Classify a section from its title and body nodes.

    Checked in this order: a table in the body gives "table"; a title
    containing "публикац" gives "publications"; a title containing
    "учебные курсы" gives "courses_by_year"; list items give "list";
    paragraphs give "paragraphs"; anything else is "generic".
    """
    if _has_table(nodes):
        return "table"

    title_lower = title.lower()
    title_markers = (
        ("публикац", "publications"),
        ("учебные курсы", "courses_by_year"),
    )
    for marker, kind in title_markers:
        if marker in title_lower:
            return kind

    if _nodes_list_items(nodes):
        return "list"
    return "paragraphs" if _nodes_paragraphs(nodes) else "generic"
def _has_table(nodes: list) -> bool:
return any(isinstance(node, Tag) and (node.name == "table" or node.find("table")) for node in nodes)
def _parse_table(nodes: list, source_url: str) -> dict:
for node in nodes:
if not isinstance(node, Tag):
continue
table = node if node.name == "table" else node.find("table")
if not table:
continue
headers = [normalize_ws(th.get_text(" ", strip=True)) for th in table.select("th")]
rows = []
for tr in table.select("tr"):
cells = [normalize_ws(td.get_text(" ", strip=True)) for td in tr.select("td")]
if cells:
link = tr.find("a", href=True)
rows.append({"cells": cells, "link_url": urljoin(source_url, link["href"]) if link else None})
return {"headers": headers, "rows": rows}
return {"headers": [], "rows": []}
def _parse_publications(title: str, nodes: list, source_url: str) -> tuple[int | None, list[dict]]:
    """Parse a publications section into ``(count, publications)``.

    ``count`` is the number at the very end of *title*, or ``None``.
    Publications come from the ``<li>`` items of the first tag node that
    yields any; each has ``title`` (link text, else the item text), ``url``
    (absolute link or ``None``) and ``text``. When no list items are found,
    every non-empty line of the raw section text becomes a publication.
    """
    found = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            item_text = normalize_ws(li.get_text(" ", strip=True))
            anchor = li.find("a", href=True)
            if not item_text:
                continue
            if anchor:
                entry_title = normalize_ws(anchor.get_text(" ", strip=True))
                entry_url = urljoin(source_url, anchor["href"])
            else:
                entry_title = item_text
                entry_url = None
            found.append({"title": entry_title, "url": entry_url, "text": item_text})
        if found:
            # Only the first node that actually contains items is used.
            break

    if not found:
        raw_lines = _nodes_raw_text(nodes).split("\n")
        found = [{"title": line, "url": None, "text": line} for line in raw_lines if line]

    trailing_number = re.search(r"(\d+)\s*$", title)
    count = int(trailing_number.group(1)) if trailing_number else None
    return count, found
def _parse_courses(title: str, nodes: list, source_url: str) -> tuple[str | None, list[dict]]:
    """Parse a courses section into ``(academic_year, courses)``.

    ``academic_year`` is the first ``YYYY/YYYY`` fragment of *title*, or
    ``None``. Each course has a ``title`` (link text when the item has a
    link, otherwise the item text) and an absolute ``url`` or ``None``.
    Exact duplicates are removed.
    """
    courses = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            anchor = li.find("a", href=True)
            label_source = anchor if anchor else li
            name = normalize_ws(label_source.get_text(" ", strip=True))
            if not name:
                continue
            url = urljoin(source_url, anchor["href"]) if anchor else None
            courses.append({"title": name, "url": url})

    year_hit = re.search(r"(\d{4}/\d{4})", title)
    academic_year = year_hit.group(1) if year_hit else None
    return academic_year, _dedupe_dicts(courses)
def _parse_year_entries(nodes: list, source_url: str) -> list[dict]:
entries = []
for node in nodes:
if not isinstance(node, Tag):
continue
for year_node in node.select(".person-list-hangover"):
year_match = re.search(r"(19\d{2}|20\d{2})", year_node.get_text(" ", strip=True))
parent = year_node.parent
if parent:
entries.append(
{
"year": int(year_match.group(1)) if year_match else None,
"text": normalize_ws(parent.get_text(" ", strip=True)),
"links": _extract_links(parent, source_url),
}
)
return entries
def _parse_vkr_items(nodes: list) -> list[str]:
items = []
for node in nodes:
if isinstance(node, Tag):
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
return [item for item in dict.fromkeys(items) if item]
def _normalize_publication_item(item: dict) -> dict:
    """Convert one widget API publication into the parser's publication dict.

    The result has ``id``, ``title`` (HTML stripped, falling back to the id),
    ``year``, ``type``, a ``url`` built from the id, and ``text`` - the
    title, year and short description joined by spaces. Missing id, type and
    URL are reported as ``None``.
    """
    pub_id = str(item.get("id") or "").strip()
    title = _html_to_text(item.get("title"))
    year = item.get("year")

    raw_description = item.get("description")
    description = raw_description if isinstance(raw_description, dict) else {}
    summary = _localized_value(description.get("short"))
    if not summary:
        summary = _localized_value(description.get("shortLeft"))

    pieces = [title, str(year or ""), summary]
    text = normalize_ws(" ".join(piece for piece in pieces if piece))
    kind = str(item.get("type") or "").strip()

    return {
        "id": pub_id or None,
        "title": title or pub_id,
        "year": year,
        "type": kind or None,
        "url": f"https://publications.hse.ru/view/{pub_id}" if pub_id else None,
        "text": text or title or pub_id,
    }
def _normalize_vkr_item(item: dict, source_url: str) -> dict:
    """Convert one VKR API entry into the parser's thesis dict.

    Text fields are whitespace-normalized and URLs are resolved against
    *source_url*. ``learnProgram`` and ``orgUnit`` are used only when they
    are dicts. ``supervisors`` keeps the dict entries that have a name or a
    URL. ``text`` joins student, title and year.
    """

    def _dict_field(key: str) -> dict:
        candidate = item.get(key)
        return candidate if isinstance(candidate, dict) else {}

    def _absolute(url: object) -> str | None:
        return urljoin(source_url, url) if url else None

    thesis_id = item.get("id")
    program = _dict_field("learnProgram")
    org_unit = _dict_field("orgUnit")

    supervisors = []
    for entry in item.get("supervisors") or []:
        if isinstance(entry, dict):
            name = normalize_ws(entry.get("name"))
            url = normalize_ws(entry.get("url"))
            if name or url:
                supervisors.append({"name": name or url, "url": url or None})

    text_parts = [item.get("student"), item.get("title"), item.get("year")]
    text = normalize_ws(" ".join(str(part) for part in text_parts if part))
    project_url = urljoin(source_url, f"/edu/vkr/{thesis_id}") if thesis_id else None

    return {
        "id": thesis_id,
        "student": normalize_ws(item.get("student")),
        "title": normalize_ws(item.get("title")),
        "defense_year": item.get("year"),
        "level": normalize_ws(item.get("level")),
        "rating": item.get("rating"),
        "project_url": project_url,
        "program": normalize_ws(program.get("title")),
        "program_url": _absolute(program.get("url")),
        "org_unit": normalize_ws(org_unit.get("title")),
        "org_unit_url": _absolute(org_unit.get("url")),
        "supervisors": supervisors,
        "text": text,
    }
def _upsert_publications_section(sections: list[dict], publications: list[dict]) -> list[dict]:
merged = []
inserted = False
for section in sections:
if section.get("type") != "publications":
merged.append(section)
continue
existing = section.get("publications") or []
section = {
**section,
"publications_count": max(section.get("publications_count") or 0, len(publications)),
"publications": _dedupe_publications([*existing, *publications]),
}
section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
merged.append(section)
inserted = True
if not inserted:
merged.append(
{
"title": "Публикации и исследования",
"slug": "publikacii_i_issledovaniya",
"type": "publications",
"raw_text": "",
"paragraphs": [],
"items": [item["text"] for item in publications if item.get("text")],
"links": [],
"publications_count": len(publications),
"publications": publications,
}
)
return merged
def _upsert_graduation_theses_section(sections: list[dict], theses: list[dict]) -> list[dict]:
section = {
"title": "Выпускные квалификационные работы студентов НИУ ВШЭ",
"slug": "vypusknye_kvalifikacionnye_raboty_studentov_niu_vshe",
"type": "graduation_theses",
"raw_text": "",
"paragraphs": [],
"items": [item["text"] for item in theses if item.get("text")],
"links": [{"text": item["title"], "url": item["project_url"]} for item in theses if item.get("title") and item.get("project_url")],
"theses_count": len(theses),
"theses": theses,
}
return [item for item in sections if item.get("type") != "graduation_theses"] + [section]
def _dedupe_publications(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = item.get("id") or item.get("url") or item.get("title")
if key and key not in seen:
seen.add(key)
unique.append(item)
return unique
def _html_to_text(value: object) -> str:
    """Strip HTML markup from *value* and return whitespace-normalized text.

    Falsy values are treated as an empty string; anything else is converted
    with ``str()`` before parsing.
    """
    markup = str(value or "")
    parsed = BeautifulSoup(markup, "html.parser")
    return normalize_ws(parsed.get_text(" ", strip=True))
def _localized_value(value: object) -> str:
    """Return normalized text from a plain or per-language value.

    For a dict the first truthy entry among the ``ru``, ``publ`` and ``en``
    keys is used. Any other value is stringified (falsy values become "").
    """
    if not isinstance(value, dict):
        return normalize_ws(str(value or ""))
    for language_key in ("ru", "publ", "en"):
        candidate = value.get(language_key)
        if candidate:
            return normalize_ws(candidate)
    # Nothing truthy: fall through with the last looked-up value.
    return normalize_ws(value.get("en"))
def _slugify(value: str) -> str:
cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"
def _dedupe_tabs(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = (item.get("title"), item.get("href"))
if key not in seen:
seen.add(key)
unique.append(item)
return unique
def _dedupe_dicts(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = tuple(sorted(item.items()))
if key not in seen:
seen.add(key)
unique.append(item)
return unique