310 lines
10 KiB
Python
310 lines
10 KiB
Python
import gzip
|
||
import hashlib
|
||
import json
|
||
import re
|
||
import time
|
||
from datetime import datetime, timezone
|
||
|
||
import requests
|
||
from sqlalchemy import select
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.config import Settings
|
||
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParserSource, ProfileTab
|
||
from app.parser.collector import collect_profile_links
|
||
from app.parser.profile import parse_person_profile
|
||
from app.parser.profile_url import profile_key
|
||
from app.services.dataset_versions import get_or_create_current_version
|
||
from app.services.resource_cache import ResourceCache
|
||
|
||
# Headers sent with every outbound request; the User-Agent names the crawler
# (MIEMEmployeesBot) and carries a reference URL.
HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)"}
|
||
|
||
|
||
def run_crawl(db: Session, settings: Settings) -> CrawlRun:
    """Crawl all profiles linked from the configured source and record a run.

    Creates a ``CrawlRun`` row and collects profile links from the source page.
    It then parses and upserts each profile; when ``settings.crawl_limit`` is
    set, only the first links are processed. Next, active employees whose
    profile is absent from the source list are handled by ``_mark_dismissed``.
    Finally, ``get_or_create_current_version`` is called for the run.

    A failure while processing one profile is stored as a ``CrawlError``, and
    the crawl moves on to the next link. Any other failure marks the run
    ``"failed"`` and stores the exception text in ``run.message``. In every
    case ``finished_at`` is set and committed.

    Args:
        db: Session used for all reads and writes; it is committed many times.
        settings: Source URL, timeouts, request delay, crawl limit and parser
            options.

    Returns:
        The refreshed ``CrawlRun``.
    """
    source = _ensure_source(db, settings.source_url)
    run = CrawlRun(source_url=source.source_url, status="running")
    db.add(run)
    db.commit()
    db.refresh(run)

    parsed_count = 0
    skipped_count = 0
    try:
        with requests.Session() as session:
            resource_cache = ResourceCache(db)
            urls = collect_profile_links(session, source.source_url, HEADERS, settings.request_timeout)
            # Record every profile the source lists *before* applying the crawl
            # limit. Otherwise a limited run would treat the unvisited tail of
            # the list as "missing from source" and probe or dismiss those
            # employees in _mark_dismissed.
            found_keys: set[str] = {key for key in map(profile_key, urls) if key}
            if settings.crawl_limit:
                urls = urls[: settings.crawl_limit]
            run.found_count = len(urls)
            db.commit()

            for url in urls:
                try:
                    parsed = parse_person_profile(
                        session,
                        url,
                        HEADERS,
                        settings.request_timeout,
                        settings.parser_use_playwright,
                        resource_cache=resource_cache,
                    )
                    if not parsed:
                        continue
                    _, changed = _upsert_employee(db, run, parsed)
                    run.parsed_count = parsed_count + int(changed)
                    run.skipped_count = skipped_count + int(not changed)
                    db.commit()
                    # Advance the local tallies only after the commit succeeded,
                    # so a rollback below cannot leave them ahead of the database.
                    parsed_count += int(changed)
                    skipped_count += int(not changed)
                except Exception as exc:
                    # Discard whatever this profile left in the session. That is
                    # either partial employee/tab/snapshot changes or a
                    # transaction broken by a database error. Only the error row
                    # is committed, and the session stays usable for the next
                    # profile.
                    # NOTE(review): assumes ResourceCache does not keep references
                    # to rows that this rollback discards — confirm.
                    db.rollback()
                    run.error_count += 1
                    db.add(
                        CrawlError(
                            crawl_run_id=run.id,
                            profile_url=url,
                            error_type=type(exc).__name__,
                            message=str(exc),
                        )
                    )
                    db.commit()
                finally:
                    # Throttle requests to the source, including after skips and errors.
                    time.sleep(settings.request_delay_seconds)

            run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
            run.status = "completed"
            get_or_create_current_version(db, crawl_run_id=run.id)
    except Exception as exc:
        # Reset a possibly broken or partial transaction so the failure status
        # can be committed in the finally block.
        db.rollback()
        run.status = "failed"
        run.message = str(exc)
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run
|
||
|
||
|
||
def refresh_employee(db: Session, employee: Employee, settings: Settings) -> CrawlRun:
    """Re-parse a single employee's profile and record it as its own crawl run.

    The profile at ``employee.canonical_url`` is parsed and upserted. The run
    fails if nothing could be parsed or if the parsed profile key differs from
    ``employee.profile_key``. On success, ``get_or_create_current_version`` is
    called for the run.

    On failure, all uncommitted changes from this attempt are rolled back. The
    run is then marked ``"failed"`` with ``error_count = 1`` and a
    ``CrawlError`` row. ``finished_at`` is always set and committed.

    Args:
        db: Session used for all reads and writes.
        employee: The employee whose profile is refreshed.
        settings: Timeouts and parser options.

    Returns:
        The refreshed ``CrawlRun``.
    """
    # Read the URL once up front. The error handler then does not need to
    # reload ``employee`` from a session that has just been rolled back.
    profile_url = employee.canonical_url
    run = CrawlRun(source_url=profile_url, status="running", found_count=1)
    db.add(run)
    db.commit()
    db.refresh(run)

    try:
        with requests.Session() as session:
            resource_cache = ResourceCache(db)
            parsed = parse_person_profile(
                session,
                profile_url,
                HEADERS,
                settings.request_timeout,
                settings.parser_use_playwright,
                resource_cache=resource_cache,
            )
            if not parsed:
                raise ValueError("Профиль не удалось распарсить.")
            if _parsed_profile_key(parsed) != employee.profile_key:
                raise ValueError("Распарсенный профиль не совпадает с обновляемым сотрудником.")

            _, changed = _upsert_employee(db, run, parsed)
            if changed:
                run.parsed_count = 1
            else:
                run.skipped_count = 1
            run.status = "completed"
            get_or_create_current_version(db, crawl_run_id=run.id)
    except Exception as exc:
        # Drop partial upsert/version changes, and clear a transaction broken by
        # a database error. Without this, the commit below would either persist
        # half-applied data under a "failed" run or raise PendingRollbackError.
        db.rollback()
        run.status = "failed"
        run.error_count = 1
        run.message = str(exc)
        db.add(
            CrawlError(
                crawl_run_id=run.id,
                profile_url=profile_url,
                error_type=type(exc).__name__,
                message=str(exc),
            )
        )
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run
|
||
|
||
|
||
def _ensure_source(db: Session, source_url: str) -> ParserSource:
    """Return the ``ParserSource`` row for ``source_url``, creating it if absent.

    A newly created source is enabled, committed and refreshed before it is
    returned; an existing row is returned unchanged.
    """
    lookup = select(ParserSource).where(ParserSource.source_url == source_url)
    source = db.scalar(lookup)
    if not source:
        source = ParserSource(source_url=source_url, enabled=True)
        db.add(source)
        db.commit()
        db.refresh(source)
    return source
