"""Crawl orchestration: collect profile links, parse them and persist results.

The public entry points are ``run_crawl`` (full crawl of the configured source
page) and ``refresh_employee`` (re-parse of a single known employee). Both
create a ``CrawlRun`` row, keep its counters up to date and always return it,
whether the run completed or failed.
"""

import gzip
import hashlib
import json
import re
import time
from datetime import datetime, timezone

import requests
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.config import Settings
from app.models import (
    CrawlError,
    CrawlRun,
    CrawlRunEmployeeChange,
    Employee,
    EmployeeSnapshot,
    ParserSource,
    ProfileTab,
)
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key
from app.services.dataset_versions import get_or_create_current_version
from app.services.resource_cache import ResourceCache

# HTTP headers sent with every request issued from this module.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)"
}

# Matches an experience label followed by a number of years; only the label
# (group 1) is kept, so the checksum does not depend on the number itself.
_EXPERIENCE_YEARS_RE = re.compile(
    r"(?i)(стаж(?:\s+работы)?(?:\s+в\s+ниу\s+вшэ|\s+в\s+вшэ)?\s*:?\s*)\d+\s*(?:год(?:а|ов)?|лет)"
)


def run_crawl(db: Session, settings: Settings) -> CrawlRun:
    """Crawl the configured source page and upsert every profile found there.

    A ``CrawlRun`` row is created first and committed, then updated as the
    crawl progresses. A failure on a single profile is recorded as a
    ``CrawlError`` and does not stop the run; any other failure marks the run
    as ``"failed"`` and stores the exception text in ``run.message``.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        settings: Provides ``source_url``, ``request_timeout``,
            ``crawl_limit``, ``parser_use_playwright`` and
            ``request_delay_seconds``.

    Returns:
        The refreshed ``CrawlRun`` with final status, counters and
        ``finished_at``.
    """
    source = _ensure_source(db, settings.source_url)
    run = CrawlRun(source_url=source.source_url, status="running")
    db.add(run)
    db.commit()
    db.refresh(run)

    parsed_count = 0
    skipped_count = 0
    # Kept in a local so the value survives a rollback in the outer handler.
    dismissed_count: int | None = None
    try:
        with requests.Session() as session:
            resource_cache = ResourceCache(db)
            urls = collect_profile_links(
                session, source.source_url, HEADERS, settings.request_timeout
            )
            # Build the "present in source" set from the FULL link list, before
            # applying crawl_limit. Otherwise every active employee beyond the
            # limit would be treated as missing from the source and could be
            # probed and dismissed by _mark_dismissed.
            found_keys: set[str] = {
                key for url in urls if (key := profile_key(url))
            }
            if settings.crawl_limit:
                urls = urls[: settings.crawl_limit]
            run.found_count = len(urls)
            db.commit()

            for url in urls:
                try:
                    parsed = parse_person_profile(
                        session,
                        url,
                        HEADERS,
                        settings.request_timeout,
                        settings.parser_use_playwright,
                        resource_cache=resource_cache,
                    )
                    if not parsed:
                        continue
                    _, changed = _upsert_employee(db, run, parsed)
                    if changed:
                        parsed_count += 1
                    else:
                        skipped_count += 1
                    run.parsed_count = parsed_count
                    run.skipped_count = skipped_count
                    db.commit()
                except Exception as exc:
                    # Discard the half-applied upsert and reset the session;
                    # after a failed flush/commit the session cannot be used
                    # again until it has been rolled back.
                    db.rollback()
                    run.error_count += 1
                    db.add(
                        CrawlError(
                            crawl_run_id=run.id,
                            profile_url=url,
                            error_type=type(exc).__name__,
                            message=str(exc),
                        )
                    )
                    db.commit()
                finally:
                    # Throttle requests regardless of the outcome.
                    time.sleep(settings.request_delay_seconds)

            dismissed_count = _mark_dismissed(
                db, run, found_keys, session, settings.request_timeout
            )
            run.dismissed_count = dismissed_count
        run.status = "completed"
        get_or_create_current_version(db, crawl_run_id=run.id)
    except Exception as exc:
        # Reset the session so the final commit below can succeed even when
        # the failure came from the database layer.
        db.rollback()
        if dismissed_count is not None:
            # _mark_dismissed already committed the dismissals themselves.
            run.dismissed_count = dismissed_count
        run.status = "failed"
        run.message = str(exc)
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run


