feat: adds crawl resource cache
This commit is contained in:
@@ -153,7 +153,7 @@ def stats_payload(db: Session) -> dict[str, Any]:
|
||||
def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
|
||||
if not run:
|
||||
return None
|
||||
processed = run.parsed_count + run.error_count
|
||||
processed = run.parsed_count + run.skipped_count + run.error_count
|
||||
percent = round((processed / run.found_count) * 100, 1) if run.found_count else 0
|
||||
return {
|
||||
"id": run.id,
|
||||
@@ -166,6 +166,7 @@ def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
|
||||
"finished_display": format_admin_datetime(run.finished_at),
|
||||
"found_count": run.found_count,
|
||||
"parsed_count": run.parsed_count,
|
||||
"skipped_count": run.skipped_count,
|
||||
"new_count": run.new_count,
|
||||
"error_count": run.error_count,
|
||||
"dismissed_count": run.dismissed_count,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import gzip
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
@@ -14,6 +15,7 @@ from app.parser.collector import collect_profile_links
|
||||
from app.parser.profile import parse_person_profile
|
||||
from app.parser.profile_url import profile_key
|
||||
from app.services.dataset_versions import get_or_create_current_version
|
||||
from app.services.resource_cache import ResourceCache
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)"
|
||||
@@ -29,8 +31,10 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
|
||||
|
||||
found_keys: set[str] = set()
|
||||
parsed_count = 0
|
||||
skipped_count = 0
|
||||
try:
|
||||
with requests.Session() as session:
|
||||
resource_cache = ResourceCache(db)
|
||||
urls = collect_profile_links(session, source.source_url, HEADERS, settings.request_timeout)
|
||||
if settings.crawl_limit:
|
||||
urls = urls[: settings.crawl_limit]
|
||||
@@ -48,12 +52,17 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
|
||||
HEADERS,
|
||||
settings.request_timeout,
|
||||
settings.parser_use_playwright,
|
||||
resource_cache=resource_cache,
|
||||
)
|
||||
if not parsed:
|
||||
continue
|
||||
_upsert_employee(db, run, parsed)
|
||||
parsed_count += 1
|
||||
_, changed = _upsert_employee(db, run, parsed)
|
||||
if changed:
|
||||
parsed_count += 1
|
||||
else:
|
||||
skipped_count += 1
|
||||
run.parsed_count = parsed_count
|
||||
run.skipped_count = skipped_count
|
||||
db.commit()
|
||||
except Exception as exc:
|
||||
run.error_count += 1
|
||||
@@ -69,7 +78,7 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
|
||||
finally:
|
||||
time.sleep(settings.request_delay_seconds)
|
||||
|
||||
run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
|
||||
run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
|
||||
run.status = "completed"
|
||||
get_or_create_current_version(db, crawl_run_id=run.id)
|
||||
except Exception as exc:
|
||||
@@ -90,20 +99,25 @@ def refresh_employee(db: Session, employee: Employee, settings: Settings) -> Cra
|
||||
|
||||
try:
|
||||
with requests.Session() as session:
|
||||
resource_cache = ResourceCache(db)
|
||||
parsed = parse_person_profile(
|
||||
session,
|
||||
employee.canonical_url,
|
||||
HEADERS,
|
||||
settings.request_timeout,
|
||||
settings.parser_use_playwright,
|
||||
resource_cache=resource_cache,
|
||||
)
|
||||
if not parsed:
|
||||
raise ValueError("Профиль не удалось распарсить.")
|
||||
if _parsed_profile_key(parsed) != employee.profile_key:
|
||||
raise ValueError("Распарсенный профиль не совпадает с обновляемым сотрудником.")
|
||||
|
||||
_upsert_employee(db, run, parsed)
|
||||
run.parsed_count = 1
|
||||
_, changed = _upsert_employee(db, run, parsed)
|
||||
if changed:
|
||||
run.parsed_count = 1
|
||||
else:
|
||||
run.skipped_count = 1
|
||||
run.status = "completed"
|
||||
get_or_create_current_version(db, crawl_run_id=run.id)
|
||||
except Exception as exc:
|
||||
@@ -140,8 +154,9 @@ def _parsed_profile_key(parsed: dict) -> str:
|
||||
return f"{parsed.get('profile_type')}:{parsed.get('profile_id')}"
|
||||
|
||||
|
||||
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
|
||||
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> tuple[Employee, bool]:
|
||||
html = parsed.pop("_html", None)
|
||||
parsed.pop("_resource_manifest", None)
|
||||
checksum = _checksum(parsed)
|
||||
key = _parsed_profile_key(parsed)
|
||||
employee = db.scalar(select(Employee).where(Employee.profile_key == key))
|
||||
@@ -160,12 +175,15 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
|
||||
else:
|
||||
is_new = False
|
||||
|
||||
parser_version = parsed.get("parser_version")
|
||||
changed = is_new or employee.current_checksum != checksum or employee.parser_version != parser_version
|
||||
employee.full_name = parsed.get("full_name")
|
||||
employee.status = "active"
|
||||
employee.last_seen_at = now
|
||||
employee.dismissed_at = None
|
||||
employee.parser_version = parsed.get("parser_version")
|
||||
employee.current_data = parsed
|
||||
employee.parser_version = parser_version
|
||||
if changed:
|
||||
employee.current_data = parsed
|
||||
employee.current_checksum = checksum
|
||||
db.flush()
|
||||
|
||||
@@ -179,28 +197,29 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
|
||||
message="Сотрудник впервые найден в источнике.",
|
||||
)
|
||||
|
||||
db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
|
||||
for tab in parsed.get("tabs") or []:
|
||||
if changed:
|
||||
db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
|
||||
for tab in parsed.get("tabs") or []:
|
||||
db.add(
|
||||
ProfileTab(
|
||||
employee_id=employee.id,
|
||||
title=tab.get("title") or "",
|
||||
href=tab.get("href") or "",
|
||||
data_index=tab.get("data_index"),
|
||||
)
|
||||
)
|
||||
|
||||
db.add(
|
||||
ProfileTab(
|
||||
EmployeeSnapshot(
|
||||
employee_id=employee.id,
|
||||
title=tab.get("title") or "",
|
||||
href=tab.get("href") or "",
|
||||
data_index=tab.get("data_index"),
|
||||
crawl_run_id=run.id,
|
||||
parsed_data=parsed,
|
||||
html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
|
||||
checksum=checksum,
|
||||
parser_version=parser_version,
|
||||
)
|
||||
)
|
||||
|
||||
db.add(
|
||||
EmployeeSnapshot(
|
||||
employee_id=employee.id,
|
||||
crawl_run_id=run.id,
|
||||
parsed_data=parsed,
|
||||
html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
|
||||
checksum=checksum,
|
||||
parser_version=parsed.get("parser_version"),
|
||||
)
|
||||
)
|
||||
return employee
|
||||
return employee, changed
|
||||
|
||||
|
||||
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
|
||||
@@ -268,5 +287,23 @@ def _record_employee_change(
|
||||
|
||||
|
||||
def _checksum(data: dict) -> str:
|
||||
payload = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
|
||||
payload = json.dumps(_stable_checksum_payload(data), ensure_ascii=False, sort_keys=True, separators=(",", ":"))
|
||||
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def _stable_checksum_payload(value):
|
||||
if isinstance(value, dict):
|
||||
return {key: _stable_checksum_payload(item) for key, item in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [_stable_checksum_payload(item) for item in value]
|
||||
if isinstance(value, str):
|
||||
return _normalize_date_dependent_experience(value)
|
||||
return value
|
||||
|
||||
|
||||
def _normalize_date_dependent_experience(value: str) -> str:
|
||||
return re.sub(
|
||||
r"(?i)(стаж(?:\s+работы)?(?:\s+в\s+ниу\s+вшэ|\s+в\s+вшэ)?\s*:?\s*)\d+\s*(?:год(?:а|ов)?|лет)",
|
||||
r"\1<experience-years>",
|
||||
value,
|
||||
)
|
||||
|
||||
147
app/services/resource_cache.py
Normal file
147
app/services/resource_cache.py
Normal file
@@ -0,0 +1,147 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import gzip
|
||||
import hashlib
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models import ParseResourceCache
|
||||
from app.version import BACKEND_VERSION
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CachedResource:
|
||||
text: str
|
||||
body_hash: str
|
||||
from_cache: bool
|
||||
status_code: int
|
||||
|
||||
|
||||
class ResourceCache:
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
|
||||
def fetch_text(
|
||||
self,
|
||||
session: requests.Session,
|
||||
*,
|
||||
profile_key: str,
|
||||
resource_key: str,
|
||||
method: str,
|
||||
url: str,
|
||||
headers: dict[str, str],
|
||||
timeout: int,
|
||||
json_payload: Any | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
) -> CachedResource:
|
||||
method = method.upper()
|
||||
fingerprint = _request_fingerprint(method=method, url=url, json_payload=json_payload, params=params)
|
||||
cached = self.db.scalar(
|
||||
select(ParseResourceCache).where(
|
||||
ParseResourceCache.profile_key == profile_key,
|
||||
ParseResourceCache.resource_key == resource_key,
|
||||
ParseResourceCache.request_fingerprint == fingerprint,
|
||||
)
|
||||
)
|
||||
|
||||
request_headers = dict(headers)
|
||||
if cached:
|
||||
if cached.etag:
|
||||
request_headers["If-None-Match"] = cached.etag
|
||||
if cached.last_modified:
|
||||
request_headers["If-Modified-Since"] = cached.last_modified
|
||||
|
||||
response = _send(
|
||||
session,
|
||||
method=method,
|
||||
url=url,
|
||||
headers=request_headers,
|
||||
timeout=timeout,
|
||||
json_payload=json_payload,
|
||||
params=params,
|
||||
)
|
||||
if response.status_code == 304 and cached:
|
||||
cached.fetched_at = datetime.now(timezone.utc)
|
||||
self.db.flush()
|
||||
return CachedResource(
|
||||
text=gzip.decompress(cached.body_snapshot).decode("utf-8"),
|
||||
body_hash=cached.body_hash,
|
||||
from_cache=True,
|
||||
status_code=response.status_code,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
text = response.text
|
||||
body_hash = _body_hash(text)
|
||||
etag = response.headers.get("ETag") if hasattr(response, "headers") else None
|
||||
last_modified = response.headers.get("Last-Modified") if hasattr(response, "headers") else None
|
||||
|
||||
if cached:
|
||||
cached.method = method
|
||||
cached.url = url
|
||||
cached.etag = etag
|
||||
cached.last_modified = last_modified
|
||||
cached.body_hash = body_hash
|
||||
cached.body_snapshot = gzip.compress(text.encode("utf-8"))
|
||||
cached.parser_version = BACKEND_VERSION
|
||||
cached.fetched_at = datetime.now(timezone.utc)
|
||||
else:
|
||||
self.db.add(
|
||||
ParseResourceCache(
|
||||
profile_key=profile_key,
|
||||
resource_key=resource_key,
|
||||
method=method,
|
||||
url=url,
|
||||
request_fingerprint=fingerprint,
|
||||
etag=etag,
|
||||
last_modified=last_modified,
|
||||
body_hash=body_hash,
|
||||
body_snapshot=gzip.compress(text.encode("utf-8")),
|
||||
parser_version=BACKEND_VERSION,
|
||||
fetched_at=datetime.now(timezone.utc),
|
||||
)
|
||||
)
|
||||
self.db.flush()
|
||||
return CachedResource(text=text, body_hash=body_hash, from_cache=False, status_code=response.status_code)
|
||||
|
||||
|
||||
def _send(
|
||||
session: requests.Session,
|
||||
*,
|
||||
method: str,
|
||||
url: str,
|
||||
headers: dict[str, str],
|
||||
timeout: int,
|
||||
json_payload: Any | None,
|
||||
params: dict[str, Any] | None,
|
||||
) -> requests.Response:
|
||||
if method == "POST":
|
||||
return session.post(url, json=json_payload, headers=headers, timeout=timeout, params=params)
|
||||
return session.get(url, headers=headers, timeout=timeout, params=params)
|
||||
|
||||
|
||||
def _request_fingerprint(
|
||||
*,
|
||||
method: str,
|
||||
url: str,
|
||||
json_payload: Any | None,
|
||||
params: dict[str, Any] | None,
|
||||
) -> str:
|
||||
payload = {
|
||||
"method": method,
|
||||
"url": url,
|
||||
"json": json_payload,
|
||||
"params": params,
|
||||
}
|
||||
encoded = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
|
||||
return hashlib.sha256(encoded.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def _body_hash(text: str) -> str:
|
||||
return hashlib.sha256(text.encode("utf-8")).hexdigest()
|
||||
Reference in New Issue
Block a user