|
||
|
||
|
||
def _parsed_profile_key(parsed: dict) -> str:
    """Build the ``"<profile_type>:<profile_id>"`` key for a parsed profile.

    Missing fields are formatted as ``None`` (e.g. ``"None:42"``).
    """
    profile_type = parsed.get("profile_type")
    profile_id = parsed.get("profile_id")
    return "{}:{}".format(profile_type, profile_id)
|
||
|
||
|
||
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee, bool]:
    """Create or update the ``Employee`` described by ``parsed`` for ``run``.

    ``parsed`` is mutated: its ``_html`` and ``_resource_manifest`` entries are
    popped before the checksum is taken, so neither ends up in the stored data.
    The HTML is kept only as a gzip-compressed snapshot.

    The employee is always marked active and seen now. It counts as *changed*
    when it is new, or when the checksum or parser version differs from the
    stored one. Only then are ``current_data``/``current_checksum`` updated, its
    profile tabs replaced and an ``EmployeeSnapshot`` added. New employees also
    get a ``"new"`` change record and increment ``run.new_count``. The session is
    flushed but not committed.

    Returns:
        ``(employee, changed)``.
    """
    page_html = parsed.pop("_html", None)
    parsed.pop("_resource_manifest", None)
    digest = _checksum(parsed)
    key = _parsed_profile_key(parsed)

    employee = db.scalar(select(Employee).where(Employee.profile_key == key))
    seen_at = datetime.now(timezone.utc)
    is_new = not employee
    if is_new:
        employee = Employee(
            profile_key=key,
            profile_type=parsed.get("profile_type"),
            profile_id=parsed.get("profile_id"),
            canonical_url=parsed["source_url"],
            first_seen_at=seen_at,
        )
        db.add(employee)
        run.new_count += 1

    version = parsed.get("parser_version")
    changed = (
        is_new
        or employee.current_checksum != digest
        or employee.parser_version != version
    )

    employee.full_name = parsed.get("full_name")
    employee.status = "active"
    employee.last_seen_at = seen_at
    employee.dismissed_at = None
    employee.parser_version = version
    if changed:
        employee.current_data = parsed
        employee.current_checksum = digest
    # A freshly added employee needs its primary key before the rows below
    # that reference ``employee.id`` are created.
    db.flush()

    if is_new:
        _record_employee_change(
            db,
            run,
            employee,
            "new",
            profile_available=True,
            message="Сотрудник впервые найден в источнике.",
        )

    if not changed:
        return employee, changed

    # Replace the stored tabs wholesale with the freshly parsed ones.
    db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
    for tab in parsed.get("tabs") or []:
        db.add(
            ProfileTab(
                employee_id=employee.id,
                title=tab.get("title") or "",
                href=tab.get("href") or "",
                data_index=tab.get("data_index"),
            )
        )

    compressed_html = gzip.compress(page_html.encode("utf-8")) if page_html else None
    db.add(
        EmployeeSnapshot(
            employee_id=employee.id,
            crawl_run_id=run.id,
            parsed_data=parsed,
            html_snapshot=compressed_html,
            checksum=digest,
            parser_version=version,
        )
    )
    return employee, changed