def refresh_employee(db: Session, employee: Employee, settings: Settings) -> CrawlRun:
    """Re-parse a single employee profile and record the attempt as a run.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        employee: Employee whose ``canonical_url`` is fetched again.
        settings: Provides ``request_timeout`` and ``parser_use_playwright``.

    Returns:
        The refreshed ``CrawlRun``. On failure its status is ``"failed"``,
        ``error_count`` is 1 and a ``CrawlError`` row is stored; the exception
        itself is not re-raised.
    """
    run = CrawlRun(source_url=employee.canonical_url, status="running", found_count=1)
    db.add(run)
    db.commit()
    db.refresh(run)

    try:
        with requests.Session() as session:
            resource_cache = ResourceCache(db)
            parsed = parse_person_profile(
                session,
                employee.canonical_url,
                HEADERS,
                settings.request_timeout,
                settings.parser_use_playwright,
                resource_cache=resource_cache,
            )
        if not parsed:
            raise ValueError("Профиль не удалось распарсить.")
        # Guard against the URL now resolving to a different person's profile.
        if _parsed_profile_key(parsed) != employee.profile_key:
            raise ValueError("Распарсенный профиль не совпадает с обновляемым сотрудником.")
        _, changed = _upsert_employee(db, run, parsed)
        if changed:
            run.parsed_count = 1
        else:
            run.skipped_count = 1
        run.status = "completed"
        get_or_create_current_version(db, crawl_run_id=run.id)
    except Exception as exc:
        # Drop any half-applied changes and make the session usable again
        # before recording the failure.
        db.rollback()
        run.status = "failed"
        run.error_count = 1
        run.message = str(exc)
        db.add(
            CrawlError(
                crawl_run_id=run.id,
                profile_url=employee.canonical_url,
                error_type=type(exc).__name__,
                message=str(exc),
            )
        )
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run


def _ensure_source(db: Session, source_url: str) -> ParserSource:
    """Return the ``ParserSource`` for *source_url*, creating it if absent."""
    source = db.scalar(select(ParserSource).where(ParserSource.source_url == source_url))
    if source:
        return source
    source = ParserSource(source_url=source_url, enabled=True)
    db.add(source)
    db.commit()
    db.refresh(source)
    return source


def _parsed_profile_key(parsed: dict) -> str:
    """Build the ``"<profile_type>:<profile_id>"`` key from parsed data."""
    return f"{parsed.get('profile_type')}:{parsed.get('profile_id')}"


def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee, bool]:
    """Create or update the employee described by *parsed*.

    The ``"_html"`` and ``"_resource_manifest"`` entries are popped from
    *parsed* (the dict is mutated) before the checksum is computed. The
    session is flushed but not committed; committing is left to the caller.

    Args:
        db: SQLAlchemy session.
        run: Current crawl run; its ``new_count`` is incremented for new
            employees and its id is attached to created rows.
        parsed: Parsed profile data; must contain ``"source_url"`` when the
            employee does not exist yet.

    Returns:
        ``(employee, changed)`` where *changed* is true when the employee is
        new, or its checksum or parser version differs from the stored one.
    """
    html = parsed.pop("_html", None)
    parsed.pop("_resource_manifest", None)
    checksum = _checksum(parsed)
    key = _parsed_profile_key(parsed)
    employee = db.scalar(select(Employee).where(Employee.profile_key == key))
    now = datetime.now(timezone.utc)

    is_new = not employee
    if is_new:
        employee = Employee(
            profile_key=key,
            profile_type=parsed.get("profile_type"),
            profile_id=parsed.get("profile_id"),
            canonical_url=parsed["source_url"],
            first_seen_at=now,
        )
        db.add(employee)
        run.new_count += 1

    parser_version = parsed.get("parser_version")
    changed = (
        is_new
        or employee.current_checksum != checksum
        or employee.parser_version != parser_version
    )

    # These fields are refreshed on every sighting, changed or not.
    employee.full_name = parsed.get("full_name")
    employee.status = "active"
    employee.last_seen_at = now
    employee.dismissed_at = None
    employee.parser_version = parser_version
    if changed:
        employee.current_data = parsed
        employee.current_checksum = checksum
    # Flush so that employee.id is available for the rows created below.
    db.flush()

    if is_new:
        _record_employee_change(
            db,
            run,
            employee,
            "new",
            profile_available=True,
            message="Сотрудник впервые найден в источнике.",
        )

    if changed:
        # Replace the stored tabs wholesale and keep a snapshot of this version.
        db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
        for tab in parsed.get("tabs") or []:
            db.add(
                ProfileTab(
                    employee_id=employee.id,
                    title=tab.get("title") or "",
                    href=tab.get("href") or "",
                    data_index=tab.get("data_index"),
                )
            )
        db.add(
            EmployeeSnapshot(
                employee_id=employee.id,
                crawl_run_id=run.id,
                parsed_data=parsed,
                html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
                checksum=checksum,
                parser_version=parser_version,
            )
        )
    return employee, changed


