# Files
# miem_workers/tests/test_crawler.py
#
# 384 lines
# 13 KiB
# Python

import gzip
from datetime import datetime, timezone
from app.models import (
CrawlError,
CrawlRun,
CrawlRunEmployeeChange,
Employee,
EmployeeNewsLink,
EmployeePublication,
EmployeeSnapshot,
ParseResourceCache,
)
from app.services.crawler import _checksum, _mark_dismissed, _upsert_employee
from app.services.resource_cache import ResourceCache
class FakeResponse:
    """Minimal HTTP response stand-in that only carries a status code."""

    def __init__(self, status_code):
        # The only attribute this stub exposes.
        self.status_code = status_code
class FakeSession:
    """HTTP session stand-in that answers ``get`` from a URL -> status mapping."""

    def __init__(self, statuses):
        # Mapping of URL to the status code that ``get`` should report for it.
        self.statuses = statuses

    def get(self, url, **_kwargs):
        """Return a ``FakeResponse`` with the status registered for *url*.

        Extra keyword arguments are accepted and ignored. A URL that is not
        in the mapping raises ``KeyError``.
        """
        status_code = self.statuses[url]
        return FakeResponse(status_code)
class ConditionalResponse:
    """Response stand-in that remembers whether its body was ever read."""

    def __init__(self, status_code, text="", headers=None):
        self.status_code = status_code
        self._text = text
        # Any falsy ``headers`` value (including an empty dict) becomes a fresh dict.
        self.headers = headers if headers else {}
        # Flipped to True the first time ``text`` is accessed.
        self.text_read = False

    @property
    def text(self):
        """Return the body and record that it has been read."""
        self.text_read = True
        return self._text

    def raise_for_status(self):
        """Do nothing; this stub never raises, whatever the status code."""
        return None
class ConditionalSession:
    """Session stand-in that answers 304 to a matching ``If-None-Match`` header."""

    def __init__(self):
        # Every call to ``get`` is recorded here as a ``(url, kwargs)`` tuple.
        self.requests = []
        # Single shared 304 response so tests can inspect it after the call.
        self.not_modified_response = ConditionalResponse(304)

    def get(self, url, **kwargs):
        """Record the request and return either the 304 or a fresh 200 response.

        ``kwargs`` must contain a ``headers`` mapping. The 304 response is
        returned only when ``If-None-Match`` equals ``'"cached"'``.
        """
        self.requests.append((url, kwargs))
        sent_etag = kwargs["headers"].get("If-None-Match")
        if sent_etag != '"cached"':
            return ConditionalResponse(200, "fresh", {"ETag": '"fresh"'})
        return self.not_modified_response
def test_mark_dismissed_records_missing_source_when_profile_is_available(db_session):
    """A missing employee whose profile URL answers 200 stays active.

    Expects ``_mark_dismissed`` to return 0, leave both employees active and
    record one ``missing_from_source`` change with ``profile_available`` True.
    """
    crawl_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    kept_employee = Employee(
        profile_key="staff:kept",
        canonical_url="https://www.hse.ru/staff/kept",
        status="active",
        first_seen_at=datetime.now(timezone.utc),
        last_seen_at=datetime.now(timezone.utc),
    )
    missing_employee = Employee(
        profile_key="staff:missing",
        canonical_url="https://www.hse.ru/staff/missing",
        status="active",
        first_seen_at=datetime.now(timezone.utc),
        last_seen_at=datetime.now(timezone.utc),
    )
    db_session.add(crawl_run)
    db_session.add(kept_employee)
    db_session.add(missing_employee)
    db_session.commit()

    # Only "staff:kept" is passed in the key set; the missing profile URL answers 200.
    http_session = FakeSession({"https://www.hse.ru/staff/missing": 200})
    dismissed = _mark_dismissed(db_session, crawl_run, {"staff:kept"}, http_session, 30)

    assert dismissed == 0

    employees = db_session.query(Employee)
    kept_row = employees.filter_by(profile_key="staff:kept").one()
    assert kept_row.status == "active"

    missing = employees.filter_by(profile_key="staff:missing").one()
    assert missing.status == "active"
    assert missing.dismissed_at is None

    change = db_session.query(CrawlRunEmployeeChange).one()
    assert change.change_type == "missing_from_source"
    assert change.profile_available is True
def test_mark_dismissed_marks_missing_employee_when_profile_is_unavailable(db_session):
    """A missing employee whose profile URL answers 404 becomes dismissed.

    Expects ``_mark_dismissed`` to return 1, set the status and
    ``dismissed_at``, and record one ``dismissed`` change with
    ``profile_available`` False.
    """
    crawl_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    gone = Employee(
        profile_key="staff:gone",
        canonical_url="https://www.hse.ru/staff/gone",
        status="active",
        first_seen_at=datetime.now(timezone.utc),
        last_seen_at=datetime.now(timezone.utc),
    )
    db_session.add_all([crawl_run, gone])
    db_session.commit()

    # An empty key set is passed, and the profile URL answers 404.
    http_session = FakeSession({"https://www.hse.ru/staff/gone": 404})
    dismissed = _mark_dismissed(db_session, crawl_run, set(), http_session, 30)

    assert dismissed == 1
    assert gone.status == "dismissed"
    assert gone.dismissed_at is not None

    change = db_session.query(CrawlRunEmployeeChange).one()
    assert change.change_type == "dismissed"
    assert change.profile_available is False
