"""Dataset versioning and employee sync payloads.

A dataset version is identified by a SHA-256 hash over every employee's
``(profile_key, status, checksum)`` marker. Clients send back the hash they last
synced; the server answers with an empty delta, a delta against the stored
snapshot for that hash, or a full dump.
"""

import hashlib
import json
from collections import Counter
from dataclasses import dataclass

from sqlalchemy import desc, select
from sqlalchemy.orm import Session

from app.models import DatasetVersion, DatasetVersionItem, Employee


@dataclass(frozen=True)
class EmployeeMarker:
    """Per-employee fingerprint that feeds the dataset hash and the version snapshot."""

    profile_key: str
    employee_id: int | None
    status: str
    checksum: str


def get_or_create_current_version(db: Session, *, crawl_run_id: int | None = None) -> DatasetVersion:
    """Return the DatasetVersion matching the current Employee table, creating one if needed.

    The dataset hash is computed from every Employee row. If it equals the hash of
    the most recent version, that version is returned as-is. Otherwise a new
    DatasetVersion (chained to the latest via ``previous_hash``) and one
    DatasetVersionItem per employee are added and flushed. Nothing is committed;
    the caller owns the transaction.

    Only the latest version is compared. A dataset that reverts to an older state
    therefore gets a new version row carrying the same hash as that older one.

    Args:
        db: Active SQLAlchemy session.
        crawl_run_id: Optional crawl run recorded on a newly created version.

    Returns:
        The existing latest version or the newly flushed one.
    """
    employees = db.scalars(select(Employee).order_by(Employee.profile_key)).all()
    markers = [_employee_marker(employee) for employee in employees]
    dataset_hash = _dataset_hash(markers)

    latest = get_latest_version(db)
    if latest is not None and latest.hash == dataset_hash:
        return latest

    status_counts = Counter(marker.status for marker in markers)
    version = DatasetVersion(
        hash=dataset_hash,
        previous_hash=latest.hash if latest is not None else None,
        crawl_run_id=crawl_run_id,
        employee_count=len(markers),
        active_count=status_counts["active"],
        dismissed_count=status_counts["dismissed"],
    )
    db.add(version)
    # Flush so the database assigns version.id before the items reference it.
    db.flush()

    db.add_all(
        [
            DatasetVersionItem(
                dataset_version_id=version.id,
                profile_key=marker.profile_key,
                employee_id=marker.employee_id,
                status=marker.status,
                checksum=marker.checksum,
            )
            for marker in markers
        ]
    )
    db.flush()
    return version


def get_latest_version(db: Session) -> DatasetVersion | None:
    """Return the most recently created DatasetVersion, or None if there are none.

    Ties on ``created_at`` are broken by the highest ``id``.
    """
    return db.scalar(
        select(DatasetVersion)
        .order_by(desc(DatasetVersion.created_at), desc(DatasetVersion.id))
        .limit(1)
    )


def get_version_by_hash(db: Session, dataset_hash: str | None) -> DatasetVersion | None:
    """Look up a DatasetVersion by its dataset hash.

    get_or_create_current_version only compares against the latest version, so
    the same hash may be stored on several rows (unless the model enforces
    uniqueness; not visible here). The newest matching row is returned so the
    lookup is deterministic.

    Returns:
        The matching version, or None when ``dataset_hash`` is empty/None or unknown.
    """
    if not dataset_hash:
        return None
    return db.scalar(
        select(DatasetVersion)
        .where(DatasetVersion.hash == dataset_hash)
        .order_by(desc(DatasetVersion.created_at), desc(DatasetVersion.id))
        .limit(1)
    )


def service_info_payload(
    db: Session,
    *,
    tools: list[dict],
    service_name: str,
    backend_version: str,
    protocol_version: str,
) -> dict:
    """Build the service-info response, including a summary of the current dataset version.

    Side effects: may create a new DatasetVersion, and always commits the session.
    """
    version = get_or_create_current_version(db)
    db.commit()
    return {
        "service_name": service_name,
        "backend_version": backend_version,
        # NOTE: camelCase unlike the sibling keys; it is part of the response
        # format, so do not rename it without coordinating with clients.
        "protocolVersion": protocol_version,
        "tools": tools,
        "dataset": _version_payload(version),
    }


def sync_employees_payload(db: Session, *, client_hash: str | None = None, include_data: bool = True) -> dict:
    """Build the employee sync response for a client that last saw ``client_hash``.

    - No hash: full sync.
    - Hash equals the current version: delta with no changes.
    - Hash of a stored older version: delta against that version's snapshot.
    - Unknown hash: full sync with ``reason="unknown_client_hash"``.

    Side effects: may create a new DatasetVersion, and always commits the session.
    """
    current = get_or_create_current_version(db)
    db.commit()

    if not client_hash:
        return _full_sync_payload(db, current, include_data=include_data, reason=None)

    if client_hash == current.hash:
        return {
            "mode": "delta",
            "from_hash": client_hash,
            "to_hash": current.hash,
            "dataset": _version_payload(current),
            "changes": {"added": [], "updated": [], "dismissed": [], "removed": []},
        }

    previous = get_version_by_hash(db, client_hash)
    if previous is None:
        return _full_sync_payload(
            db,
            current,
            include_data=include_data,
            reason="unknown_client_hash",
            from_hash=client_hash,
        )
    return _delta_sync_payload(db, previous, current, include_data=include_data)


def _full_sync_payload(
    db: Session,
    current: DatasetVersion,
    *,
    include_data: bool,
    reason: str | None,
    from_hash: str | None = None,
) -> dict:
    """Build a full-sync response listing every Employee row, ordered by profile_key.

    ``reason`` is included only when truthy.
    """
    employees = db.scalars(select(Employee).order_by(Employee.profile_key)).all()
    payload = {
        "mode": "full",
        "from_hash": from_hash,
        "to_hash": current.hash,
        "dataset": _version_payload(current),
        "items": [_employee_payload(employee, include_data=include_data) for employee in employees],
    }
    if reason:
        payload["reason"] = reason
    return payload


