Files
miem_workers/app/services/crawler.py
2026-05-14 12:21:44 +03:00

310 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import gzip
import hashlib
import json
import re
import time
from datetime import datetime, timezone
import requests
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParserSource, ProfileTab
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key
from app.services.dataset_versions import get_or_create_current_version
from app.services.resource_cache import ResourceCache
# HTTP headers handed to the link collector, the profile parser and the
# availability probe; the User-Agent identifies the crawler as a bot.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)",
}
def run_crawl(db: Session, settings: Settings) -> CrawlRun:
    """Crawl every profile linked from the configured source page.

    A ``CrawlRun`` row is created with status ``"running"``, profile links are
    collected (optionally truncated to ``settings.crawl_limit``), and each
    profile is parsed and upserted with a commit after every profile. Active
    employees that were not found in the listing are then checked by
    ``_mark_dismissed`` and the run is stamped ``"completed"``.

    A failure on a single profile is stored as a ``CrawlError`` and the loop
    continues. A failure anywhere else marks the run ``"failed"`` and stores
    the exception text in ``run.message`` instead of re-raising it.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        settings: Provides ``source_url``, ``request_timeout``, ``crawl_limit``,
            ``parser_use_playwright`` and ``request_delay_seconds``.

    Returns:
        The refreshed ``CrawlRun`` row describing this crawl.
    """
    source = _ensure_source(db, settings.source_url)
    run = CrawlRun(source_url=source.source_url, status="running")
    db.add(run)
    db.commit()
    db.refresh(run)

    found_keys: set[str] = set()
    # ``parsed_count`` counts upserts that reported a change, ``skipped_count``
    # the unchanged ones; profiles the parser returns nothing for count in
    # neither.
    parsed_count = 0
    skipped_count = 0
    try:
        with requests.Session() as session:
            resource_cache = ResourceCache(db)
            urls = collect_profile_links(session, source.source_url, HEADERS, settings.request_timeout)
            if settings.crawl_limit:
                urls = urls[: settings.crawl_limit]
            run.found_count = len(urls)
            db.commit()

            for url in urls:
                # The key is remembered even if parsing fails below, so a
                # profile that is still listed is never treated as missing.
                key = profile_key(url)
                if key:
                    found_keys.add(key)
                try:
                    parsed = parse_person_profile(
                        session,
                        url,
                        HEADERS,
                        settings.request_timeout,
                        settings.parser_use_playwright,
                        resource_cache=resource_cache,
                    )
                    if not parsed:
                        continue
                    _, changed = _upsert_employee(db, run, parsed)
                    if changed:
                        parsed_count += 1
                    else:
                        skipped_count += 1
                    run.parsed_count = parsed_count
                    run.skipped_count = skipped_count
                    db.commit()
                except Exception as exc:
                    # Discard the failed profile's partial writes first. After
                    # a failed flush the session rejects further work until it
                    # is rolled back, so without this the commit below would
                    # raise and abort the whole crawl.
                    db.rollback()
                    run.error_count += 1
                    db.add(
                        CrawlError(
                            crawl_run_id=run.id,
                            profile_url=url,
                            error_type=type(exc).__name__,
                            message=str(exc),
                        )
                    )
                    db.commit()
                finally:
                    # Throttle between profiles on every path, including
                    # ``continue`` and errors.
                    time.sleep(settings.request_delay_seconds)

            run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
            run.status = "completed"
            get_or_create_current_version(db, crawl_run_id=run.id)
    except Exception as exc:
        # Same reasoning as above: make the session usable again so the final
        # commit can persist the "failed" status instead of raising.
        db.rollback()
        run.status = "failed"
        run.message = str(exc)
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run
def refresh_employee(db: Session, employee: Employee, settings: Settings) -> CrawlRun:
    """Re-crawl a single employee profile and record it as its own run.

    The profile at ``employee.canonical_url`` is parsed again; the result must
    resolve to the same ``profile_key`` as ``employee`` before it is upserted.
    Any failure is recorded on the run (status ``"failed"``, one
    ``CrawlError`` row) rather than re-raised.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        employee: The stored employee whose profile should be refreshed.
        settings: Provides ``request_timeout`` and ``parser_use_playwright``.

    Returns:
        The refreshed ``CrawlRun`` row describing this single-profile run.
    """
    # Read the URL once so the error path does not depend on reloading the
    # employee after a rollback.
    profile_url = employee.canonical_url
    run = CrawlRun(source_url=profile_url, status="running", found_count=1)
    db.add(run)
    db.commit()
    db.refresh(run)
    try:
        with requests.Session() as session:
            resource_cache = ResourceCache(db)
            parsed = parse_person_profile(
                session,
                profile_url,
                HEADERS,
                settings.request_timeout,
                settings.parser_use_playwright,
                resource_cache=resource_cache,
            )
            if not parsed:
                raise ValueError("Профиль не удалось распарсить.")
            # Guard against the URL now resolving to a different person.
            if _parsed_profile_key(parsed) != employee.profile_key:
                raise ValueError("Распарсенный профиль не совпадает с обновляемым сотрудником.")
            _, changed = _upsert_employee(db, run, parsed)
            if changed:
                run.parsed_count = 1
            else:
                run.skipped_count = 1
            run.status = "completed"
            get_or_create_current_version(db, crawl_run_id=run.id)
    except Exception as exc:
        # Drop the partial writes of the failed refresh. After a failed flush
        # the session rejects further work until it is rolled back, so without
        # this the commit in ``finally`` would raise and the run would stay
        # "running".
        db.rollback()
        run.status = "failed"
        run.error_count = 1
        run.message = str(exc)
        db.add(
            CrawlError(
                crawl_run_id=run.id,
                profile_url=profile_url,
                error_type=type(exc).__name__,
                message=str(exc),
            )
        )
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run
def _ensure_source(db: Session, source_url: str) -> ParserSource:
    """Return the ``ParserSource`` row for ``source_url``, creating it if absent.

    A newly created source is enabled, committed and refreshed before it is
    returned.
    """
    lookup = select(ParserSource).where(ParserSource.source_url == source_url)
    existing = db.scalar(lookup)
    if not existing:
        existing = ParserSource(source_url=source_url, enabled=True)
        db.add(existing)
        db.commit()
        db.refresh(existing)
    return existing