def test_upsert_employee_increments_new_count_and_records_change_for_new_employee(db_session):
    """Upserting an unseen profile bumps ``new_count`` and records a ``new`` change."""
    crawl_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add(crawl_run)
    db_session.commit()

    parsed = {
        "source_url": "https://www.hse.ru/staff/newperson",
        "profile_type": "staff",
        "profile_id": "newperson",
        "full_name": "New Person",
        "tabs": [],
        "sections": [],
        "parser_version": "0.2.0",
        "_html": "<html></html>",
    }
    _upsert_employee(db_session, crawl_run, parsed)
    db_session.commit()

    assert crawl_run.new_count == 1

    change = db_session.query(CrawlRunEmployeeChange).one()
    assert change.change_type == "new"
    assert change.full_name == "New Person"
def test_resource_cache_uses_etag_and_reuses_cached_body_on_304(db_session):
    """A cached ETag is sent as ``If-None-Match`` and a 304 reuses the stored body.

    Also checks that the body of the 304 response is never read.
    """
    profile_url = "https://www.hse.ru/staff/cached"
    cached_body = gzip.compress("cached body".encode("utf-8"))
    cache_row = ParseResourceCache(
        profile_key="staff:cached",
        resource_key="main-html",
        method="GET",
        url=profile_url,
        # Pre-computed fingerprint; presumably it must match what ResourceCache
        # derives for this request -- TODO confirm against the service code.
        request_fingerprint="020d59db7b358d9023d0f185bcbf5a9c085d3cf2bf91d92d48eee9147e8d0f01",
        etag='"cached"',
        body_hash="cached-hash",
        body_snapshot=cached_body,
        parser_version="0.6.0",
    )
    db_session.add(cache_row)
    db_session.commit()

    http_session = ConditionalSession()
    cache = ResourceCache(db_session)
    result = cache.fetch_text(
        http_session,
        profile_key="staff:cached",
        resource_key="main-html",
        method="GET",
        url=profile_url,
        headers={"User-Agent": "test"},
        timeout=10,
    )

    _first_url, first_kwargs = http_session.requests[0]
    assert first_kwargs["headers"]["If-None-Match"] == '"cached"'
    assert result.text == "cached body"
    assert result.from_cache is True
    assert http_session.not_modified_response.text_read is False
def test_upsert_employee_skips_snapshot_when_checksum_is_unchanged(db_session):
    """Upserting the same payload in two runs yields a single snapshot row."""
    source = "https://miem.hse.ru/persons"
    first_run = CrawlRun(source_url=source, status="running")
    second_run = CrawlRun(source_url=source, status="running")
    db_session.add_all([first_run, second_run])
    db_session.commit()

    # The second element of the returned pair is the "changed" flag.
    first_result = _upsert_employee(db_session, first_run, _parsed_employee("same"))
    second_result = _upsert_employee(db_session, second_run, _parsed_employee("same"))
    db_session.commit()

    assert first_result[1] is True
    assert second_result[1] is False
    assert db_session.query(EmployeeSnapshot).count() == 1
def test_upsert_employee_saves_publications_and_reuses_existing_rows(db_session):
    """Upserting the same publication in two runs keeps one publication row.

    Both runs take their payload from ``_parsed_employee_with_publication``,
    so the two upserts are guaranteed to see the same publication data. The
    stored row is then checked for the DOI URL and author list from that
    payload.
    """
    first_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    second_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add_all([first_run, second_run])
    db_session.commit()

    # A fresh payload per call, so neither upsert can see the other's dict.
    employee, _ = _upsert_employee(
        db_session, first_run, _parsed_employee_with_publication("published")
    )
    db_session.commit()
    _upsert_employee(db_session, second_run, _parsed_employee_with_publication("published"))
    db_session.commit()

    publications = db_session.query(EmployeePublication).filter_by(employee_id=employee.id).all()
    assert len(publications) == 1
    assert publications[0].doi_url == "https://doi.org/10.1/test"
    assert publications[0].authors == [{"id": "1", "title_ru": "Автор"}]
def test_upsert_employee_records_publication_errors_without_failing_employee(monkeypatch, db_session):
    """A failing publication sync is logged as a ``CrawlError``; the employee is still saved."""
    crawl_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add(crawl_run)
    db_session.commit()

    def raise_boom(*_args, **_kwargs):
        # Replacement for the publication sync that always fails.
        raise RuntimeError("boom")

    monkeypatch.setattr("app.services.crawler._sync_employee_publications", raise_boom)

    parsed = _parsed_employee_with_publication("error-safe")
    employee, changed = _upsert_employee(db_session, crawl_run, parsed)
    db_session.commit()

    assert changed is True
    assert employee.full_name == "Same Person"
    # ``one()`` raises if the row is missing, so this also proves it was persisted.
    assert db_session.query(Employee).filter_by(profile_key="staff:error-safe").one()

    error = db_session.query(CrawlError).one()
    assert "публикации" in error.message.lower()