|
||
|
||
|
||
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
    """Handle active employees whose profile key was not seen in the source list.

    For each active employee not in ``found_keys``, the canonical URL is probed
    with ``_profile_is_available``:

    * If it still responds, a ``"missing_from_source"`` change is recorded and
      the employee stays active.
    * Otherwise the employee is marked ``"dismissed"``, with ``dismissed_at``
      set to a timestamp taken once before the loop, and a ``"dismissed"``
      change is recorded.

    Commits once at the end.

    Returns:
        The number of employees dismissed.
    """
    dismissed_total = 0
    active_employees = db.scalars(select(Employee).where(Employee.status == "active")).all()
    dismissed_at = datetime.now(timezone.utc)
    for employee in active_employees:
        if employee.profile_key in found_keys:
            continue
        if _profile_is_available(session, employee.canonical_url, timeout):
            _record_employee_change(
                db,
                run,
                employee,
                "missing_from_source",
                profile_available=True,
                message="Профиль доступен, но ссылка отсутствует в исходном списке.",
            )
        else:
            employee.status = "dismissed"
            employee.dismissed_at = dismissed_at
            _record_employee_change(
                db,
                run,
                employee,
                "dismissed",
                profile_available=False,
                message="Сотрудник отсутствует в исходном списке, профиль не подтвердился как доступный.",
            )
            dismissed_total += 1
    db.commit()
    return dismissed_total
|
||
|
||
|
||
def _profile_is_available(session: requests.Session, url: str, timeout: int) -> bool:
    """Return whether ``url`` currently answers with a non-error HTTP status.

    Redirects are followed, and the final status decides: anything below 400
    counts as available. Failures raised as ``requests.RequestException``
    (connection errors, timeouts, invalid URLs, ...) count as unavailable.

    Only the status code is needed, so the response is streamed and closed
    without downloading the page body.
    """
    try:
        with session.get(url, headers=HEADERS, timeout=timeout, allow_redirects=True, stream=True) as response:
            return response.status_code < 400
    except requests.RequestException:
        return False
|
||
|
||
|
||
def _record_employee_change(
    db: Session,
    run: CrawlRun,
    employee: Employee,
    change_type: str,
    *,
    profile_available: bool | None,
    message: str,
) -> None:
    """Add a ``CrawlRunEmployeeChange`` row describing ``employee`` in ``run``.

    The row copies the employee's id, profile key, canonical URL and full name
    as they are at call time. It is only added to the session; nothing is
    flushed or committed here.
    """
    change = CrawlRunEmployeeChange(
        crawl_run_id=run.id,
        employee_id=employee.id,
        profile_key=employee.profile_key,
        profile_url=employee.canonical_url,
        full_name=employee.full_name,
        change_type=change_type,
        profile_available=profile_available,
        message=message,
    )
    db.add(change)
|
||
|
||
|
||
def _checksum(data: dict) -> str:
    """Return a SHA-256 hex digest of ``data`` that ignores key order.

    ``data`` first goes through ``_stable_checksum_payload``. It is then
    serialized as compact JSON with sorted keys and non-ASCII text kept as-is,
    and the UTF-8 bytes are hashed.
    """
    normalized = _stable_checksum_payload(data)
    serialized = json.dumps(normalized, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    hasher = hashlib.sha256()
    hasher.update(serialized.encode("utf-8"))
    return hasher.hexdigest()
|
||
|
||
|
||
def _stable_checksum_payload(value):
|
||
if isinstance(value, dict):
|
||
return {key: _stable_checksum_payload(item) for key, item in value.items()}
|
||
if isinstance(value, list):
|
||
return [_stable_checksum_payload(item) for item in value]
|
||
if isinstance(value, str):
|
||
return _normalize_date_dependent_experience(value)
|
||
return value
|
||
|
||
|
||
# Case-insensitive "стаж [работы] [в НИУ ВШЭ|в ВШЭ][:] N год/года/годов/лет".
# Group 1 captures everything up to the number and is kept in the output.
_EXPERIENCE_YEARS_PATTERN = re.compile(
    r"(?i)(стаж(?:\s+работы)?(?:\s+в\s+ниу\s+вшэ|\s+в\s+вшэ)?\s*:?\s*)\d+\s*(?:год(?:а|ов)?|лет)"
)


def _normalize_date_dependent_experience(value: str) -> str:
    """Replace the year count in work-experience phrases with a placeholder.

    For example, ``"Стаж работы в НИУ ВШЭ: 5 лет"`` becomes
    ``"Стаж работы в НИУ ВШЭ: <experience-years>"``. Checksums therefore ignore
    a year count that grows over time. Text without such a phrase is returned
    unchanged.
    """
    return _EXPERIENCE_YEARS_PATTERN.sub(r"\1<experience-years>", value)
|