381 lines
14 KiB
Python
381 lines
14 KiB
Python
import re
|
|
from urllib.parse import urljoin
|
|
|
|
from bs4 import BeautifulSoup, NavigableString, Tag
|
|
from requests import Session
|
|
|
|
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
|
|
from app.version import BACKEND_VERSION
|
|
|
|
_YEAR_PATTERN = re.compile(r"Начал[аи]?\s+работать.*?ВШЭ.*?(\d{4})", re.IGNORECASE)
|
|
_EMAIL_PATTERN = re.compile(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
|
|
_PHONE_PATTERN = re.compile(r"(?:Телефон|Phone)\s*:\s*([+()\d\-\s]{8,})", re.IGNORECASE)
|
|
|
|
|
|
def normalize_ws(value: str | None) -> str:
|
|
return re.sub(r"\s+", " ", value or "").strip()
|
|
|
|
|
|
def extract_person_tabs(soup: BeautifulSoup, source_url: str) -> list[dict[str, str | None]]:
|
|
selectors = (
|
|
"div.person-menu.is-desktop.small.person-menu-addition",
|
|
".person-menu",
|
|
)
|
|
for selector in selectors:
|
|
menu = soup.select_one(selector)
|
|
if not menu:
|
|
continue
|
|
tabs = []
|
|
for anchor in menu.select("a[href]"):
|
|
title = normalize_ws(anchor.get_text(" ", strip=True))
|
|
href = anchor.get("href", "").strip()
|
|
if title and href:
|
|
tabs.append(
|
|
{
|
|
"data_index": anchor.get("data-index"),
|
|
"title": title,
|
|
"href": urljoin(source_url, href),
|
|
}
|
|
)
|
|
if tabs:
|
|
return _dedupe_tabs(tabs)
|
|
return []
|
|
|
|
|
|
def extract_person_header(soup: BeautifulSoup, source_url: str) -> dict:
|
|
name_node = soup.select_one("h1.person-caption") or soup.find("h1")
|
|
text = normalize_ws(soup.get_text(" ", strip=True))
|
|
year_match = _YEAR_PATTERN.search(text)
|
|
contacts = {"phones": [], "emails": [], "address": None, "items": []}
|
|
|
|
for email in _EMAIL_PATTERN.findall(text):
|
|
if email not in contacts["emails"]:
|
|
contacts["emails"].append(email)
|
|
for phone in _PHONE_PATTERN.findall(text):
|
|
normalized_phone = normalize_ws(phone)
|
|
if normalized_phone and normalized_phone not in contacts["phones"]:
|
|
contacts["phones"].append(normalized_phone)
|
|
|
|
address_match = re.search(
|
|
r"(Адрес[:\s].{0,220}?)(?:\s+Время|\s+Расписание|\s+SPIN|\s+ORCID|$)",
|
|
text,
|
|
flags=re.IGNORECASE,
|
|
)
|
|
if address_match:
|
|
contacts["address"] = normalize_ws(address_match.group(1)).rstrip(",")
|
|
|
|
positions = []
|
|
for li in soup.select("ul.g-ul.g-list.small.employment-add li, ul.employment-add li"):
|
|
value = normalize_ws(li.get_text(" ", strip=True))
|
|
if value:
|
|
positions.append(value)
|
|
|
|
external_ids = []
|
|
id_domains = (
|
|
("ORCID", "orcid.org"),
|
|
("Scopus AuthorID", "scopus.com"),
|
|
("ResearcherID", "webofscience.com"),
|
|
("Google Scholar", "scholar.google."),
|
|
("SPIN РИНЦ", "elibrary.ru"),
|
|
)
|
|
for anchor in soup.select("a[href]"):
|
|
href = anchor.get("href", "").strip()
|
|
label = normalize_ws(anchor.get_text(" ", strip=True))
|
|
for system, marker in id_domains:
|
|
if marker in href:
|
|
external_ids.append({"system": system, "value": label or system, "url": href})
|
|
break
|
|
|
|
return {
|
|
"source_url": source_url,
|
|
"full_name": normalize_ws(name_node.get_text(" ", strip=True)) if name_node else None,
|
|
"positions": positions,
|
|
"hse_start_year": int(year_match.group(1)) if year_match else None,
|
|
"contacts": contacts,
|
|
"external_ids": _dedupe_dicts(external_ids),
|
|
}
|
|
|
|
|
|
def extract_sections(soup: BeautifulSoup, source_url: str) -> list[dict]:
|
|
sections = []
|
|
for h2 in soup.select("h2"):
|
|
title = normalize_ws(h2.get_text(" ", strip=True))
|
|
if not title or "расписание занятий" in title.lower():
|
|
continue
|
|
nodes = _collect_between_h2(h2)
|
|
raw_text = _nodes_raw_text(nodes)
|
|
paragraphs = _nodes_paragraphs(nodes)
|
|
items = _nodes_list_items(nodes)
|
|
links = []
|
|
for node in nodes:
|
|
if isinstance(node, Tag):
|
|
links.extend(_extract_links(node, source_url))
|
|
|
|
section_type = _infer_section_type(title, nodes)
|
|
section = {
|
|
"title": title,
|
|
"slug": _slugify(title),
|
|
"type": section_type,
|
|
"raw_text": raw_text,
|
|
"paragraphs": paragraphs,
|
|
"items": items,
|
|
"links": links,
|
|
}
|
|
|
|
if section_type == "publications":
|
|
section["publications_count"], section["publications"] = _parse_publications(title, nodes, source_url)
|
|
section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
|
|
elif section_type == "courses_by_year":
|
|
section["academic_year"], section["courses"] = _parse_courses(title, nodes, source_url)
|
|
section.pop("items", None)
|
|
section.pop("links", None)
|
|
elif section_type == "table":
|
|
section["table"] = _parse_table(nodes, source_url)
|
|
elif "выпускные квалификационные работы студентов ниу вшэ" in title.lower():
|
|
section["items"] = _parse_vkr_items(nodes)
|
|
|
|
year_entries = _parse_year_entries(nodes, source_url)
|
|
if year_entries:
|
|
section["year_entries"] = year_entries
|
|
if section_type in {"generic", "paragraphs"}:
|
|
section["type"] = "year_blocks"
|
|
sections.append(section)
|
|
return sections
|
|
|
|
|
|
def parse_person_profile(
|
|
session: Session,
|
|
source_url: str,
|
|
headers: dict[str, str],
|
|
timeout: int,
|
|
use_playwright: bool = False,
|
|
) -> dict | None:
|
|
normalized_url = normalize_profile_url(source_url)
|
|
if not normalized_url:
|
|
return None
|
|
response = session.get(normalized_url, headers=headers, timeout=timeout)
|
|
response.raise_for_status()
|
|
html = response.text
|
|
if use_playwright:
|
|
html = _render_with_playwright(normalized_url, html)
|
|
|
|
soup = BeautifulSoup(html, "html.parser")
|
|
profile_type, profile_id = parse_profile_identity(normalized_url)
|
|
header = extract_person_header(soup, normalized_url)
|
|
tabs = extract_person_tabs(soup, normalized_url)
|
|
sections = extract_sections(soup, normalized_url)
|
|
internal_links = [tab["href"] for tab in tabs if tab.get("href")]
|
|
|
|
return {
|
|
"source_url": normalized_url,
|
|
"profile_type": profile_type,
|
|
"profile_id": profile_id,
|
|
"full_name": header.get("full_name"),
|
|
"positions": header.get("positions") or [],
|
|
"hse_start_year": header.get("hse_start_year"),
|
|
"contacts": header.get("contacts") or {},
|
|
"external_ids": header.get("external_ids") or [],
|
|
"tabs": tabs,
|
|
"sections": sections,
|
|
"employee_internal_links": internal_links,
|
|
"parser_version": BACKEND_VERSION,
|
|
"_html": html,
|
|
}
|
|
|
|
|
|
def _render_with_playwright(source_url: str, fallback_html: str) -> str:
|
|
try:
|
|
from playwright.sync_api import sync_playwright
|
|
except Exception:
|
|
return fallback_html
|
|
try:
|
|
with sync_playwright() as playwright:
|
|
browser = playwright.chromium.launch(headless=True)
|
|
page = browser.new_page()
|
|
page.goto(source_url, wait_until="domcontentloaded", timeout=45000)
|
|
for index in range(page.locator(".person-menu a").count()):
|
|
try:
|
|
page.locator(".person-menu a").nth(index).click(timeout=2500, force=True)
|
|
page.wait_for_timeout(450)
|
|
except Exception:
|
|
continue
|
|
html = page.content()
|
|
browser.close()
|
|
return html
|
|
except Exception:
|
|
return fallback_html
|
|
|
|
|
|
def _collect_between_h2(start_h2: Tag) -> list[Tag | NavigableString | str]:
|
|
nodes = []
|
|
for sibling in start_h2.next_siblings:
|
|
if isinstance(sibling, Tag) and sibling.name == "h2":
|
|
break
|
|
if isinstance(sibling, NavigableString) and not normalize_ws(str(sibling)):
|
|
continue
|
|
nodes.append(sibling)
|
|
return nodes
|
|
|
|
|
|
def _extract_links(node: Tag, source_url: str) -> list[dict[str, str]]:
|
|
links = []
|
|
for anchor in node.select("a[href]"):
|
|
text = normalize_ws(anchor.get_text(" ", strip=True))
|
|
href = anchor.get("href", "").strip()
|
|
if text and href and "timetable" not in href.lower() and "расписание" not in text.lower():
|
|
links.append({"text": text, "url": urljoin(source_url, href)})
|
|
return links
|
|
|
|
|
|
def _nodes_raw_text(nodes: list) -> str:
|
|
chunks = []
|
|
for node in nodes:
|
|
text = normalize_ws(node.get_text(" ", strip=True) if isinstance(node, Tag) else str(node))
|
|
if text:
|
|
chunks.append(text)
|
|
return "\n".join(chunks)
|
|
|
|
|
|
def _nodes_paragraphs(nodes: list) -> list[str]:
|
|
paragraphs = []
|
|
for node in nodes:
|
|
if isinstance(node, Tag):
|
|
paragraphs.extend(normalize_ws(p.get_text(" ", strip=True)) for p in node.select("p"))
|
|
return [p for p in paragraphs if p]
|
|
|
|
|
|
def _nodes_list_items(nodes: list) -> list[str]:
|
|
items = []
|
|
for node in nodes:
|
|
if isinstance(node, Tag):
|
|
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
|
|
return [item for item in items if item and "расписание" not in item.lower()]
|
|
|
|
|
|
def _infer_section_type(title: str, nodes: list) -> str:
|
|
lowered = title.lower()
|
|
if _has_table(nodes):
|
|
return "table"
|
|
if "публикац" in lowered:
|
|
return "publications"
|
|
if "учебные курсы" in lowered:
|
|
return "courses_by_year"
|
|
if _nodes_list_items(nodes):
|
|
return "list"
|
|
if _nodes_paragraphs(nodes):
|
|
return "paragraphs"
|
|
return "generic"
|
|
|
|
|
|
def _has_table(nodes: list) -> bool:
|
|
return any(isinstance(node, Tag) and (node.name == "table" or node.find("table")) for node in nodes)
|
|
|
|
|
|
def _parse_table(nodes: list, source_url: str) -> dict:
|
|
for node in nodes:
|
|
if not isinstance(node, Tag):
|
|
continue
|
|
table = node if node.name == "table" else node.find("table")
|
|
if not table:
|
|
continue
|
|
headers = [normalize_ws(th.get_text(" ", strip=True)) for th in table.select("th")]
|
|
rows = []
|
|
for tr in table.select("tr"):
|
|
cells = [normalize_ws(td.get_text(" ", strip=True)) for td in tr.select("td")]
|
|
if cells:
|
|
link = tr.find("a", href=True)
|
|
rows.append({"cells": cells, "link_url": urljoin(source_url, link["href"]) if link else None})
|
|
return {"headers": headers, "rows": rows}
|
|
return {"headers": [], "rows": []}
|
|
|
|
|
|
def _parse_publications(title: str, nodes: list, source_url: str) -> tuple[int | None, list[dict]]:
|
|
count_match = re.search(r"(\d+)\s*$", title)
|
|
publications = []
|
|
for node in nodes:
|
|
if not isinstance(node, Tag):
|
|
continue
|
|
for li in node.select("li"):
|
|
text = normalize_ws(li.get_text(" ", strip=True))
|
|
anchor = li.find("a", href=True)
|
|
if text:
|
|
publications.append(
|
|
{
|
|
"title": normalize_ws(anchor.get_text(" ", strip=True)) if anchor else text,
|
|
"url": urljoin(source_url, anchor["href"]) if anchor else None,
|
|
"text": text,
|
|
}
|
|
)
|
|
if publications:
|
|
break
|
|
if not publications:
|
|
publications = [{"title": line, "url": None, "text": line} for line in _nodes_raw_text(nodes).split("\n") if line]
|
|
return int(count_match.group(1)) if count_match else None, publications
|
|
|
|
|
|
def _parse_courses(title: str, nodes: list, source_url: str) -> tuple[str | None, list[dict]]:
|
|
year_match = re.search(r"(\d{4}/\d{4})", title)
|
|
courses = []
|
|
for node in nodes:
|
|
if isinstance(node, Tag):
|
|
for li in node.select("li"):
|
|
anchor = li.find("a", href=True)
|
|
course_title = normalize_ws(anchor.get_text(" ", strip=True) if anchor else li.get_text(" ", strip=True))
|
|
if course_title:
|
|
courses.append({"title": course_title, "url": urljoin(source_url, anchor["href"]) if anchor else None})
|
|
return year_match.group(1) if year_match else None, _dedupe_dicts(courses)
|
|
|
|
|
|
def _parse_year_entries(nodes: list, source_url: str) -> list[dict]:
|
|
entries = []
|
|
for node in nodes:
|
|
if not isinstance(node, Tag):
|
|
continue
|
|
for year_node in node.select(".person-list-hangover"):
|
|
year_match = re.search(r"(19\d{2}|20\d{2})", year_node.get_text(" ", strip=True))
|
|
parent = year_node.parent
|
|
if parent:
|
|
entries.append(
|
|
{
|
|
"year": int(year_match.group(1)) if year_match else None,
|
|
"text": normalize_ws(parent.get_text(" ", strip=True)),
|
|
"links": _extract_links(parent, source_url),
|
|
}
|
|
)
|
|
return entries
|
|
|
|
|
|
def _parse_vkr_items(nodes: list) -> list[str]:
|
|
items = []
|
|
for node in nodes:
|
|
if isinstance(node, Tag):
|
|
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
|
|
return [item for item in dict.fromkeys(items) if item]
|
|
|
|
|
|
def _slugify(value: str) -> str:
|
|
cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
|
|
return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"
|
|
|
|
|
|
def _dedupe_tabs(items: list[dict]) -> list[dict]:
|
|
seen = set()
|
|
unique = []
|
|
for item in items:
|
|
key = (item.get("title"), item.get("href"))
|
|
if key not in seen:
|
|
seen.add(key)
|
|
unique.append(item)
|
|
return unique
|
|
|
|
|
|
def _dedupe_dicts(items: list[dict]) -> list[dict]:
|
|
seen = set()
|
|
unique = []
|
|
for item in items:
|
|
key = tuple(sorted(item.items()))
|
|
if key not in seen:
|
|
seen.add(key)
|
|
unique.append(item)
|
|
return unique
|