feat: track crawl run employee changes and verify dismissals

This commit is contained in:
Anton
2026-05-06 15:13:15 +03:00
parent 2331c7a28d
commit d0459a2c30
16 changed files with 517 additions and 27 deletions

View File

@@ -8,7 +8,7 @@ from zoneinfo import ZoneInfo
from sqlalchemy import Select, Text, and_, desc, func, or_, select
from sqlalchemy.orm import Session
from app.models import CrawlRun, Employee
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee
EMPLOYEE_SORTS = {
"full_name": Employee.full_name,
@@ -175,6 +175,26 @@ def run_payload(run: CrawlRun | None) -> dict[str, Any] | None:
}
def run_detail_payload(db: Session, run: CrawlRun | None) -> dict[str, Any] | None:
if not run:
return None
changes = db.scalars(
select(CrawlRunEmployeeChange)
.where(CrawlRunEmployeeChange.crawl_run_id == run.id)
.order_by(CrawlRunEmployeeChange.created_at, CrawlRunEmployeeChange.id)
).all()
errors = db.scalars(select(CrawlError).where(CrawlError.crawl_run_id == run.id).order_by(CrawlError.created_at)).all()
grouped_changes = {"new": [], "missing_from_source": [], "dismissed": []}
for change in changes:
grouped_changes.setdefault(change.change_type, []).append(_change_payload(change))
return {
**(run_payload(run) or {}),
"changes_detail_available": bool(changes),
"changes": grouped_changes,
"errors": [_crawl_error_payload(error) for error in errors],
}
def format_admin_datetime(value: Any) -> str:
if not value:
return "Не указано"
@@ -200,6 +220,52 @@ def _run_status_display(status: str | None) -> str:
return labels.get(status or "", status or "Не указано")
def _change_payload(change: CrawlRunEmployeeChange) -> dict[str, Any]:
return {
"id": change.id,
"employee_id": change.employee_id,
"profile_key": change.profile_key,
"profile_url": change.profile_url,
"full_name": change.full_name,
"change_type": change.change_type,
"change_type_display": _change_type_display(change.change_type),
"profile_available": change.profile_available,
"profile_available_display": _profile_available_display(change.profile_available),
"message": change.message,
"created_at": change.created_at.isoformat() if change.created_at else None,
"created_display": format_admin_datetime(change.created_at),
}
def _crawl_error_payload(error: CrawlError) -> dict[str, Any]:
return {
"id": error.id,
"crawl_run_id": error.crawl_run_id,
"profile_url": error.profile_url,
"error_type": error.error_type,
"message": error.message,
"created_at": error.created_at.isoformat() if error.created_at else None,
"created_display": format_admin_datetime(error.created_at),
}
def _change_type_display(change_type: str | None) -> str:
labels = {
"new": "Новый",
"missing_from_source": "Потеряшка",
"dismissed": "Уволен",
}
return labels.get(change_type or "", change_type or "Не указано")
def _profile_available_display(value: bool | None) -> str:
if value is True:
return "Профиль доступен"
if value is False:
return "Профиль недоступен"
return "Не проверялось"
def _count_section_items(sections: list[dict[str, Any]], section_type: str) -> int:
total = 0
for section in sections:

View File

@@ -9,7 +9,7 @@ from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
from app.models import CrawlError, CrawlRun, Employee, EmployeeSnapshot, ParserSource, ProfileTab
from app.models import CrawlError, CrawlRun, CrawlRunEmployeeChange, Employee, EmployeeSnapshot, ParserSource, ProfileTab
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key
@@ -68,7 +68,7 @@ def run_crawl(db: Session, settings: Settings) -> CrawlRun:
finally:
time.sleep(settings.request_delay_seconds)
run.dismissed_count = _mark_dismissed(db, found_keys)
run.dismissed_count = _mark_dismissed(db, run, found_keys, session, settings.request_timeout)
run.status = "completed"
except Exception as exc:
run.status = "failed"
@@ -107,6 +107,9 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
)
db.add(employee)
run.new_count += 1
is_new = True
else:
is_new = False
employee.full_name = parsed.get("full_name")
employee.status = "active"
@@ -117,6 +120,16 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
employee.current_checksum = checksum
db.flush()
if is_new:
_record_employee_change(
db,
run,
employee,
"new",
profile_available=True,
message="Сотрудник впервые найден в источнике.",
)
db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
for tab in parsed.get("tabs") or []:
db.add(
@@ -141,20 +154,70 @@ def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
return employee
def _mark_dismissed(db: Session, found_keys: set[str]) -> int:
def _mark_dismissed(db: Session, run: CrawlRun, found_keys: set[str], session: requests.Session, timeout: int) -> int:
dismissed = 0
active = db.scalars(select(Employee).where(Employee.status == "active")).all()
now = datetime.now(timezone.utc)
for employee in active:
if employee.profile_key in found_keys:
continue
profile_available = _profile_is_available(session, employee.canonical_url, timeout)
if profile_available:
_record_employee_change(
db,
run,
employee,
"missing_from_source",
profile_available=True,
message="Профиль доступен, но ссылка отсутствует в исходном списке.",
)
continue
employee.status = "dismissed"
employee.dismissed_at = now
_record_employee_change(
db,
run,
employee,
"dismissed",
profile_available=False,
message="Сотрудник отсутствует в исходном списке, профиль не подтвердился как доступный.",
)
dismissed += 1
db.commit()
return dismissed
def _profile_is_available(session: requests.Session, url: str, timeout: int) -> bool:
try:
response = session.get(url, headers=HEADERS, timeout=timeout, allow_redirects=True)
return response.status_code < 400
except requests.RequestException:
return False
def _record_employee_change(
db: Session,
run: CrawlRun,
employee: Employee,
change_type: str,
*,
profile_available: bool | None,
message: str,
) -> None:
db.add(
CrawlRunEmployeeChange(
crawl_run_id=run.id,
employee_id=employee.id,
profile_key=employee.profile_key,
profile_url=employee.canonical_url,
full_name=employee.full_name,
change_type=change_type,
profile_available=profile_available,
message=message,
)
)
def _checksum(data: dict) -> str:
payload = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode("utf-8")).hexdigest()