def _parsed_profile_key(parsed: dict) -> str:
return f"{parsed.get('profile_type')}:{parsed.get('profile_id')}"
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee, bool]:
    """Create or update the employee described by ``parsed``.

    ``parsed`` is modified in place: the ``_html`` and ``_resource_manifest``
    entries are removed before the checksum is computed. The employee is
    looked up by its profile key and created when missing, which increments
    ``run.new_count`` and records a ``"new"`` change. Name, status,
    ``last_seen_at``, ``dismissed_at`` and parser version are refreshed on
    every call. When the profile is new, or its checksum or parser version
    differs, the stored data and checksum are replaced, the profile tabs are
    rebuilt and a snapshot (with gzip-compressed HTML, if any) is added.

    Nothing is committed here; only a flush is issued.

    Returns:
        ``(employee, changed)`` where ``changed`` tells whether new data was
        stored.
    """
    raw_html = parsed.pop("_html", None)
    parsed.pop("_resource_manifest", None)
    checksum = _checksum(parsed)
    key = _parsed_profile_key(parsed)

    employee = db.scalar(select(Employee).where(Employee.profile_key == key))
    seen_at = datetime.now(timezone.utc)
    is_new = employee is None
    if is_new:
        employee = Employee(
            profile_key=key,
            profile_type=parsed.get("profile_type"),
            profile_id=parsed.get("profile_id"),
            canonical_url=parsed["source_url"],
            first_seen_at=seen_at,
        )
        db.add(employee)
        run.new_count += 1

    parser_version = parsed.get("parser_version")
    changed = (
        is_new
        or employee.current_checksum != checksum
        or employee.parser_version != parser_version
    )

    # These fields are refreshed on every sighting, changed or not.
    employee.full_name = parsed.get("full_name")
    employee.status = "active"
    employee.last_seen_at = seen_at
    employee.dismissed_at = None
    employee.parser_version = parser_version
    if changed:
        employee.current_data = parsed
        employee.current_checksum = checksum

    # Flush so a freshly added employee has an id for the rows created below.
    db.flush()

    if is_new:
        _record_employee_change(
            db,
            run,
            employee,
            "new",
            profile_available=True,
            message="Сотрудник впервые найден в источнике.",
        )

    if not changed:
        return employee, changed

    # Replace the stored tabs wholesale with the freshly parsed ones.
    db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
    db.add_all(
        [
            ProfileTab(
                employee_id=employee.id,
                title=tab.get("title") or "",
                href=tab.get("href") or "",
                data_index=tab.get("data_index"),
            )
            for tab in parsed.get("tabs") or []
        ]
    )

    compressed_html = gzip.compress(raw_html.encode("utf-8")) if raw_html else None
    db.add(
        EmployeeSnapshot(
            employee_id=employee.id,
            crawl_run_id=run.id,
            parsed_data=parsed,
            html_snapshot=compressed_html,
            checksum=checksum,
            parser_version=parser_version,
        )
    )
    return employee, changed
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
    """Handle active employees whose profile key is not in ``found_keys``.

    Each such employee's profile URL is probed. If it still responds, only a
    ``"missing_from_source"`` change is recorded; otherwise the employee is
    set to ``"dismissed"`` and a ``"dismissed"`` change is recorded. The
    session is committed once at the end.

    Returns:
        The number of employees marked as dismissed.
    """
    active_employees = db.scalars(select(Employee).where(Employee.status == "active")).all()
    checked_at = datetime.now(timezone.utc)
    dismissed_total = 0
    for employee in active_employees:
        if employee.profile_key in found_keys:
            continue
        if _profile_is_available(session, employee.canonical_url, timeout):
            _record_employee_change(
                db,
                run,
                employee,
                "missing_from_source",
                profile_available=True,
                message="Профиль доступен, но ссылка отсутствует в исходном списке.",
            )
        else:
            employee.status = "dismissed"
            employee.dismissed_at = checked_at
            _record_employee_change(
                db,
                run,
                employee,
                "dismissed",
                profile_available=False,
                message="Сотрудник отсутствует в исходном списке, профиль не подтвердился как доступный.",
            )
            dismissed_total += 1
    db.commit()
    return dismissed_total
