"""Tests for crawler persistence helpers and the ETag-aware resource cache."""

import gzip
from datetime import datetime, timezone

from app.models import (
    CrawlError,
    CrawlRun,
    CrawlRunEmployeeChange,
    Employee,
    EmployeePublication,
    EmployeeSnapshot,
    ParseResourceCache,
)
from app.services.crawler import _checksum, _mark_dismissed, _upsert_employee
from app.services.resource_cache import ResourceCache


class FakeResponse:
    """Minimal response stand-in that exposes only ``status_code``."""

    def __init__(self, status_code):
        self.status_code = status_code


class FakeSession:
    """Session stub answering ``get`` with a pre-configured status per URL."""

    def __init__(self, statuses):
        # Mapping of URL -> status code. A URL that is not listed raises
        # KeyError from ``get``, which surfaces unexpected requests in a test.
        self.statuses = statuses

    def get(self, url, **_kwargs):
        """Return a ``FakeResponse`` carrying the status registered for *url*."""
        return FakeResponse(self.statuses[url])


class ConditionalResponse:
    """Response stub that records whether its body was ever read."""

    def __init__(self, status_code, text="", headers=None):
        self.status_code = status_code
        self._text = text
        self.headers = headers or {}
        # Flipped to True on first access of ``text`` so a test can prove that
        # the body of a response was (or was not) consumed.
        self.text_read = False

    @property
    def text(self):
        """Return the body and remember that it was accessed."""
        self.text_read = True
        return self._text

    def raise_for_status(self):
        """Do nothing; this stub never signals an HTTP error."""
        return None


class ConditionalSession:
    """Session stub that emulates ETag-based conditional GET requests."""

    def __init__(self):
        # Every call to ``get`` is recorded as ``(url, kwargs)``.
        self.requests = []
        # One shared 304 response, kept so tests can inspect ``text_read``.
        self.not_modified_response = ConditionalResponse(304)

    def get(self, url, **kwargs):
        """Return the shared 304 when ``If-None-Match`` is ``"cached"``, else a fresh 200."""
        self.requests.append((url, kwargs))
        if kwargs["headers"].get("If-None-Match") == '"cached"':
            return self.not_modified_response
        return ConditionalResponse(200, "fresh", {"ETag": '"fresh"'})


def test_mark_dismissed_records_missing_source_when_profile_is_available(db_session):
    """An employee absent from the source but with a reachable profile stays active."""
    run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add(run)
    db_session.add(_active_employee("kept"))
    db_session.add(_active_employee("missing"))
    db_session.commit()

    # NOTE(review): the set looks like the profile keys seen in this run and
    # ``30`` like a request timeout -- confirm against ``_mark_dismissed``.
    dismissed = _mark_dismissed(
        db_session,
        run,
        {"staff:kept"},
        FakeSession({"https://www.hse.ru/staff/missing": 200}),
        30,
    )

    assert dismissed == 0
    kept = db_session.query(Employee).filter_by(profile_key="staff:kept").one()
    assert kept.status == "active"
    missing = db_session.query(Employee).filter_by(profile_key="staff:missing").one()
    assert missing.status == "active"
    assert missing.dismissed_at is None
    change = db_session.query(CrawlRunEmployeeChange).one()
    assert change.change_type == "missing_from_source"
    assert change.profile_available is True


def test_mark_dismissed_marks_missing_employee_when_profile_is_unavailable(db_session):
    """An employee absent from the source whose profile answers 404 is dismissed."""
    run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    employee = _active_employee("gone")
    db_session.add_all([run, employee])
    db_session.commit()

    dismissed = _mark_dismissed(
        db_session,
        run,
        set(),
        FakeSession({"https://www.hse.ru/staff/gone": 404}),
        30,
    )

    assert dismissed == 1
    assert employee.status == "dismissed"
    assert employee.dismissed_at is not None
    change = db_session.query(CrawlRunEmployeeChange).one()
    assert change.change_type == "dismissed"
    assert change.profile_available is False


def test_upsert_employee_increments_new_count_and_records_change_for_new_employee(db_session):
    """Upserting an unknown profile bumps ``new_count`` and logs a ``new`` change."""
    run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add(run)
    db_session.commit()

    _upsert_employee(
        db_session,
        run,
        {
            "source_url": "https://www.hse.ru/staff/newperson",
            "profile_type": "staff",
            "profile_id": "newperson",
            "full_name": "New Person",
            "tabs": [],
            "sections": [],
            "parser_version": "0.2.0",
            "_html": "",
        },
    )
    db_session.commit()

    assert run.new_count == 1
    change = db_session.query(CrawlRunEmployeeChange).one()
    assert change.change_type == "new"
    assert change.full_name == "New Person"


def test_resource_cache_uses_etag_and_reuses_cached_body_on_304(db_session):
    """A stored ETag is sent as ``If-None-Match`` and a 304 serves the cached body."""
    db_session.add(
        ParseResourceCache(
            profile_key="staff:cached",
            resource_key="main-html",
            method="GET",
            url="https://www.hse.ru/staff/cached",
            request_fingerprint="020d59db7b358d9023d0f185bcbf5a9c085d3cf2bf91d92d48eee9147e8d0f01",
            etag='"cached"',
            body_hash="cached-hash",
            body_snapshot=gzip.compress("cached body".encode("utf-8")),
            parser_version="0.6.0",
        )
    )
    db_session.commit()
    session = ConditionalSession()

    result = ResourceCache(db_session).fetch_text(
        session,
        profile_key="staff:cached",
        resource_key="main-html",
        method="GET",
        url="https://www.hse.ru/staff/cached",
        headers={"User-Agent": "test"},
        timeout=10,
    )

    assert session.requests[0][1]["headers"]["If-None-Match"] == '"cached"'
    assert result.text == "cached body"
    assert result.from_cache is True
    # The 304 body must never be touched; the text comes from the snapshot.
    assert session.not_modified_response.text_read is False


