Files
miem_workers/app/parser/profile.py

852 lines
30 KiB
Python

import hashlib
import json
import re
from urllib.parse import urljoin
from bs4 import BeautifulSoup, NavigableString, Tag
from requests import Session
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
from app.version import BACKEND_VERSION
_YEAR_PATTERN = re.compile(r"Начал[аи]?\s+работать.*?ВШЭ.*?(\d{4})", re.IGNORECASE)
_EMAIL_PATTERN = re.compile(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
_PHONE_PATTERN = re.compile(r"(?:Телефон|Phone)\s*:\s*([+()\d\-\s]{8,})", re.IGNORECASE)
def normalize_ws(value: str | None) -> str:
return re.sub(r"\s+", " ", value or "").strip()
def extract_person_tabs(soup: BeautifulSoup, source_url: str) -> list[dict[str, str | None]]:
selectors = (
"div.person-menu.is-desktop.small.person-menu-addition",
".person-menu",
)
for selector in selectors:
menu = soup.select_one(selector)
if not menu:
continue
tabs = []
for anchor in menu.select("a[href]"):
title = normalize_ws(anchor.get_text(" ", strip=True))
href = anchor.get("href", "").strip()
if title and href:
tabs.append(
{
"data_index": anchor.get("data-index"),
"title": title,
"href": urljoin(source_url, href),
}
)
if tabs:
return _dedupe_tabs(tabs)
return []
def extract_person_header(soup: BeautifulSoup, source_url: str) -> dict:
name_node = soup.select_one("h1.person-caption") or soup.find("h1")
text = normalize_ws(soup.get_text(" ", strip=True))
year_match = _YEAR_PATTERN.search(text)
contacts = {"phones": [], "emails": [], "address": None, "items": []}
for email in _EMAIL_PATTERN.findall(text):
if email not in contacts["emails"]:
contacts["emails"].append(email)
for phone in _PHONE_PATTERN.findall(text):
normalized_phone = normalize_ws(phone)
if normalized_phone and normalized_phone not in contacts["phones"]:
contacts["phones"].append(normalized_phone)
address_match = re.search(
r"(Адрес[:\s].{0,220}?)(?:\s+Время|\s+Расписание|\s+SPIN|\s+ORCID|$)",
text,
flags=re.IGNORECASE,
)
if address_match:
contacts["address"] = normalize_ws(address_match.group(1)).rstrip(",")
positions = []
for li in soup.select("ul.g-ul.g-list.small.employment-add li, ul.employment-add li"):
value = normalize_ws(li.get_text(" ", strip=True))
if value:
positions.append(value)
external_ids = []
id_domains = (
("ORCID", "orcid.org"),
("Scopus AuthorID", "scopus.com"),
("ResearcherID", "webofscience.com"),
("Google Scholar", "scholar.google."),
("SPIN РИНЦ", "elibrary.ru"),
)
for anchor in soup.select("a[href]"):
href = anchor.get("href", "").strip()
label = normalize_ws(anchor.get_text(" ", strip=True))
for system, marker in id_domains:
if marker in href:
external_ids.append({"system": system, "value": label or system, "url": href})
break
return {
"source_url": source_url,
"full_name": normalize_ws(name_node.get_text(" ", strip=True)) if name_node else None,
"positions": positions,
"hse_start_year": int(year_match.group(1)) if year_match else None,
"contacts": contacts,
"external_ids": _dedupe_dicts(external_ids),
}
def extract_sections(soup: BeautifulSoup, source_url: str) -> list[dict]:
sections = []
for h2 in soup.select("h2"):
title = normalize_ws(h2.get_text(" ", strip=True))
if not title or "расписание занятий" in title.lower():
continue
nodes = _collect_between_h2(h2)
raw_text = _nodes_raw_text(nodes)
paragraphs = _nodes_paragraphs(nodes)
items = _nodes_list_items(nodes)
links = []
for node in nodes:
if isinstance(node, Tag):
links.extend(_extract_links(node, source_url))
section_type = _infer_section_type(title, nodes)
section = {
"title": title,
"slug": _slugify(title),
"type": section_type,
"raw_text": raw_text,
"paragraphs": paragraphs,
"items": items,
"links": links,
}
if section_type == "publications":
section["publications_count"], section["publications"] = _parse_publications(title, nodes, source_url)
section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
elif section_type == "courses_by_year":
section["academic_year"], section["courses"] = _parse_courses(title, nodes, source_url)
section.pop("items", None)
section.pop("links", None)
elif section_type == "table":
section["table"] = _parse_table(nodes, source_url)
elif "выпускные квалификационные работы студентов ниу вшэ" in title.lower():
section["items"] = _parse_vkr_items(nodes)
year_entries = _parse_year_entries(nodes, source_url)
if year_entries:
section["year_entries"] = year_entries
if section_type in {"generic", "paragraphs"}:
section["type"] = "year_blocks"
sections.append(section)
return sections
def parse_person_profile(
session: Session,
source_url: str,
headers: dict[str, str],
timeout: int,
use_playwright: bool = False,
resource_cache=None,
) -> dict | None:
normalized_url = normalize_profile_url(source_url)
if not normalized_url:
return None
profile_type, profile_id = parse_profile_identity(normalized_url)
cache_profile_key = f"{profile_type}:{profile_id}"
resource_manifest = []
html = _fetch_text(
session,
normalized_url,
headers,
timeout,
resource_cache=resource_cache,
profile_key=cache_profile_key,
resource_key="main-html",
resource_manifest=resource_manifest,
)
if use_playwright:
html = _render_with_playwright(normalized_url, html)
soup = BeautifulSoup(html, "html.parser")
header = extract_person_header(soup, normalized_url)
tabs = extract_person_tabs(soup, normalized_url)
sections = extract_sections(soup, normalized_url)
sections = enrich_sections_from_hse_widgets(
session,
soup,
normalized_url,
headers,
timeout,
sections,
resource_cache=resource_cache,
profile_key=cache_profile_key,
resource_manifest=resource_manifest,
)
internal_links = [tab["href"] for tab in tabs if tab.get("href")]
return {
"source_url": normalized_url,
"profile_type": profile_type,
"profile_id": profile_id,
"full_name": header.get("full_name"),
"positions": header.get("positions") or [],
"hse_start_year": header.get("hse_start_year"),
"contacts": header.get("contacts") or {},
"external_ids": header.get("external_ids") or [],
"tabs": tabs,
"sections": sections,
"employee_internal_links": internal_links,
"parser_version": BACKEND_VERSION,
"_html": html,
"_resource_manifest": resource_manifest,
}
def enrich_sections_from_hse_widgets(
session: Session,
soup: BeautifulSoup,
source_url: str,
headers: dict[str, str],
timeout: int,
sections: list[dict],
resource_cache=None,
profile_key: str | None = None,
resource_manifest: list[dict] | None = None,
) -> list[dict]:
enriched = list(sections)
publications = _load_widget_publications(
session,
soup,
headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_manifest=resource_manifest,
)
if publications:
enriched = _upsert_publications_section(enriched, publications)
theses = _load_widget_graduation_theses(
session,
soup,
source_url,
headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_manifest=resource_manifest,
)
if theses:
enriched = _upsert_graduation_theses_section(enriched, theses)
return enriched
def _render_with_playwright(source_url: str, fallback_html: str) -> str:
try:
from playwright.sync_api import sync_playwright
except Exception:
return fallback_html
try:
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=True)
page = browser.new_page()
page.goto(source_url, wait_until="domcontentloaded", timeout=45000)
for index in range(page.locator(".person-menu a").count()):
try:
page.locator(".person-menu a").nth(index).click(timeout=2500, force=True)
page.wait_for_timeout(450)
except Exception:
continue
html = page.content()
browser.close()
return html
except Exception:
return fallback_html
def _load_widget_publications(
session: Session,
soup: BeautifulSoup,
headers: dict[str, str],
timeout: int,
*,
resource_cache=None,
profile_key: str | None = None,
resource_manifest: list[dict] | None = None,
) -> list[dict]:
script = soup.select_one('script[data-widget-name="AuthorSearch"][data-author]')
if not script:
return []
author_id = normalize_ws(script.get("data-author"))
if not author_id:
return []
publications = []
page_id = 1
per_page = 100
while page_id <= 20:
payload = {
"type": "ANY",
"filterParams": (
f'"acceptLanguage":"ru"|"fullTextPublicEnabled": 1|'
f'"pubsAuthor": {author_id}|"widgetName": "AuthorSearch"'
),
"paginationParams": {
"publsSort": ["TITLE_ASC"],
"publsCount": per_page,
"pageId": page_id,
},
}
try:
if resource_cache and profile_key:
text = _fetch_text(
session,
"https://publications.hse.ru/api/searchPubs",
headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_key=f"publications-page-{page_id}",
resource_manifest=resource_manifest,
method="POST",
json_payload=payload,
)
data = json.loads(text)
else:
response = session.post(
"https://publications.hse.ru/api/searchPubs",
json=payload,
headers=headers,
timeout=timeout,
)
response.raise_for_status()
data = response.json()
except Exception:
return publications
result = data.get("result") if isinstance(data, dict) else {}
items = _extract_publication_items(result)
if not items:
break
publications.extend(_normalize_publication_item(item, author_id) for item in items)
total = int(result.get("total") or 0)
if not result.get("more") and len(publications) >= total:
break
page_id += 1
return _dedupe_publications(publications)
def _extract_publication_items(result: object) -> list[dict]:
if not isinstance(result, dict):
return []
return _flatten_publication_items(result.get("items"))
def _flatten_publication_items(value: object) -> list[dict]:
if isinstance(value, list):
return [item for item in value if _is_publication_item(item)]
if not isinstance(value, dict):
return []
nested_items = value.get("items")
if isinstance(nested_items, list):
return [item for item in nested_items if _is_publication_item(item)]
if isinstance(nested_items, dict):
return _flatten_publication_items(nested_items)
publications = []
for child in value.values():
publications.extend(_flatten_publication_items(child))
return publications
def _is_publication_item(value: object) -> bool:
return isinstance(value, dict) and ("id" in value or "title" in value)
def _load_widget_graduation_theses(
session: Session,
soup: BeautifulSoup,
source_url: str,
headers: dict[str, str],
timeout: int,
*,
resource_cache=None,
profile_key: str | None = None,
resource_manifest: list[dict] | None = None,
) -> list[dict]:
script = soup.select_one('script[src*="/n/stat/vkr/app.js"][data-person-id]')
if not script:
return []
person_id = normalize_ws(script.get("data-person-id"))
api_url = normalize_ws(script.get("data-api-url")) or "/n/vkr/api/"
if not person_id:
return []
request_headers = {**headers, "x-portal-language": "ru"}
try:
url = urljoin(source_url, api_url)
params = {"supervisorId": person_id}
if resource_cache and profile_key:
text = _fetch_text(
session,
url,
request_headers,
timeout,
resource_cache=resource_cache,
profile_key=profile_key,
resource_key="graduation-theses",
resource_manifest=resource_manifest,
params=params,
)
data = json.loads(text)
else:
response = session.get(
url,
params=params,
headers=request_headers,
timeout=timeout,
)
response.raise_for_status()
data = response.json()
except Exception:
return []
items = data.get("data") if isinstance(data, dict) else []
if not isinstance(items, list):
return []
return [_normalize_vkr_item(item, source_url) for item in items if isinstance(item, dict)]
def _collect_between_h2(start_h2: Tag) -> list[Tag | NavigableString | str]:
nodes = []
for sibling in start_h2.next_siblings:
if isinstance(sibling, Tag) and sibling.name == "h2":
break
if isinstance(sibling, NavigableString) and not normalize_ws(str(sibling)):
continue
nodes.append(sibling)
return nodes
def _extract_links(node: Tag, source_url: str) -> list[dict[str, str]]:
links = []
for anchor in node.select("a[href]"):
text = normalize_ws(anchor.get_text(" ", strip=True))
href = anchor.get("href", "").strip()
if text and href and "timetable" not in href.lower() and "расписание" not in text.lower():
links.append({"text": text, "url": urljoin(source_url, href)})
return links
def _nodes_raw_text(nodes: list) -> str:
chunks = []
for node in nodes:
text = normalize_ws(node.get_text(" ", strip=True) if isinstance(node, Tag) else str(node))
if text:
chunks.append(text)
return "\n".join(chunks)
def _nodes_paragraphs(nodes: list) -> list[str]:
paragraphs = []
for node in nodes:
if isinstance(node, Tag):
paragraphs.extend(normalize_ws(p.get_text(" ", strip=True)) for p in node.select("p"))
return [p for p in paragraphs if p]
def _nodes_list_items(nodes: list) -> list[str]:
items = []
for node in nodes:
if isinstance(node, Tag):
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
return [item for item in items if item and "расписание" not in item.lower()]
def _infer_section_type(title: str, nodes: list) -> str:
lowered = title.lower()
if _has_table(nodes):
return "table"
if _is_publications_title(lowered):
return "publications"
if "учебные курсы" in lowered:
return "courses_by_year"
if _nodes_list_items(nodes):
return "list"
if _nodes_paragraphs(nodes):
return "paragraphs"
return "generic"
def _is_publications_title(lowered_title: str) -> bool:
return lowered_title.startswith("публикац")
def _has_table(nodes: list) -> bool:
return any(isinstance(node, Tag) and (node.name == "table" or node.find("table")) for node in nodes)
def _parse_table(nodes: list, source_url: str) -> dict:
for node in nodes:
if not isinstance(node, Tag):
continue
table = node if node.name == "table" else node.find("table")
if not table:
continue
headers = [normalize_ws(th.get_text(" ", strip=True)) for th in table.select("th")]
rows = []
for tr in table.select("tr"):
cells = [normalize_ws(td.get_text(" ", strip=True)) for td in tr.select("td")]
if cells:
link = tr.find("a", href=True)
rows.append({"cells": cells, "link_url": urljoin(source_url, link["href"]) if link else None})
return {"headers": headers, "rows": rows}
return {"headers": [], "rows": []}
def _parse_publications(title: str, nodes: list, source_url: str) -> tuple[int | None, list[dict]]:
count_match = re.search(r"(\d+)\s*$", title)
publications = []
for node in nodes:
if not isinstance(node, Tag):
continue
for li in node.select("li"):
text = normalize_ws(li.get_text(" ", strip=True))
anchor = li.find("a", href=True)
if text:
publications.append(
{
"title": normalize_ws(anchor.get_text(" ", strip=True)) if anchor else text,
"url": urljoin(source_url, anchor["href"]) if anchor else None,
"text": text,
}
)
if publications:
break
if not publications:
publications = [{"title": line, "url": None, "text": line} for line in _nodes_raw_text(nodes).split("\n") if line]
return int(count_match.group(1)) if count_match else None, publications
def _parse_courses(title: str, nodes: list, source_url: str) -> tuple[str | None, list[dict]]:
year_match = re.search(r"(\d{4}/\d{4})", title)
courses = []
for node in nodes:
if isinstance(node, Tag):
for li in node.select("li"):
anchor = li.find("a", href=True)
course_title = normalize_ws(anchor.get_text(" ", strip=True) if anchor else li.get_text(" ", strip=True))
if course_title:
courses.append({"title": course_title, "url": urljoin(source_url, anchor["href"]) if anchor else None})
return year_match.group(1) if year_match else None, _dedupe_dicts(courses)
def _parse_year_entries(nodes: list, source_url: str) -> list[dict]:
entries = []
for node in nodes:
if not isinstance(node, Tag):
continue
for year_node in node.select(".person-list-hangover"):
year_match = re.search(r"(19\d{2}|20\d{2})", year_node.get_text(" ", strip=True))
parent = year_node.parent
if parent:
entries.append(
{
"year": int(year_match.group(1)) if year_match else None,
"text": normalize_ws(parent.get_text(" ", strip=True)),
"links": _extract_links(parent, source_url),
}
)
return entries
def _parse_vkr_items(nodes: list) -> list[str]:
items = []
for node in nodes:
if isinstance(node, Tag):
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
return [item for item in dict.fromkeys(items) if item]
def _normalize_publication_item(item: dict, current_author_id: str | None = None) -> dict:
publication_id = str(item.get("id") or "").strip()
title = _html_to_text(item.get("title"))
year = _int_or_none(item.get("year"))
publication_type = str(item.get("type") or "").strip() or None
description = item.get("description") if isinstance(item.get("description"), dict) else {}
short_description = _localized_value(description.get("short")) or _localized_value(description.get("shortLeft"))
documents = item.get("documents") if isinstance(item.get("documents"), dict) else {}
language = item.get("language") if isinstance(item.get("language"), dict) else {}
annotation = _localized_text_map(item.get("annotation"))
authors = _normalize_publication_authors(item.get("authorsByType"), current_author_id)
citation_text = normalize_ws(str(description.get("main") or "")) or _build_publication_citation(title, authors, year)
text = normalize_ws(" ".join(part for part in [title, str(year or ""), short_description] if part))
return {
"id": publication_id or None,
"publication_id": publication_id or None,
"title": title or publication_id,
"year": year,
"type": publication_type,
"publication_type": publication_type,
"language": normalize_ws(language.get("name")) or None,
"status": _int_or_none(item.get("status")),
"url": f"https://publications.hse.ru/view/{publication_id}" if publication_id else None,
"doi_url": _document_href(documents, "DOI"),
"other_url": _document_href(documents, "OTHER_URL"),
"document_url": _document_href(documents, "DOCUMENT"),
"citation_text": citation_text or None,
"annotation": annotation,
"description": description or None,
"authors": authors,
"raw_data": item,
"text": text or title or publication_id,
}
def _normalize_vkr_item(item: dict, source_url: str) -> dict:
thesis_id = item.get("id")
program = item.get("learnProgram") if isinstance(item.get("learnProgram"), dict) else {}
org_unit = item.get("orgUnit") if isinstance(item.get("orgUnit"), dict) else {}
supervisors = []
for supervisor in item.get("supervisors") or []:
if not isinstance(supervisor, dict):
continue
name = normalize_ws(supervisor.get("name"))
url = normalize_ws(supervisor.get("url"))
if name or url:
supervisors.append({"name": name or url, "url": url or None})
return {
"id": thesis_id,
"student": normalize_ws(item.get("student")),
"title": normalize_ws(item.get("title")),
"defense_year": item.get("year"),
"level": normalize_ws(item.get("level")),
"rating": item.get("rating"),
"project_url": urljoin(source_url, f"/edu/vkr/{thesis_id}") if thesis_id else None,
"program": normalize_ws(program.get("title")),
"program_url": urljoin(source_url, program.get("url")) if program.get("url") else None,
"org_unit": normalize_ws(org_unit.get("title")),
"org_unit_url": urljoin(source_url, org_unit.get("url")) if org_unit.get("url") else None,
"supervisors": supervisors,
"text": normalize_ws(" ".join(str(part) for part in [item.get("student"), item.get("title"), item.get("year")] if part)),
}
def _upsert_publications_section(sections: list[dict], publications: list[dict]) -> list[dict]:
merged = []
inserted = False
for section in sections:
if section.get("type") != "publications":
merged.append(section)
continue
existing = section.get("publications") or []
section = {
**section,
"publications_count": max(section.get("publications_count") or 0, len(publications)),
"publications": _dedupe_publications([*existing, *publications]),
}
section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
merged.append(section)
inserted = True
if not inserted:
merged.append(
{
"title": "Публикации и исследования",
"slug": "publikacii_i_issledovaniya",
"type": "publications",
"raw_text": "",
"paragraphs": [],
"items": [item["text"] for item in publications if item.get("text")],
"links": [],
"publications_count": len(publications),
"publications": publications,
}
)
return merged
def _upsert_graduation_theses_section(sections: list[dict], theses: list[dict]) -> list[dict]:
section = {
"title": "Выпускные квалификационные работы студентов НИУ ВШЭ",
"slug": "vypusknye_kvalifikacionnye_raboty_studentov_niu_vshe",
"type": "graduation_theses",
"raw_text": "",
"paragraphs": [],
"items": [item["text"] for item in theses if item.get("text")],
"links": [{"text": item["title"], "url": item["project_url"]} for item in theses if item.get("title") and item.get("project_url")],
"theses_count": len(theses),
"theses": theses,
}
return [item for item in sections if item.get("type") != "graduation_theses"] + [section]
def _dedupe_publications(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = item.get("id") or item.get("url") or item.get("title")
if key and key not in seen:
seen.add(key)
unique.append(item)
return unique
def _html_to_text(value: object) -> str:
return normalize_ws(BeautifulSoup(str(value or ""), "html.parser").get_text(" ", strip=True))
def _localized_text_map(value: object) -> dict[str, str]:
if not isinstance(value, dict):
return {}
localized = {}
for key in ("ru", "en", "publ"):
text = _html_to_text(value.get(key))
if text:
localized[key] = text
return localized
def _localized_value(value: object) -> str:
if isinstance(value, dict):
return normalize_ws(value.get("ru") or value.get("publ") or value.get("en"))
return normalize_ws(str(value or ""))
def _normalize_publication_authors(value: object, current_author_id: str | None) -> list[dict]:
if not isinstance(value, dict):
return []
authors = []
for author in value.get("author") or []:
if not isinstance(author, dict):
continue
title = author.get("title") if isinstance(author.get("title"), dict) else {}
reverse_title = author.get("reverseTitle") if isinstance(author.get("reverseTitle"), dict) else {}
author_id = normalize_ws(author.get("id"))
href = normalize_ws(author.get("href"))
authors.append(
{
"id": author_id or None,
"href": urljoin("https://www.hse.ru", href) if href else None,
"title_ru": _html_to_text(title.get("ru")),
"title_en": _html_to_text(title.get("en")),
"reverse_title_ru": _html_to_text(reverse_title.get("ru")),
"reverse_title_en": _html_to_text(reverse_title.get("en")),
"alt_name": normalize_ws(author.get("altName")) or None,
"other_name": normalize_ws(author.get("otherName")) or None,
"is_current_employee": bool(current_author_id and author_id == current_author_id),
}
)
return authors
def _document_href(documents: dict, key: str) -> str | None:
document = documents.get(key)
if not isinstance(document, dict):
return None
return normalize_ws(document.get("href")) or None
def _build_publication_citation(title: str, authors: list[dict], year: int | None) -> str:
author_names = [author.get("title_ru") or author.get("title_en") or author.get("alt_name") for author in authors]
return normalize_ws(". ".join(part for part in [", ".join(filter(None, author_names)), title, str(year or "")] if part))
def _int_or_none(value: object) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None
def _slugify(value: str) -> str:
cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"
def _dedupe_tabs(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = (item.get("title"), item.get("href"))
if key not in seen:
seen.add(key)
unique.append(item)
return unique
def _dedupe_dicts(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = tuple(sorted(item.items()))
if key not in seen:
seen.add(key)
unique.append(item)
return unique
def _fetch_text(
session: Session,
url: str,
headers: dict[str, str],
timeout: int,
*,
resource_cache=None,
profile_key: str | None = None,
resource_key: str,
resource_manifest: list[dict] | None,
method: str = "GET",
json_payload: object | None = None,
params: dict | None = None,
) -> str:
if resource_cache and profile_key:
cached = resource_cache.fetch_text(
session,
profile_key=profile_key,
resource_key=resource_key,
method=method,
url=url,
headers=headers,
timeout=timeout,
json_payload=json_payload,
params=params,
)
if resource_manifest is not None:
resource_manifest.append(
{
"resource_key": resource_key,
"method": method,
"url": url,
"body_hash": cached.body_hash,
"from_cache": cached.from_cache,
"status_code": cached.status_code,
}
)
return cached.text
if method.upper() == "POST":
response = session.post(url, json=json_payload, headers=headers, timeout=timeout, params=params)
else:
response = session.get(url, headers=headers, timeout=timeout, params=params)
response.raise_for_status()
text = response.text
if resource_manifest is not None:
resource_manifest.append(
{
"resource_key": resource_key,
"method": method,
"url": url,
"body_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
"from_cache": False,
"status_code": response.status_code,
}
)
return text