def _mark_dismissed(
    db: Session,
    run: CrawlRun,
    found_keys: set[str],
    session: requests.Session,
    timeout: int,
) -> int:
    """Dismiss active employees that are absent from the source list.

    Every active employee whose ``profile_key`` is not in *found_keys* is
    probed over HTTP. If the profile still responds, only a
    ``"missing_from_source"`` change is recorded; otherwise the employee is
    marked ``"dismissed"``. The session is committed at the end.

    Returns:
        The number of employees marked as dismissed.
    """
    dismissed = 0
    active = db.scalars(select(Employee).where(Employee.status == "active")).all()
    now = datetime.now(timezone.utc)
    for employee in active:
        if employee.profile_key in found_keys:
            continue
        profile_available = _profile_is_available(session, employee.canonical_url, timeout)
        if profile_available:
            _record_employee_change(
                db,
                run,
                employee,
                "missing_from_source",
                profile_available=True,
                message="Профиль доступен, но ссылка отсутствует в исходном списке.",
            )
            continue
        employee.status = "dismissed"
        employee.dismissed_at = now
        _record_employee_change(
            db,
            run,
            employee,
            "dismissed",
            profile_available=False,
            message="Сотрудник отсутствует в исходном списке, профиль не подтвердился как доступный.",
        )
        dismissed += 1
    db.commit()
    return dismissed


def _profile_is_available(session: requests.Session, url: str, timeout: int) -> bool:
    """Return True when a GET of *url* succeeds with a status code below 400.

    Any ``requests.RequestException`` is treated as "not available".
    """
    try:
        response = session.get(url, headers=HEADERS, timeout=timeout, allow_redirects=True)
        return response.status_code < 400
    except requests.RequestException:
        return False


def _record_employee_change(
    db: Session,
    run: CrawlRun,
    employee: Employee,
    change_type: str,
    *,
    profile_available: bool | None,
    message: str,
) -> None:
    """Add a ``CrawlRunEmployeeChange`` row for *employee* to the session.

    The row copies the employee's key, URL and name as they are at call time.
    Nothing is flushed or committed here.
    """
    db.add(
        CrawlRunEmployeeChange(
            crawl_run_id=run.id,
            employee_id=employee.id,
            profile_key=employee.profile_key,
            profile_url=employee.canonical_url,
            full_name=employee.full_name,
            change_type=change_type,
            profile_available=profile_available,
            message=message,
        )
    )


def _checksum(data: dict) -> str:
    """Return the SHA-256 hex digest of the normalized, canonical JSON of *data*.

    Keys are sorted and compact separators are used so the digest does not
    depend on dict ordering or whitespace.
    """
    payload = json.dumps(
        _stable_checksum_payload(data),
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def _stable_checksum_payload(value):
    """Recursively copy *value*, normalizing every string found inside it.

    Dicts and lists are rebuilt with their items processed; strings go through
    ``_normalize_date_dependent_experience``; other values are returned as is.
    """
    if isinstance(value, dict):
        return {key: _stable_checksum_payload(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_stable_checksum_payload(item) for item in value]
    if isinstance(value, str):
        return _normalize_date_dependent_experience(value)
    return value


def _normalize_date_dependent_experience(value: str) -> str:
    """Strip the number of years that follows an experience label.

    The label itself is kept; the digits and the following year word are
    removed. Presumably this keeps the checksum stable while the displayed
    experience grows over time.
    """
    return _EXPERIENCE_YEARS_RE.sub(r"\1", value)