def test_upsert_employee_saves_news_links_and_reuses_existing_rows(db_session):
    """Upserting the same news link in two runs keeps a single news-link row."""
    source = "https://miem.hse.ru/persons"
    first_run = CrawlRun(source_url=source, status="running")
    second_run = CrawlRun(source_url=source, status="running")
    db_session.add_all([first_run, second_run])
    db_session.commit()

    first_result = _upsert_employee(db_session, first_run, _parsed_employee_with_news("news-person"))
    employee = first_result[0]
    db_session.commit()
    _upsert_employee(db_session, second_run, _parsed_employee_with_news("news-person"))
    db_session.commit()

    news_links = db_session.query(EmployeeNewsLink).filter_by(employee_id=employee.id).all()
    assert len(news_links) == 1

    stored = news_links[0]
    assert stored.title == "News Title"
    assert stored.url == "https://www.hse.ru/news/1.html"
    assert stored.published_year == 2026
def test_upsert_employee_records_news_errors_without_failing_employee(monkeypatch, db_session):
    """A failing news-link sync is logged as a ``CrawlError``; the employee is still saved."""
    crawl_run = CrawlRun(source_url="https://miem.hse.ru/persons", status="running")
    db_session.add(crawl_run)
    db_session.commit()

    def raise_boom(*_args, **_kwargs):
        # Replacement for the news-link sync that always fails.
        raise RuntimeError("boom")

    monkeypatch.setattr("app.services.crawler._sync_employee_news_links", raise_boom)

    parsed = _parsed_employee_with_news("news-error-safe")
    employee, changed = _upsert_employee(db_session, crawl_run, parsed)
    db_session.commit()

    assert changed is True
    assert employee.full_name == "Same Person"
    # ``one()`` raises if the row is missing, so this also proves it was persisted.
    assert db_session.query(Employee).filter_by(profile_key="staff:news-error-safe").one()

    error = db_session.query(CrawlError).one()
    assert "новости" in error.message.lower()
def test_checksum_changes_when_widget_data_changes():
    """Adding a publications section to a payload must change its checksum."""
    without_widgets = _parsed_employee("widgets")
    with_widgets = _parsed_employee("widgets")
    publications_section = {
        "type": "publications",
        "publications": [{"id": "1", "title": "New publication"}],
    }
    with_widgets["sections"] = [publications_section]

    assert _checksum(without_widgets) != _checksum(with_widgets)
def test_checksum_ignores_date_dependent_experience_text():
    """Payloads differing only in the years-of-experience text share a checksum."""
    five_years = _parsed_employee("experience")
    six_years = _parsed_employee("experience")
    # The two raw texts differ only in the number of years.
    five_years["sections"] = [{"raw_text": "Стаж работы в НИУ ВШЭ: 5 лет"}]
    six_years["sections"] = [{"raw_text": "Стаж работы в НИУ ВШЭ: 6 лет"}]

    checksums = {_checksum(five_years), _checksum(six_years)}
    assert len(checksums) == 1
def _parsed_employee(profile_id: str) -> dict:
return {
"source_url": f"https://www.hse.ru/staff/{profile_id}",
"profile_type": "staff",
"profile_id": profile_id,
"full_name": "Same Person",
"tabs": [],
"sections": [],
"parser_version": "0.6.0",
"_html": "<html></html>",
}
def _parsed_employee_with_publication(profile_id: str) -> dict:
    """Return the base payload with one ``publications`` section attached.

    The section carries a single fully populated publication record.
    """
    publication = {
        "id": "888959076",
        "publication_id": "888959076",
        "title": "Detailed Publication",
        "year": 2023,
        "publication_type": "ARTICLE",
        "language": "ru",
        "status": 1,
        "url": "https://publications.hse.ru/view/888959076",
        "doi_url": "https://doi.org/10.1/test",
        "citation_text": "Detailed citation",
        "annotation": {"ru": "Аннотация"},
        "description": {"main": "Detailed citation"},
        "authors": [{"id": "1", "title_ru": "Автор"}],
        "raw_data": {"id": "888959076", "title": "Detailed Publication"},
    }
    parsed = _parsed_employee(profile_id)
    # Replace the empty sections list from the base payload.
    parsed["sections"] = [{"type": "publications", "publications": [publication]}]
    return parsed
def _parsed_employee_with_news(profile_id: str) -> dict:
    """Return the base payload with one ``news`` section attached.

    The section carries a single news link.
    """
    news_link = {
        "title": "News Title",
        "url": "https://www.hse.ru/news/1.html",
        "summary": "News summary",
        "published_at": "2026-04-28T00:00:00+00:00",
        "published_year": 2026,
        "raw_data": {"title": "News Title", "url": "https://www.hse.ru/news/1.html"},
    }
    parsed = _parsed_employee(profile_id)
    # Replace the empty sections list from the base payload.
    parsed["sections"] = [{"type": "news", "news_links": [news_link]}]
    return parsed