def test_upsert_employee_skips_snapshot_when_checksum_is_unchanged(db_session):
    """Upserting identical parsed data twice stores only one snapshot."""
    first_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    second_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add_all([first_run, second_run])
    db_session.commit()

    _, first_changed = _upsert_employee(db_session, first_run, _parsed_employee("same"))
    _, second_changed = _upsert_employee(db_session, second_run, _parsed_employee("same"))
    db_session.commit()

    assert first_changed is True
    assert second_changed is False
    assert db_session.query(EmployeeSnapshot).count() == 1


def test_upsert_employee_saves_publications_and_reuses_existing_rows(db_session):
    """The same publication upserted in two runs yields a single stored row."""
    first_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    second_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add_all([first_run, second_run])
    db_session.commit()

    # Both runs use the shared fixture builder so they are guaranteed to carry
    # exactly the same publication payload.
    employee, _ = _upsert_employee(
        db_session, first_run, _parsed_employee_with_publication("published")
    )
    db_session.commit()
    _upsert_employee(db_session, second_run, _parsed_employee_with_publication("published"))
    db_session.commit()

    publications = db_session.query(EmployeePublication).filter_by(employee_id=employee.id).all()
    assert len(publications) == 1
    assert publications[0].doi_url == "https://doi.org/10.1/test"
    assert publications[0].authors == [{"id": "1", "title_ru": "Автор"}]


def test_upsert_employee_records_publication_errors_without_failing_employee(monkeypatch, db_session):
    """A failure while syncing publications is logged and the employee is still saved."""
    run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add(run)
    db_session.commit()

    def broken_sync(*_args, **_kwargs):
        raise RuntimeError("boom")

    monkeypatch.setattr("app.services.crawler._sync_employee_publications", broken_sync)

    employee, changed = _upsert_employee(
        db_session, run, _parsed_employee_with_publication("error-safe")
    )
    db_session.commit()

    assert changed is True
    assert employee.full_name == "Same Person"
    assert db_session.query(Employee).filter_by(profile_key="staff:error-safe").one()
    error = db_session.query(CrawlError).one()
    # The recorded message is expected to mention "publications" (in Russian).
    assert "публикации" in error.message.lower()


def test_checksum_changes_when_widget_data_changes():
    """Adding a publications section changes the checksum."""
    base = _parsed_employee("widgets")
    changed = _parsed_employee("widgets")
    changed["sections"] = [
        {
            "type": "publications",
            "publications": [{"id": "1", "title": "New publication"}],
        }
    ]

    assert _checksum(base) != _checksum(changed)


def test_checksum_ignores_date_dependent_experience_text():
    """Two experience lines differing only in the year count hash identically."""
    first = _parsed_employee("experience")
    second = _parsed_employee("experience")
    # The texts read "Work experience at HSE University: 5 years" / "6 years".
    first["sections"] = [{"raw_text": "Стаж работы в НИУ ВШЭ: 5 лет"}]
    second["sections"] = [{"raw_text": "Стаж работы в НИУ ВШЭ: 6 лет"}]

    assert _checksum(first) == _checksum(second)


def _active_employee(profile_id: str) -> Employee:
    """Build an unsaved active ``Employee`` keyed ``staff:<profile_id>``."""
    now = datetime.now(timezone.utc)
    return Employee(
        profile_key=f"staff:{profile_id}",
        canonical_url=f"https://www.hse.ru/staff/{profile_id}",
        status="active",
        first_seen_at=now,
        last_seen_at=now,
    )


def _parsed_employee(profile_id: str) -> dict:
    """Return a minimal parsed-profile payload with no tabs and no sections."""
    return {
        "source_url": f"https://www.hse.ru/staff/{profile_id}",
        "profile_type": "staff",
        "profile_id": profile_id,
        "full_name": "Same Person",
        "tabs": [],
        "sections": [],
        "parser_version": "0.6.0",
        "_html": "",
    }


def _parsed_employee_with_publication(profile_id: str) -> dict:
    """Return the minimal payload extended with one fully detailed publication."""
    parsed = _parsed_employee(profile_id)
    parsed["sections"] = [
        {
            "type": "publications",
            "publications": [
                {
                    "id": "888959076",
                    "publication_id": "888959076",
                    "title": "Detailed Publication",
                    "year": 2023,
                    "publication_type": "ARTICLE",
                    "language": "ru",
                    "status": 1,
                    "url": "https://publications.hse.ru/view/888959076",
                    "doi_url": "https://doi.org/10.1/test",
                    "citation_text": "Detailed citation",
                    "annotation": {"ru": "Аннотация"},
                    "description": {"main": "Detailed citation"},
                    "authors": [{"id": "1", "title_ru": "Автор"}],
                    "raw_data": {"id": "888959076", "title": "Detailed Publication"},
                }
            ],
        }
    ]
    return parsed