feat: add dataset checkpoint sync for MCP
This commit is contained in:
227
app/services/dataset_versions.py
Normal file
227
app/services/dataset_versions.py
Normal file
@@ -0,0 +1,227 @@
|
||||
import hashlib
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import desc, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models import DatasetVersion, DatasetVersionItem, Employee
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EmployeeMarker:
|
||||
profile_key: str
|
||||
employee_id: int | None
|
||||
status: str
|
||||
checksum: str
|
||||
|
||||
|
||||
def get_or_create_current_version(db: Session, *, crawl_run_id: int | None = None) -> DatasetVersion:
|
||||
employees = db.scalars(select(Employee).order_by(Employee.profile_key)).all()
|
||||
markers = [_employee_marker(employee) for employee in employees]
|
||||
dataset_hash = _dataset_hash(markers)
|
||||
latest = get_latest_version(db)
|
||||
if latest and latest.hash == dataset_hash:
|
||||
return latest
|
||||
|
||||
active_count = sum(1 for marker in markers if marker.status == "active")
|
||||
dismissed_count = sum(1 for marker in markers if marker.status == "dismissed")
|
||||
version = DatasetVersion(
|
||||
hash=dataset_hash,
|
||||
previous_hash=latest.hash if latest else None,
|
||||
crawl_run_id=crawl_run_id,
|
||||
employee_count=len(markers),
|
||||
active_count=active_count,
|
||||
dismissed_count=dismissed_count,
|
||||
)
|
||||
db.add(version)
|
||||
db.flush()
|
||||
for marker in markers:
|
||||
db.add(
|
||||
DatasetVersionItem(
|
||||
dataset_version_id=version.id,
|
||||
profile_key=marker.profile_key,
|
||||
employee_id=marker.employee_id,
|
||||
status=marker.status,
|
||||
checksum=marker.checksum,
|
||||
)
|
||||
)
|
||||
db.flush()
|
||||
return version
|
||||
|
||||
|
||||
def get_latest_version(db: Session) -> DatasetVersion | None:
|
||||
return db.scalar(select(DatasetVersion).order_by(desc(DatasetVersion.created_at), desc(DatasetVersion.id)).limit(1))
|
||||
|
||||
|
||||
def get_version_by_hash(db: Session, dataset_hash: str | None) -> DatasetVersion | None:
|
||||
if not dataset_hash:
|
||||
return None
|
||||
return db.scalar(select(DatasetVersion).where(DatasetVersion.hash == dataset_hash).limit(1))
|
||||
|
||||
|
||||
def service_info_payload(db: Session, *, tools: list[dict], service_name: str, backend_version: str, protocol_version: str) -> dict:
|
||||
version = get_or_create_current_version(db)
|
||||
db.commit()
|
||||
return {
|
||||
"service_name": service_name,
|
||||
"backend_version": backend_version,
|
||||
"protocolVersion": protocol_version,
|
||||
"tools": tools,
|
||||
"dataset": _version_payload(version),
|
||||
}
|
||||
|
||||
|
||||
def sync_employees_payload(db: Session, *, client_hash: str | None = None, include_data: bool = True) -> dict:
|
||||
current = get_or_create_current_version(db)
|
||||
db.commit()
|
||||
if not client_hash:
|
||||
return _full_sync_payload(db, current, include_data=include_data, reason=None)
|
||||
if client_hash == current.hash:
|
||||
return {
|
||||
"mode": "delta",
|
||||
"from_hash": client_hash,
|
||||
"to_hash": current.hash,
|
||||
"dataset": _version_payload(current),
|
||||
"changes": {"added": [], "updated": [], "dismissed": [], "removed": []},
|
||||
}
|
||||
|
||||
previous = get_version_by_hash(db, client_hash)
|
||||
if not previous:
|
||||
return _full_sync_payload(db, current, include_data=include_data, reason="unknown_client_hash", from_hash=client_hash)
|
||||
|
||||
return _delta_sync_payload(db, previous, current, include_data=include_data)
|
||||
|
||||
|
||||
def _full_sync_payload(
|
||||
db: Session,
|
||||
current: DatasetVersion,
|
||||
*,
|
||||
include_data: bool,
|
||||
reason: str | None,
|
||||
from_hash: str | None = None,
|
||||
) -> dict:
|
||||
employees = db.scalars(select(Employee).order_by(Employee.profile_key)).all()
|
||||
payload = {
|
||||
"mode": "full",
|
||||
"from_hash": from_hash,
|
||||
"to_hash": current.hash,
|
||||
"dataset": _version_payload(current),
|
||||
"items": [_employee_payload(employee, include_data=include_data) for employee in employees],
|
||||
}
|
||||
if reason:
|
||||
payload["reason"] = reason
|
||||
return payload
|
||||
|
||||
|
||||
def _delta_sync_payload(db: Session, previous: DatasetVersion, current: DatasetVersion, *, include_data: bool) -> dict:
|
||||
previous_items = _items_by_profile_key(previous)
|
||||
current_items = _items_by_profile_key(current)
|
||||
employees = {employee.profile_key: employee for employee in db.scalars(select(Employee)).all()}
|
||||
added = []
|
||||
updated = []
|
||||
dismissed = []
|
||||
removed = []
|
||||
|
||||
for profile_key, current_item in sorted(current_items.items()):
|
||||
previous_item = previous_items.get(profile_key)
|
||||
employee = employees.get(profile_key)
|
||||
if not previous_item:
|
||||
if employee:
|
||||
added.append(_employee_payload(employee, include_data=include_data))
|
||||
continue
|
||||
if previous_item.checksum == current_item.checksum and previous_item.status == current_item.status:
|
||||
continue
|
||||
if current_item.status == "dismissed":
|
||||
dismissed.append(_tombstone(profile_key, current_item.status, employee))
|
||||
elif employee:
|
||||
updated.append(_employee_payload(employee, include_data=include_data))
|
||||
|
||||
for profile_key, previous_item in sorted(previous_items.items()):
|
||||
if profile_key not in current_items:
|
||||
removed.append(_tombstone(profile_key, "removed", employees.get(profile_key), checksum=previous_item.checksum))
|
||||
|
||||
return {
|
||||
"mode": "delta",
|
||||
"from_hash": previous.hash,
|
||||
"to_hash": current.hash,
|
||||
"dataset": _version_payload(current),
|
||||
"changes": {
|
||||
"added": added,
|
||||
"updated": updated,
|
||||
"dismissed": dismissed,
|
||||
"removed": removed,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _items_by_profile_key(version: DatasetVersion) -> dict[str, DatasetVersionItem]:
|
||||
return {item.profile_key: item for item in version.items}
|
||||
|
||||
|
||||
def _version_payload(version: DatasetVersion) -> dict:
|
||||
return {
|
||||
"hash": version.hash,
|
||||
"previous_hash": version.previous_hash,
|
||||
"created_at": version.created_at.isoformat() if version.created_at else None,
|
||||
"crawl_run_id": version.crawl_run_id,
|
||||
"employee_count": version.employee_count,
|
||||
"active_count": version.active_count,
|
||||
"dismissed_count": version.dismissed_count,
|
||||
}
|
||||
|
||||
|
||||
def _employee_marker(employee: Employee) -> EmployeeMarker:
|
||||
return EmployeeMarker(
|
||||
profile_key=employee.profile_key,
|
||||
employee_id=employee.id,
|
||||
status=employee.status,
|
||||
checksum=employee.current_checksum or _payload_hash(employee.current_data or {}),
|
||||
)
|
||||
|
||||
|
||||
def _dataset_hash(markers: list[EmployeeMarker]) -> str:
|
||||
payload = [
|
||||
{"profile_key": marker.profile_key, "status": marker.status, "checksum": marker.checksum}
|
||||
for marker in sorted(markers, key=lambda item: item.profile_key)
|
||||
]
|
||||
return _payload_hash(payload)
|
||||
|
||||
|
||||
def _payload_hash(value: object) -> str:
|
||||
payload = json.dumps(value, ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str)
|
||||
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def _employee_payload(employee: Employee, *, include_data: bool) -> dict:
|
||||
payload = {
|
||||
"profile_key": employee.profile_key,
|
||||
"profile_id": employee.profile_id,
|
||||
"full_name": employee.full_name,
|
||||
"status": employee.status,
|
||||
"canonical_url": employee.canonical_url,
|
||||
"last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None,
|
||||
"dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
|
||||
"checksum": employee.current_checksum or _payload_hash(employee.current_data or {}),
|
||||
}
|
||||
if include_data:
|
||||
payload["data"] = employee.current_data
|
||||
return payload
|
||||
|
||||
|
||||
def _tombstone(profile_key: str, status: str, employee: Employee | None, *, checksum: str | None = None) -> dict:
|
||||
payload = {
|
||||
"profile_key": profile_key,
|
||||
"status": status,
|
||||
"checksum": checksum or (employee.current_checksum if employee else None),
|
||||
}
|
||||
if employee:
|
||||
payload.update(
|
||||
{
|
||||
"profile_id": employee.profile_id,
|
||||
"full_name": employee.full_name,
|
||||
"canonical_url": employee.canonical_url,
|
||||
"dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
|
||||
}
|
||||
)
|
||||
return payload
|
||||
Reference in New Issue
Block a user