"""Crawl runner: collects profile links, parses each profile and stores the results."""

import gzip
import hashlib
import json
import time
from datetime import datetime, timezone

import requests
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.config import Settings
from app.models import (
    CrawlError,
    CrawlRun,
    Employee,
    EmployeeSnapshot,
    ParserSource,
    ProfileTab,
)
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key

# Headers sent with every HTTP request made by the crawler.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)"
}


def run_crawl(db: Session, settings: Settings) -> CrawlRun:
    """Run one crawl of ``settings.source_url`` and return its ``CrawlRun`` record.

    A ``CrawlRun`` row is created with status ``"running"``; profile links are
    collected (optionally truncated to ``settings.crawl_limit``), each profile
    is parsed and upserted, and per-profile failures are stored as
    ``CrawlError`` rows without aborting the run.  Afterwards active employees
    that are no longer listed are marked dismissed and the run is finished as
    ``"completed"``; any other failure finishes it as ``"failed"`` with the
    exception text in ``run.message``.

    Args:
        db: SQLAlchemy session used for all persistence; this function commits.
        settings: Crawl configuration (source URL, timeout, limit, delay,
            Playwright flag).

    Returns:
        The refreshed ``CrawlRun`` row describing this crawl.
    """
    source = _ensure_source(db, settings.source_url)
    run = CrawlRun(source_url=source.source_url, status="running")
    db.add(run)
    db.commit()
    db.refresh(run)

    found_keys: set[str] = set()
    parsed_count = 0
    try:
        with requests.Session() as session:
            listed_urls = collect_profile_links(
                session, source.source_url, HEADERS, settings.request_timeout
            )
            urls = listed_urls
            if settings.crawl_limit:
                urls = urls[: settings.crawl_limit]
            run.found_count = len(urls)
            db.commit()

            for url in urls:
                key = profile_key(url)
                if key:
                    found_keys.add(key)
                try:
                    parsed = parse_person_profile(
                        session,
                        url,
                        HEADERS,
                        settings.request_timeout,
                        settings.parser_use_playwright,
                    )
                    if not parsed:
                        continue
                    _upsert_employee(db, run, parsed)
                    parsed_count += 1
                    run.parsed_count = parsed_count
                    db.commit()
                except Exception as exc:
                    # A failed flush/commit leaves the session unusable until it
                    # is rolled back; rolling back also drops any half-applied
                    # changes for this profile so only the error row is stored.
                    db.rollback()
                    run.error_count += 1
                    db.add(
                        CrawlError(
                            crawl_run_id=run.id,
                            profile_url=url,
                            error_type=type(exc).__name__,
                            message=str(exc),
                        )
                    )
                    db.commit()
                finally:
                    # Throttle requests; runs after success, skip and failure alike.
                    time.sleep(settings.request_delay_seconds)

            # URLs cut off by crawl_limit are still listed at the source, so
            # their employees must not be treated as gone.
            for url in listed_urls[len(urls):]:
                key = profile_key(url)
                if key:
                    found_keys.add(key)

        run.dismissed_count = _mark_dismissed(db, found_keys)
        run.status = "completed"
    except Exception as exc:
        # Reset the session first so the final commit below can succeed even
        # when the failure came from the database itself.
        db.rollback()
        run.status = "failed"
        run.message = str(exc)
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run


def _ensure_source(db: Session, source_url: str) -> ParserSource:
    """Return the ``ParserSource`` row for ``source_url``, creating it if absent.

    A newly created row is stored with ``enabled=True`` and committed.
    """
    source = db.scalar(select(ParserSource).where(ParserSource.source_url == source_url))
    if source:
        return source
    source = ParserSource(source_url=source_url, enabled=True)
    db.add(source)
    db.commit()
    db.refresh(source)
    return source


def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
    """Create or update the employee described by ``parsed`` and add a snapshot.

    The employee is looked up by ``"<profile_type>:<profile_id>"``.  Its profile
    tabs are replaced with those in ``parsed["tabs"]`` and an
    ``EmployeeSnapshot`` linked to ``run`` is added.  Nothing is committed here;
    the caller owns the transaction.

    Args:
        db: SQLAlchemy session to add the rows to.
        run: Current crawl run; ``run.new_count`` is incremented for new employees.
        parsed: Parsed profile data.  The ``"_html"`` key, if present, is
            removed from this dict (the dict is mutated) and stored gzipped on
            the snapshot.  ``parsed["source_url"]`` is required for new employees.

    Returns:
        The created or updated ``Employee``.
    """
    # Raw HTML is taken out before hashing/storing so that the checksum and
    # the stored JSON data cover only the parsed fields.
    html = parsed.pop("_html", None)
    checksum = _checksum(parsed)
    key = f"{parsed.get('profile_type')}:{parsed.get('profile_id')}"
    employee = db.scalar(select(Employee).where(Employee.profile_key == key))
    now = datetime.now(timezone.utc)
    if not employee:
        employee = Employee(
            profile_key=key,
            profile_type=parsed.get("profile_type"),
            profile_id=parsed.get("profile_id"),
            canonical_url=parsed["source_url"],
            first_seen_at=now,
        )
        db.add(employee)
        run.new_count += 1

    employee.full_name = parsed.get("full_name")
    # Seeing the profile again re-activates a previously dismissed employee.
    employee.status = "active"
    employee.last_seen_at = now
    employee.dismissed_at = None
    employee.parser_version = parsed.get("parser_version")
    employee.current_data = parsed
    employee.current_checksum = checksum
    # Flush so that a new employee has an id before dependent rows reference it.
    db.flush()

    # Tabs are replaced wholesale rather than diffed.
    db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
    for tab in parsed.get("tabs") or []:
        db.add(
            ProfileTab(
                employee_id=employee.id,
                title=tab.get("title") or "",
                href=tab.get("href") or "",
                data_index=tab.get("data_index"),
            )
        )

    db.add(
        EmployeeSnapshot(
            employee_id=employee.id,
            crawl_run_id=run.id,
            parsed_data=parsed,
            html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
            checksum=checksum,
            parser_version=parsed.get("parser_version"),
        )
    )
    return employee


def _mark_dismissed(db: Session, found_keys: set[str]) -> int:
    """Mark active employees whose ``profile_key`` is not in ``found_keys`` as dismissed.

    Sets ``status="dismissed"`` and ``dismissed_at`` on each such employee,
    commits, and returns how many employees were changed.
    """
    dismissed = 0
    active = db.scalars(select(Employee).where(Employee.status == "active")).all()
    now = datetime.now(timezone.utc)
    for employee in active:
        if employee.profile_key in found_keys:
            continue
        employee.status = "dismissed"
        employee.dismissed_at = now
        dismissed += 1
    db.commit()
    return dismissed


def _checksum(data: dict) -> str:
    """Return the SHA-256 hex digest of ``data`` serialized as canonical JSON.

    Keys are sorted and separators are compact, so equal dicts produce equal
    digests regardless of key insertion order.
    """
    payload = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()