def _delta_sync_payload(
    db: Session,
    previous: DatasetVersion,
    current: DatasetVersion,
    *,
    include_data: bool,
) -> dict:
    """Diff two version snapshots and describe the changes from ``previous`` to ``current``.

    Snapshot items are matched by profile_key and compared on ``(checksum, status)``.
    Output records are built from live Employee rows. This assumes the employee
    table still matches ``current``, which holds when called right after
    get_or_create_current_version as sync_employees_payload does.

    Buckets (each ordered by profile_key):
      added     -- key only in ``current``, and an Employee row exists.
      dismissed -- key changed and its current status is "dismissed" (tombstone).
      updated   -- key changed to any other status, and an Employee row exists.
      removed   -- key only in ``previous`` (tombstone with the previous checksum).
    """
    previous_items = _items_by_profile_key(previous)
    current_items = _items_by_profile_key(current)
    employees = {employee.profile_key: employee for employee in db.scalars(select(Employee)).all()}

    added: list[dict] = []
    updated: list[dict] = []
    dismissed: list[dict] = []
    removed: list[dict] = []

    for profile_key, current_item in sorted(current_items.items()):
        previous_item = previous_items.get(profile_key)
        employee = employees.get(profile_key)

        if previous_item is None:
            if employee is not None:
                added.append(_employee_payload(employee, include_data=include_data))
            continue

        if previous_item.checksum == current_item.checksum and previous_item.status == current_item.status:
            continue

        if current_item.status == "dismissed":
            # Use the snapshot checksum. employee.current_checksum may be None when
            # the marker checksum was derived from current_data instead.
            dismissed.append(
                _tombstone(profile_key, current_item.status, employee, checksum=current_item.checksum)
            )
        elif employee is not None:
            updated.append(_employee_payload(employee, include_data=include_data))

    for profile_key, previous_item in sorted(previous_items.items()):
        if profile_key not in current_items:
            removed.append(
                _tombstone(profile_key, "removed", employees.get(profile_key), checksum=previous_item.checksum)
            )

    return {
        "mode": "delta",
        "from_hash": previous.hash,
        "to_hash": current.hash,
        "dataset": _version_payload(current),
        "changes": {
            "added": added,
            "updated": updated,
            "dismissed": dismissed,
            "removed": removed,
        },
    }


def _items_by_profile_key(version: DatasetVersion) -> dict[str, DatasetVersionItem]:
    """Index a version's snapshot items by profile_key."""
    return {item.profile_key: item for item in version.items}


def _version_payload(version: DatasetVersion) -> dict:
    """Serialize a DatasetVersion's summary fields into a JSON-ready dict."""
    return {
        "hash": version.hash,
        "previous_hash": version.previous_hash,
        "created_at": version.created_at.isoformat() if version.created_at else None,
        "crawl_run_id": version.crawl_run_id,
        "employee_count": version.employee_count,
        "active_count": version.active_count,
        "dismissed_count": version.dismissed_count,
    }


def _employee_checksum(employee: Employee) -> str:
    """Return the stored checksum, or a hash of ``current_data`` when it is unset/empty."""
    return employee.current_checksum or _payload_hash(employee.current_data or {})


def _employee_marker(employee: Employee) -> EmployeeMarker:
    """Build the snapshot marker for one Employee row."""
    return EmployeeMarker(
        profile_key=employee.profile_key,
        employee_id=employee.id,
        status=employee.status,
        checksum=_employee_checksum(employee),
    )


def _dataset_hash(markers: list[EmployeeMarker]) -> str:
    """Hash the markers' (profile_key, status, checksum) triples, independent of input order.

    ``employee_id`` is deliberately left out of the hash.
    """
    payload = [
        {"profile_key": marker.profile_key, "status": marker.status, "checksum": marker.checksum}
        for marker in sorted(markers, key=lambda marker: marker.profile_key)
    ]
    return _payload_hash(payload)


def _payload_hash(value: object) -> str:
    """Return the SHA-256 hex digest of ``value`` serialized as canonical JSON.

    Keys are sorted, separators are compact, and non-ASCII is kept. Values that
    JSON cannot encode natively are stringified via ``default=str``.
    """
    payload = json.dumps(value, ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def _employee_payload(employee: Employee, *, include_data: bool) -> dict:
    """Serialize an Employee row for sync; ``data`` is attached only when ``include_data``."""
    payload = {
        "profile_key": employee.profile_key,
        "profile_id": employee.profile_id,
        "full_name": employee.full_name,
        "status": employee.status,
        "canonical_url": employee.canonical_url,
        "last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None,
        "dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
        "checksum": _employee_checksum(employee),
    }
    if include_data:
        payload["data"] = employee.current_data
    return payload


def _tombstone(
    profile_key: str,
    status: str,
    employee: Employee | None,
    *,
    checksum: str | None = None,
) -> dict:
    """Build a minimal record for a dismissed or removed employee.

    An explicit ``checksum`` wins. Otherwise the checksum is derived from
    ``employee`` the same way as for full employee payloads, or None when there
    is no employee. Identity fields are added only when an Employee row exists.
    """
    if checksum is None and employee is not None:
        checksum = _employee_checksum(employee)

    payload = {"profile_key": profile_key, "status": status, "checksum": checksum}
    if employee is not None:
        payload.update(
            {
                "profile_id": employee.profile_id,
                "full_name": employee.full_name,
                "canonical_url": employee.canonical_url,
                "dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
            }
        )
    return payload