def _profile_is_available(session: requests.Session, url: str, timeout: int) -> bool:
    """Report whether a GET on ``url`` (following redirects) ends below HTTP 400.

    Any ``requests.RequestException`` is treated as "not available".
    """
    try:
        status = session.get(
            url,
            headers=HEADERS,
            timeout=timeout,
            allow_redirects=True,
        ).status_code
    except requests.RequestException:
        return False
    return status < 400
def _record_employee_change(
    db: Session,
    run: CrawlRun,
    employee: Employee,
    change_type: str,
    *,
    profile_available: bool | None,
    message: str,
) -> None:
    """Add a ``CrawlRunEmployeeChange`` row for ``employee`` to the session.

    The row copies the employee's key, URL and name as they are at call time.
    Nothing is flushed or committed here.
    """
    change = CrawlRunEmployeeChange(
        crawl_run_id=run.id,
        employee_id=employee.id,
        profile_key=employee.profile_key,
        profile_url=employee.canonical_url,
        full_name=employee.full_name,
        change_type=change_type,
        profile_available=profile_available,
        message=message,
    )
    db.add(change)
def _checksum(data: dict) -> str:
    """Return the SHA-256 hex digest of the normalized, canonical JSON of ``data``.

    The payload goes through ``_stable_checksum_payload`` first and is then
    serialized with sorted keys and compact separators.
    """
    normalized = _stable_checksum_payload(data)
    serialized = json.dumps(
        normalized,
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256()
    digest.update(serialized.encode("utf-8"))
    return digest.hexdigest()
def _stable_checksum_payload(value):
if isinstance(value, dict):
return {key: _stable_checksum_payload(item) for key, item in value.items()}
if isinstance(value, list):
return [_stable_checksum_payload(item) for item in value]
if isinstance(value, str):
return _normalize_date_dependent_experience(value)
return value
def _normalize_date_dependent_experience(value: str) -> str:
return re.sub(
r"(?i)(стаж(?:\s+работы)?(?:\s+в\s+ниу\s+вшэ|\s+в\s+вшэ)?\s*:?\s*)\d+\s*(?:год(?:а|ов)?|лет)",
r"\1<experience-years>",
value,
)