feature: add MIEM employees parser service with admin UI and MCP #1

Merged
admin merged 1 commits from feature/miem-employees-server into main 2026-04-28 13:21:22 +00:00
29 changed files with 1883 additions and 0 deletions

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""MIEM employees service."""

123
app/admin.py Normal file
View File

@@ -0,0 +1,123 @@
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import desc, func, or_, select
from sqlalchemy.orm import Session
from app.config import Settings, get_settings
from app.db import SessionLocal, get_db
from app.models import CrawlError, CrawlRun, Employee
from app.security import SESSION_COOKIE, require_admin, sign_session, verify_admin
from app.services.crawler import run_crawl
from app.version import BACKEND_VERSION, FRONTEND_VERSION
# Every route declared in this module is served under the /admin prefix.
router = APIRouter(prefix="/admin")
# Templates are looked up in "app/templates"; presumably resolved against the
# process working directory — confirm how the service is launched.
templates = Jinja2Templates(directory="app/templates")
@router.get("", response_class=HTMLResponse)
def dashboard(request: Request, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
    """Render the admin landing page.

    Shows four counters (active employees, dismissed employees, crawl runs,
    crawl errors) and the ten most recently started crawl runs.
    ``require_admin`` is invoked first; presumably it rejects unauthenticated
    requests — see ``app.security``.
    """
    require_admin(request, settings)

    def _count(stmt) -> int:
        # ``or 0`` keeps the original guard against a None scalar result.
        return db.scalar(stmt) or 0

    employee_total = select(func.count()).select_from(Employee)
    counts = {
        "active": _count(employee_total.where(Employee.status == "active")),
        "dismissed": _count(employee_total.where(Employee.status == "dismissed")),
        "runs": _count(select(func.count()).select_from(CrawlRun)),
        "errors": _count(select(func.count()).select_from(CrawlError)),
    }
    latest_runs = db.scalars(select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(10)).all()
    return _render(request, "dashboard.html", {"counts": counts, "runs": latest_runs})
@router.get("/login", response_class=HTMLResponse)
def login_form(request: Request):
    """Show the login form without an error message."""
    context = {"error": None}
    return _render(request, "login.html", context)
@router.post("/login")
def login(
    response: Response,
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    settings: Settings = Depends(get_settings),
):
    """Check the submitted credentials and start an admin session.

    On failure the login form is re-rendered with HTTP 401. On success the
    client is redirected (303) to ``/admin`` with a signed session cookie.

    The cookie is HttpOnly and SameSite=Lax. It is additionally marked
    ``Secure`` when the request itself arrived over HTTPS, so the session
    token is not replayed over plain HTTP; plain-HTTP (development) requests
    behave exactly as before.
    """
    if not verify_admin(username, password, settings):
        return _render(request, "login.html", {"error": "Неверный логин или пароль"}, status_code=401)
    redirect = RedirectResponse("/admin", status_code=303)
    redirect.set_cookie(
        SESSION_COOKIE,
        sign_session(username, settings),
        httponly=True,
        samesite="lax",
        # Only set Secure on HTTPS, otherwise the browser would drop the cookie.
        secure=request.url.scheme == "https",
    )
    return redirect
@router.post("/logout")
def logout():
    """Drop the session cookie and send the browser back to the login page."""
    response = RedirectResponse("/admin/login", status_code=303)
    response.delete_cookie(SESSION_COOKIE)
    return response
@router.get("/employees", response_class=HTMLResponse)
def employees(
    request: Request,
    status: str | None = None,
    q: str | None = None,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Render the employee list, optionally filtered by status and a search string.

    ``q`` is matched case-insensitively as a substring of the full name or the
    canonical URL. At most 200 rows, ordered by full name, are shown.
    """
    require_admin(request, settings)

    criteria = []
    if status:
        criteria.append(Employee.status == status)
    if q:
        like = f"%{q}%"
        criteria.append(or_(Employee.full_name.ilike(like), Employee.canonical_url.ilike(like)))

    stmt = select(Employee)
    for criterion in criteria:
        stmt = stmt.where(criterion)
    stmt = stmt.order_by(Employee.full_name).limit(200)

    rows = db.scalars(stmt).all()
    # Empty strings (not None) are handed to the template for the filter inputs.
    return _render(request, "employees.html", {"employees": rows, "status": status or "", "q": q or ""})
@router.get("/employees/{employee_id}", response_class=HTMLResponse)
def employee_detail(
    employee_id: int,
    request: Request,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
):
    """Render one employee together with its 20 most recent snapshots.

    An unknown ``employee_id`` redirects (303) back to the employee list.
    """
    require_admin(request, settings)
    employee = db.get(Employee, employee_id)
    if not employee:
        return RedirectResponse("/admin/employees", status_code=303)

    # Newest first; the sort happens in Python on the loaded relationship.
    history = list(employee.snapshots)
    history.sort(key=lambda snapshot: snapshot.captured_at, reverse=True)
    return _render(request, "employee_detail.html", {"employee": employee, "snapshots": history[:20]})
@router.get("/runs", response_class=HTMLResponse)
def runs(request: Request, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
    """Render the 50 latest crawl runs and the 50 latest crawl errors."""
    require_admin(request, settings)
    runs_query = select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(50)
    errors_query = select(CrawlError).order_by(desc(CrawlError.created_at)).limit(50)
    context = {
        "runs": db.scalars(runs_query).all(),
        "errors": db.scalars(errors_query).all(),
    }
    return _render(request, "runs.html", context)
@router.post("/runs")
def trigger_run(
    request: Request,
    background_tasks: BackgroundTasks,
    settings: Settings = Depends(get_settings),
):
    """Queue a crawl as a background task and redirect (303) to the runs page."""
    require_admin(request, settings)

    def _run_in_background() -> None:
        # The task opens its own session instead of borrowing a request-scoped one.
        with SessionLocal() as session:
            run_crawl(session, settings)

    background_tasks.add_task(_run_in_background)
    return RedirectResponse("/admin/runs", status_code=303)
def _render(request: Request, template: str, context: dict, status_code: int = 200) -> HTMLResponse:
    """Render *template* with the shared base context merged with *context*.

    The base context carries the request and both version strings; keys in
    *context* override base keys of the same name.
    """
    payload = dict(
        request=request,
        backend_version=BACKEND_VERSION,
        frontend_version=FRONTEND_VERSION,
    )
    payload.update(context)
    return templates.TemplateResponse(template, payload, status_code=status_code)

113
app/api.py Normal file
View File

@@ -0,0 +1,113 @@
from fastapi import APIRouter, BackgroundTasks, Depends, Request
from sqlalchemy import desc, or_, select
from sqlalchemy.orm import Session
from app.config import Settings, get_settings
from app.db import SessionLocal, get_db
from app.models import CrawlRun, Employee
from app.security import require_admin
from app.services.crawler import run_crawl
from app.version import BACKEND_VERSION, FRONTEND_VERSION
# Every route declared in this module is served under the /api prefix.
router = APIRouter(prefix="/api")
@router.get("/health")
def health() -> dict:
    """Report liveness together with the backend and frontend version strings."""
    report = {"status": "ok"}
    report["backend_version"] = BACKEND_VERSION
    report["frontend_version"] = FRONTEND_VERSION
    return report
@router.get("/employees")
def list_employees(
    request: Request,
    status: str | None = None,
    q: str | None = None,
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
) -> dict:
    """Return one page of employee summaries, ordered by full name.

    ``status`` filters by exact status. ``q`` is matched case-insensitively as
    a substring of the full name or canonical URL.

    ``limit`` is clamped to 1..200 and ``offset`` to >= 0. Without the clamp a
    client could request the whole table in one call, and negative values
    reach the database, where the outcome is backend-dependent. The response
    echoes the values actually applied.
    """
    require_admin(request, settings)
    limit = max(1, min(limit, 200))
    offset = max(offset, 0)
    stmt = select(Employee)
    if status:
        stmt = stmt.where(Employee.status == status)
    if q:
        pattern = f"%{q}%"
        stmt = stmt.where(or_(Employee.full_name.ilike(pattern), Employee.canonical_url.ilike(pattern)))
    employees = db.scalars(stmt.order_by(Employee.full_name).limit(limit).offset(offset)).all()
    return {"items": [_employee_summary(item) for item in employees], "limit": limit, "offset": offset}
@router.get("/employees/{employee_id}")
def get_employee(
    employee_id: int,
    request: Request,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
) -> dict:
    """Return the detailed payload for one employee.

    An unknown id yields ``{"error": "not_found"}`` as a normal response body
    (no error status code is set here).
    """
    require_admin(request, settings)
    found = db.get(Employee, employee_id)
    return _employee_detail(found) if found else {"error": "not_found"}
@router.get("/crawl-runs")
def list_crawl_runs(
    request: Request,
    limit: int = 20,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
) -> dict:
    """Return summaries of the most recently started crawl runs.

    ``limit`` is clamped to 1..200 so that a negative or very large value
    cannot reach the database unchanged.
    """
    require_admin(request, settings)
    limit = max(1, min(limit, 200))
    runs = db.scalars(select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(limit)).all()
    return {"items": [_run_summary(run) for run in runs]}
@router.post("/crawl-runs")
def trigger_crawl(
    request: Request,
    background_tasks: BackgroundTasks,
    settings: Settings = Depends(get_settings),
) -> dict:
    """Queue a crawl as a background task and acknowledge with ``{"status": "scheduled"}``."""
    require_admin(request, settings)

    def _run_in_background() -> None:
        # The task opens its own session instead of borrowing a request-scoped one.
        with SessionLocal() as session:
            run_crawl(session, settings)

    background_tasks.add_task(_run_in_background)
    return {"status": "scheduled"}
def _employee_summary(employee: Employee) -> dict:
return {
"id": employee.id,
"full_name": employee.full_name,
"status": employee.status,
"canonical_url": employee.canonical_url,
"last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None,
"dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
}
def _employee_detail(employee: Employee) -> dict:
    """Extend the employee summary with the current parsed data and the profile tabs."""
    tabs = [
        {"title": tab.title, "href": tab.href, "data_index": tab.data_index}
        for tab in employee.tabs
    ]
    detail = _employee_summary(employee)
    detail.update(current_data=employee.current_data, tabs=tabs)
    return detail
def _run_summary(run: CrawlRun) -> dict:
return {
"id": run.id,
"source_url": run.source_url,
"status": run.status,
"started_at": run.started_at.isoformat() if run.started_at else None,
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
"found_count": run.found_count,
"parsed_count": run.parsed_count,
"error_count": run.error_count,
"dismissed_count": run.dismissed_count,
"message": run.message,
}

25
app/config.py Normal file
View File

@@ -0,0 +1,25 @@
from functools import lru_cache
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Service configuration loaded by pydantic-settings.

    Values come from the environment and from an optional ``.env`` file; the
    literals below are the fallbacks used when nothing is provided.
    """

    # Read ``.env`` as UTF-8 and ignore entries that do not map to a field.
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
    database_url: str = "sqlite:///./miem_workers.db"
    source_url: str = "https://miem.hse.ru/persons"
    # Five-field cron expression; in standard cron syntax this reads
    # "03:00 every Monday" — confirm how the scheduler interprets it.
    crawl_cron: str = "0 3 * * 1"
    crawl_limit: int | None = None
    request_timeout: int = 30
    request_delay_seconds: float = 1.0
    parser_use_playwright: bool = False
    # NOTE(review): the credential and secret defaults below are guessable;
    # presumably they are meant to be overridden in any real deployment — confirm.
    admin_username: str = "admin"
    admin_password: str = "admin"
    session_secret: str = Field(default="dev-session-secret", min_length=8)
    mcp_token: str = "dev-mcp-token"
@lru_cache
def get_settings() -> Settings:
    """Return the ``Settings`` instance, constructed on first call and then reused."""
    loaded = Settings()
    return loaded

35
app/db.py Normal file
View File

@@ -0,0 +1,35 @@
from collections.abc import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from app.config import get_settings
class Base(DeclarativeBase):
    """Declarative base class; ``init_db`` creates the tables registered on its metadata."""

    pass
def _connect_args(database_url: str) -> dict[str, object]:
if database_url.startswith("sqlite"):
return {"check_same_thread": False}
return {}
# Settings are read once at import time; the engine and the session factory
# below are built from them as a module-level side effect.
settings = get_settings()
engine = create_engine(settings.database_url, connect_args=_connect_args(settings.database_url))
# Sessions from this factory have autoflush and autocommit switched off.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
def init_db() -> None:
    """Create the tables known to ``Base.metadata`` on the module-level engine."""
    # Imported for its side effect: loading the module registers the models on Base.
    import app.models  # noqa: F401

    metadata = Base.metadata
    metadata.create_all(bind=engine)
def get_db() -> Generator[Session, None, None]:
    """Yield a database session and close it once the consumer is done with it."""
    # The session's context manager closes it on exit, including on error.
    with SessionLocal() as session:
        yield session

24
app/main.py Normal file
View File

@@ -0,0 +1,24 @@
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from app.admin import router as admin_router
from app.api import router as api_router
from app.db import init_db
from app.mcp import router as mcp_router
from app.version import BACKEND_VERSION
# Application object; the reported version is the backend version string.
app = FastAPI(title="MIEM Employees", version=BACKEND_VERSION)
# Static assets are served from "app/static" under /static.
app.mount("/static", StaticFiles(directory="app/static"), name="static")
# Routers carry their own prefixes (/api, /admin, /mcp).
app.include_router(api_router)
app.include_router(admin_router)
app.include_router(mcp_router)
# NOTE(review): FastAPI documents ``on_event`` as deprecated in favour of
# lifespan handlers — consider migrating.
@app.on_event("startup")
def startup() -> None:
    """Initialise the database when the application starts."""
    init_db()
@app.get("/")
def root() -> dict:
    """Return a small service descriptor: name, backend version, and the admin path."""
    descriptor = {"service": "miem-employees", "version": BACKEND_VERSION}
    descriptor["admin"] = "/admin"
    return descriptor

170
app/mcp.py Normal file
View File

@@ -0,0 +1,170 @@
import json
from fastapi import APIRouter, Depends, Request
from sqlalchemy import desc, or_, select
from sqlalchemy.orm import Session
from app.config import Settings, get_settings
from app.db import get_db
from app.models import CrawlRun, Employee
from app.security import require_mcp_token
# Every route declared in this module is served under the /mcp prefix.
router = APIRouter(prefix="/mcp")
# Static tool catalogue returned as-is for the "tools/list" method.
# Each "name" corresponds to a branch in _call_tool.
TOOLS = [
    {
        "name": "search_employees",
        "description": "Search MIEM employees by name or profile URL.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "status": {"type": "string", "enum": ["active", "dismissed"]},
                "limit": {"type": "integer", "default": 20},
            },
            "required": ["query"],
        },
    },
    {
        "name": "get_employee",
        "description": "Get one employee by profile id, profile key, or canonical URL.",
        "inputSchema": {"type": "object", "properties": {"profile_id_or_url": {"type": "string"}}, "required": ["profile_id_or_url"]},
    },
    {
        "name": "list_employee_publications",
        "description": "List publications parsed from an employee profile.",
        "inputSchema": {"type": "object", "properties": {"profile_id_or_url": {"type": "string"}}, "required": ["profile_id_or_url"]},
    },
    {
        "name": "list_employee_courses",
        "description": "List teaching courses parsed from an employee profile.",
        "inputSchema": {"type": "object", "properties": {"profile_id_or_url": {"type": "string"}}, "required": ["profile_id_or_url"]},
    },
    {
        "name": "get_crawl_status",
        "description": "Return the latest crawl run status.",
        "inputSchema": {"type": "object", "properties": {}},
    },
]
@router.post("")
async def mcp_http(
    request: Request,
    db: Session = Depends(get_db),
    settings: Settings = Depends(get_settings),
) -> dict:
    """Handle one JSON-RPC 2.0 request sent to the MCP endpoint.

    Supported methods are ``initialize``, ``tools/list`` and ``tools/call``.
    Failures are reported as JSON-RPC error objects:

    * -32700 when the body is not valid JSON,
    * -32600 when the payload is not a JSON object,
    * -32602 when ``params`` is present but not an object,
    * -32601 for an unknown method,
    * -32000 for any exception raised while serving the method.

    The body is parsed before dispatch and its shape is validated, so a
    malformed request yields a JSON-RPC error instead of an unhandled
    exception.
    """
    require_mcp_token(request, settings)
    try:
        payload = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return _jsonrpc_error(None, -32700, "Parse error")
    if not isinstance(payload, dict):
        # Batches (arrays) and bare scalars are not supported by this endpoint.
        return _jsonrpc_error(None, -32600, "Invalid Request")
    method = payload.get("method")
    request_id = payload.get("id")
    params = payload.get("params") or {}
    if not isinstance(params, dict):
        return _jsonrpc_error(request_id, -32602, "Invalid params")
    try:
        if method == "initialize":
            result = {
                "protocolVersion": "2024-11-05",
                "serverInfo": {"name": "miem-employees", "version": "0.1.0"},
                "capabilities": {"tools": {}},
            }
        elif method == "tools/list":
            result = {"tools": TOOLS}
        elif method == "tools/call":
            result = _call_tool(db, params.get("name"), params.get("arguments") or {})
        else:
            return _jsonrpc_error(request_id, -32601, "Method not found")
        return {"jsonrpc": "2.0", "id": request_id, "result": result}
    except Exception as exc:
        # Top-level boundary: every tool failure becomes a JSON-RPC error reply.
        return _jsonrpc_error(request_id, -32000, str(exc))


def _jsonrpc_error(request_id: object, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}
def _call_tool(db: Session, name: str, arguments: dict) -> dict:
    """Run the MCP tool called *name* and wrap its result as a tool response.

    Raises:
        KeyError: a profile tool is called without ``profile_id_or_url``.
        ValueError: *name* is not a known tool.
    """
    if name == "search_employees":
        return _tool_response(_search_employees(db, arguments))

    section_tools = ("list_employee_publications", "list_employee_courses")
    if name == "get_employee" or name in section_tools:
        employee = _find_employee(db, arguments["profile_id_or_url"])
        if name == "get_employee":
            body = _employee_payload(employee) if employee else {"error": "not_found"}
        elif name == "list_employee_publications":
            body = _collect_section_items(employee, "publications")
        else:
            body = _collect_section_items(employee, "courses_by_year")
        return _tool_response(body)

    if name == "get_crawl_status":
        latest = db.scalar(select(CrawlRun).order_by(desc(CrawlRun.started_at)).limit(1))
        body = _run_payload(latest) if latest else {"status": "never_run"}
        return _tool_response(body)

    raise ValueError(f"Unknown tool: {name}")
def _search_employees(db: Session, arguments: dict) -> list[dict]:
    """Search employees by name or URL substring and return compact payloads.

    Recognised keys in *arguments*: ``query`` (substring, case-insensitive),
    ``status`` (exact match) and ``limit``. A missing, zero or None limit
    defaults to 20; the effective limit is clamped to 1..100. Previously only
    the upper bound was enforced, so a negative limit reached the database.

    Raises:
        ValueError: ``limit`` cannot be converted to an integer.
    """
    query = arguments.get("query", "")
    limit = max(1, min(int(arguments.get("limit") or 20), 100))
    stmt = select(Employee)
    if arguments.get("status"):
        stmt = stmt.where(Employee.status == arguments["status"])
    if query:
        pattern = f"%{query}%"
        stmt = stmt.where(or_(Employee.full_name.ilike(pattern), Employee.canonical_url.ilike(pattern)))
    employees = db.scalars(stmt.order_by(Employee.full_name).limit(limit)).all()
    return [_employee_payload(employee, include_data=False) for employee in employees]
def _find_employee(db: Session, value: str) -> Employee | None:
pattern = value.strip()
stmt = select(Employee).where(
or_(
Employee.profile_key == pattern,
Employee.profile_id == pattern,
Employee.canonical_url == pattern,
Employee.canonical_url.ilike(f"%{pattern}%"),
)
)
return db.scalar(stmt.limit(1))
def _collect_section_items(employee: Employee | None, section_type: str) -> dict:
if not employee or not employee.current_data:
return {"items": []}
items = []
for section in employee.current_data.get("sections") or []:
if section.get("type") != section_type:
continue
if section_type == "publications":
items.extend(section.get("publications") or [])
elif section_type == "courses_by_year":
items.extend(section.get("courses") or [])
return {"employee": _employee_payload(employee, include_data=False), "items": items}
def _employee_payload(employee: Employee, include_data: bool = True) -> dict:
payload = {
"profile_key": employee.profile_key,
"profile_id": employee.profile_id,
"full_name": employee.full_name,
"status": employee.status,
"canonical_url": employee.canonical_url,
"last_seen_at": employee.last_seen_at.isoformat() if employee.last_seen_at else None,
"dismissed_at": employee.dismissed_at.isoformat() if employee.dismissed_at else None,
}
if include_data:
payload["data"] = employee.current_data
return payload
def _run_payload(run: CrawlRun) -> dict:
return {
"id": run.id,
"status": run.status,
"source_url": run.source_url,
"started_at": run.started_at.isoformat() if run.started_at else None,
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
"found_count": run.found_count,
"parsed_count": run.parsed_count,
"error_count": run.error_count,
"dismissed_count": run.dismissed_count,
}
def _tool_response(data: object) -> dict:
return {"content": [{"type": "text", "text": json.dumps(data, ensure_ascii=False, default=str)}]}

109
app/models.py Normal file
View File

@@ -0,0 +1,109 @@
from datetime import datetime, timezone
from sqlalchemy import DateTime, ForeignKey, Index, Integer, LargeBinary, String, Text, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.types import JSON
from app.db import Base
def utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
# Generic JSON column type that switches to JSONB on the "postgresql" dialect.
json_type = JSON().with_variant(JSONB, "postgresql")
class Employee(Base):
    """ORM model for the ``employees`` table.

    ``profile_key`` is unique; ``full_name`` and ``status`` are indexed.
    """

    __tablename__ = "employees"
    __table_args__ = (
        UniqueConstraint("profile_key", name="uq_employees_profile_key"),
        Index("ix_employees_full_name", "full_name"),
        Index("ix_employees_status", "status"),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    profile_key: Mapped[str] = mapped_column(String(255), nullable=False)
    profile_type: Mapped[str | None] = mapped_column(String(50))
    profile_id: Mapped[str | None] = mapped_column(String(255))
    canonical_url: Mapped[str] = mapped_column(Text, nullable=False)
    full_name: Mapped[str | None] = mapped_column(Text)
    # New rows default to "active".
    status: Mapped[str] = mapped_column(String(32), default="active", nullable=False)
    first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
    last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
    dismissed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    parser_version: Mapped[str | None] = mapped_column(String(32))
    current_data: Mapped[dict | None] = mapped_column(json_type)
    # 64 characters wide; presumably a hex digest of current_data — confirm against the crawler.
    current_checksum: Mapped[str | None] = mapped_column(String(64))
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
    # Refreshed through ``onupdate`` whenever the row is updated via the ORM.
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, onupdate=utcnow, nullable=False)
    # No cascade is configured for snapshots, unlike tabs below.
    snapshots: Mapped[list["EmployeeSnapshot"]] = relationship(back_populates="employee")
    # Tabs follow the employee's lifecycle (cascade "all, delete-orphan").
    tabs: Mapped[list["ProfileTab"]] = relationship(back_populates="employee", cascade="all, delete-orphan")
class EmployeeSnapshot(Base):
    """ORM model for the ``employee_snapshots`` table; rows reference an employee and, optionally, a crawl run."""

    __tablename__ = "employee_snapshots"
    __table_args__ = (Index("ix_employee_snapshots_employee_id", "employee_id"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    employee_id: Mapped[int] = mapped_column(ForeignKey("employees.id"), nullable=False)
    crawl_run_id: Mapped[int | None] = mapped_column(ForeignKey("crawl_runs.id"))
    parsed_data: Mapped[dict] = mapped_column(json_type, nullable=False)
    # Optional binary payload; presumably the raw page HTML — confirm encoding with the writer.
    html_snapshot: Mapped[bytes | None] = mapped_column(LargeBinary)
    checksum: Mapped[str] = mapped_column(String(64), nullable=False)
    parser_version: Mapped[str | None] = mapped_column(String(32))
    captured_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
    employee: Mapped[Employee] = relationship(back_populates="snapshots")
class CrawlRun(Base):
    """ORM model for the ``crawl_runs`` table: one row per crawl with its status and counters."""

    __tablename__ = "crawl_runs"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    source_url: Mapped[str] = mapped_column(Text, nullable=False)
    # New rows default to "running".
    status: Mapped[str] = mapped_column(String(32), default="running", nullable=False)
    started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
    finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    # All counters start at zero.
    found_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    parsed_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    error_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    dismissed_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    message: Mapped[str | None] = mapped_column(Text)
class CrawlError(Base):
    """ORM model for the ``crawl_errors`` table; every row belongs to a crawl run."""

    __tablename__ = "crawl_errors"
    __table_args__ = (Index("ix_crawl_errors_run_id", "crawl_run_id"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    crawl_run_id: Mapped[int] = mapped_column(ForeignKey("crawl_runs.id"), nullable=False)
    # Nullable: an error row may carry no profile URL.
    profile_url: Mapped[str | None] = mapped_column(Text)
    error_type: Mapped[str] = mapped_column(String(255), nullable=False)
    message: Mapped[str] = mapped_column(Text, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
class ProfileTab(Base):
    """ORM model for the ``profile_tabs`` table: a titled link that belongs to one employee."""

    __tablename__ = "profile_tabs"
    __table_args__ = (Index("ix_profile_tabs_employee_id", "employee_id"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    employee_id: Mapped[int] = mapped_column(ForeignKey("employees.id"), nullable=False)
    title: Mapped[str] = mapped_column(Text, nullable=False)
    href: Mapped[str] = mapped_column(Text, nullable=False)
    data_index: Mapped[str | None] = mapped_column(String(64))
    employee: Mapped[Employee] = relationship(back_populates="tabs")
class ParserSource(Base):
    """ORM model for the ``parser_sources`` table: unique source URLs with an enabled flag."""

    __tablename__ = "parser_sources"
    __table_args__ = (UniqueConstraint("source_url", name="uq_parser_sources_source_url"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    source_url: Mapped[str] = mapped_column(Text, nullable=False)
    # Sources are enabled by default.
    enabled: Mapped[bool] = mapped_column(default=True, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)

1
app/parser/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""HTML parsing helpers for HSE/MIEM employee pages."""

19
app/parser/collector.py Normal file
View File

@@ -0,0 +1,19 @@
from bs4 import BeautifulSoup
from requests import Session
from app.parser.profile_url import normalize_profile_url
def collect_profile_links(session: Session, source_url: str, headers: dict[str, str], timeout: int) -> list[str]:
    """Fetch *source_url* and return the distinct profile URLs linked from it.

    Every ``<a href>`` is passed through ``normalize_profile_url``; falsy
    results are skipped and duplicates dropped, keeping first-seen order.

    Raises:
        requests.HTTPError: the listing page answered with an error status.
    """
    listing = session.get(source_url, headers=headers, timeout=timeout)
    listing.raise_for_status()
    soup = BeautifulSoup(listing.text, "html.parser")
    candidates = (normalize_profile_url(anchor["href"]) for anchor in soup.find_all("a", href=True))
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(url for url in candidates if url))

380
app/parser/profile.py Normal file
View File

@@ -0,0 +1,380 @@
import re
from urllib.parse import urljoin
from bs4 import BeautifulSoup, NavigableString, Tag
from requests import Session
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
from app.version import BACKEND_VERSION
# Captures a four-digit number that follows "Начал/Начала/Начали работать ... ВШЭ" (case-insensitive).
_YEAR_PATTERN = re.compile(r"Начал[аи]?\s+работать.*?ВШЭ.*?(\d{4})", re.IGNORECASE)
# Captures anything shaped like an e-mail address.
_EMAIL_PATTERN = re.compile(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})")
# Captures the run of digits, spaces and "+()-" (8+ characters) after a "Телефон:" or "Phone:" label.
_PHONE_PATTERN = re.compile(r"(?:Телефон|Phone)\s*:\s*([+()\d\-\s]{8,})", re.IGNORECASE)
def normalize_ws(value: str | None) -> str:
return re.sub(r"\s+", " ", value or "").strip()
def extract_person_tabs(soup: BeautifulSoup, source_url: str) -> list[dict[str, str | None]]:
    """Extract the profile navigation tabs as ``{data_index, title, href}`` dicts.

    The specific desktop-menu selector is tried first, then the generic
    ``.person-menu``. The first menu that yields a usable link wins. Hrefs
    are made absolute against *source_url* and (title, href) duplicates are
    removed. Returns ``[]`` when no menu provides a link.
    """
    menu_selectors = (
        "div.person-menu.is-desktop.small.person-menu-addition",
        ".person-menu",
    )
    for selector in menu_selectors:
        menu = soup.select_one(selector)
        if not menu:
            continue
        found = []
        for anchor in menu.select("a[href]"):
            title = normalize_ws(anchor.get_text(" ", strip=True))
            href = anchor.get("href", "").strip()
            if not (title and href):
                continue
            found.append(
                {
                    "data_index": anchor.get("data-index"),
                    "title": title,
                    "href": urljoin(source_url, href),
                }
            )
        if found:
            return _dedupe_tabs(found)
    return []
def extract_person_header(soup: BeautifulSoup, source_url: str) -> dict:
    """Extract header-level facts about a person from a parsed profile page.

    Returns a dict with ``source_url``, ``full_name``, ``positions``,
    ``hse_start_year``, ``contacts`` and ``external_ids``. E-mails, phones,
    the address and the start year are found by regex over the text of the
    whole page, not just a header element.
    """
    # Prefer the dedicated caption heading, fall back to the first <h1>.
    name_node = soup.select_one("h1.person-caption") or soup.find("h1")
    # Whole-page text collapsed to one line; all regexes below run on it.
    text = normalize_ws(soup.get_text(" ", strip=True))
    year_match = _YEAR_PATTERN.search(text)
    # "items" is initialised here and never filled within this function.
    contacts = {"phones": [], "emails": [], "address": None, "items": []}
    for email in _EMAIL_PATTERN.findall(text):
        if email not in contacts["emails"]:
            contacts["emails"].append(email)
    for phone in _PHONE_PATTERN.findall(text):
        normalized_phone = normalize_ws(phone)
        if normalized_phone and normalized_phone not in contacts["phones"]:
            contacts["phones"].append(normalized_phone)
    # The address runs from "Адрес" up to the next known label (or end of text), capped at 220 characters.
    address_match = re.search(
        r"(Адрес[:\s].{0,220}?)(?:\s+Время|\s+Расписание|\s+SPIN|\s+ORCID|$)",
        text,
        flags=re.IGNORECASE,
    )
    if address_match:
        contacts["address"] = normalize_ws(address_match.group(1)).rstrip(",")
    positions = []
    for li in soup.select("ul.g-ul.g-list.small.employment-add li, ul.employment-add li"):
        value = normalize_ws(li.get_text(" ", strip=True))
        if value:
            positions.append(value)
    external_ids = []
    # (system label, substring that identifies the system in a link's href)
    id_domains = (
        ("ORCID", "orcid.org"),
        ("Scopus AuthorID", "scopus.com"),
        ("ResearcherID", "webofscience.com"),
        ("Google Scholar", "scholar.google."),
        ("SPIN РИНЦ", "elibrary.ru"),
    )
    for anchor in soup.select("a[href]"):
        href = anchor.get("href", "").strip()
        label = normalize_ws(anchor.get_text(" ", strip=True))
        for system, marker in id_domains:
            if marker in href:
                # The link text serves as the value; the system name is used when the link has no text.
                external_ids.append({"system": system, "value": label or system, "url": href})
                break
    return {
        "source_url": source_url,
        "full_name": normalize_ws(name_node.get_text(" ", strip=True)) if name_node else None,
        "positions": positions,
        "hse_start_year": int(year_match.group(1)) if year_match else None,
        "contacts": contacts,
        "external_ids": _dedupe_dicts(external_ids),
    }
def extract_sections(soup: BeautifulSoup, source_url: str) -> list[dict]:
    """Split the page into sections, one per ``<h2>``, and parse each by inferred type.

    Every section carries ``title``, ``slug``, ``type``, ``raw_text``,
    ``paragraphs``, ``items`` and ``links``; type-specific keys are added on
    top. Headings that are empty or mention "расписание занятий" are skipped.
    """
    sections = []
    for h2 in soup.select("h2"):
        title = normalize_ws(h2.get_text(" ", strip=True))
        if not title or "расписание занятий" in title.lower():
            continue
        # Content of the section: siblings following the heading, up to the next <h2>.
        nodes = _collect_between_h2(h2)
        raw_text = _nodes_raw_text(nodes)
        paragraphs = _nodes_paragraphs(nodes)
        items = _nodes_list_items(nodes)
        links = []
        for node in nodes:
            if isinstance(node, Tag):
                links.extend(_extract_links(node, source_url))
        section_type = _infer_section_type(title, nodes)
        section = {
            "title": title,
            "slug": _slugify(title),
            "type": section_type,
            "raw_text": raw_text,
            "paragraphs": paragraphs,
            "items": items,
            "links": links,
        }
        if section_type == "publications":
            section["publications_count"], section["publications"] = _parse_publications(title, nodes, source_url)
            # Items mirror the parsed publications instead of the raw <li> texts.
            section["items"] = [item["text"] for item in section["publications"] if item.get("text")]
        elif section_type == "courses_by_year":
            section["academic_year"], section["courses"] = _parse_courses(title, nodes, source_url)
            # Course sections expose only the structured "courses" list.
            section.pop("items", None)
            section.pop("links", None)
        elif section_type == "table":
            section["table"] = _parse_table(nodes, source_url)
        elif "выпускные квалификационные работы студентов ниу вшэ" in title.lower():
            section["items"] = _parse_vkr_items(nodes)
        year_entries = _parse_year_entries(nodes, source_url)
        if year_entries:
            section["year_entries"] = year_entries
            # Only otherwise-untyped sections are relabelled; typed ones keep their type.
            if section_type in {"generic", "paragraphs"}:
                section["type"] = "year_blocks"
        sections.append(section)
    return sections
def parse_person_profile(
    session: Session,
    source_url: str,
    headers: dict[str, str],
    timeout: int,
    use_playwright: bool = False,
) -> dict | None:
    """Download one profile page and parse it into a flat result dict.

    Returns None when *source_url* does not normalise to a profile URL. The
    page is always fetched over HTTP first; with *use_playwright* the markup
    is then re-rendered in a browser, with the HTTP body as the fallback.
    The markup that was parsed is returned under the ``_html`` key.

    Raises:
        requests.HTTPError: the profile page answered with an error status.
    """
    profile_url = normalize_profile_url(source_url)
    if not profile_url:
        return None

    page = session.get(profile_url, headers=headers, timeout=timeout)
    page.raise_for_status()
    markup = _render_with_playwright(profile_url, page.text) if use_playwright else page.text

    soup = BeautifulSoup(markup, "html.parser")
    profile_type, profile_id = parse_profile_identity(profile_url)
    header = extract_person_header(soup, profile_url)
    tabs = extract_person_tabs(soup, profile_url)
    sections = extract_sections(soup, profile_url)
    tab_links = [tab["href"] for tab in tabs if tab.get("href")]

    result = {
        "source_url": profile_url,
        "profile_type": profile_type,
        "profile_id": profile_id,
        "full_name": header.get("full_name"),
        "positions": header.get("positions") or [],
        "hse_start_year": header.get("hse_start_year"),
        "contacts": header.get("contacts") or {},
        "external_ids": header.get("external_ids") or [],
        "tabs": tabs,
        "sections": sections,
        "employee_internal_links": tab_links,
        "parser_version": BACKEND_VERSION,
        "_html": markup,
    }
    return result
def _render_with_playwright(source_url: str, fallback_html: str) -> str:
    """Render *source_url* in headless Chromium and return the resulting HTML.

    Each ``.person-menu`` link is clicked in turn (best effort) before the
    page content is captured. This is strictly best-effort: if Playwright is
    not importable, or anything fails while rendering, *fallback_html* is
    returned unchanged.

    The browser is closed in a ``finally`` block, so a failure during
    navigation or clicking no longer skips ``browser.close()``.
    """
    try:
        from playwright.sync_api import sync_playwright
    except Exception:
        # Playwright is optional; without it the plain HTTP body is used.
        return fallback_html
    try:
        with sync_playwright() as playwright:
            browser = playwright.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(source_url, wait_until="domcontentloaded", timeout=45000)
                menu_links = page.locator(".person-menu a")
                for index in range(menu_links.count()):
                    try:
                        menu_links.nth(index).click(timeout=2500, force=True)
                        # Short pause after each click before moving on.
                        page.wait_for_timeout(450)
                    except Exception:
                        # A tab that cannot be clicked must not abort the render.
                        continue
                return page.content()
            finally:
                browser.close()
    except Exception:
        return fallback_html
def _collect_between_h2(start_h2: Tag) -> list[Tag | NavigableString | str]:
    """Return the siblings that follow *start_h2* up to (not including) the next ``<h2>``.

    Whitespace-only text nodes are dropped; every other node is kept in
    document order.
    """
    collected = []
    for sibling in start_h2.next_siblings:
        if isinstance(sibling, Tag):
            if sibling.name == "h2":
                break
        elif isinstance(sibling, NavigableString) and not normalize_ws(str(sibling)):
            continue
        collected.append(sibling)
    return collected
def _extract_links(node: Tag, source_url: str) -> list[dict[str, str]]:
    """Return ``{text, url}`` for every usable link below *node*.

    Links with no text or no href are skipped, as are timetable links (href
    containing "timetable" or text containing "расписание"). URLs are made
    absolute against *source_url*.
    """

    def _usable(text: str, href: str) -> bool:
        if not text or not href:
            return False
        return "timetable" not in href.lower() and "расписание" not in text.lower()

    pairs = (
        (normalize_ws(anchor.get_text(" ", strip=True)), anchor.get("href", "").strip())
        for anchor in node.select("a[href]")
    )
    return [{"text": text, "url": urljoin(source_url, href)} for text, href in pairs if _usable(text, href)]
def _nodes_raw_text(nodes: list) -> str:
    """Join the normalised text of each node with newlines, skipping nodes with no text."""

    def _text_of(node) -> str:
        raw = node.get_text(" ", strip=True) if isinstance(node, Tag) else str(node)
        return normalize_ws(raw)

    return "\n".join(text for text in map(_text_of, nodes) if text)
def _nodes_paragraphs(nodes: list) -> list[str]:
    """Return the non-empty, whitespace-normalised text of every ``<p>`` in *nodes*.

    A node that is itself a ``<p>`` counts as a paragraph. ``Tag.select``
    only searches descendants, so a ``<p>`` sitting directly after the
    heading used to be missed (and the section typed "generic" instead of
    "paragraphs"). Non-Tag nodes are ignored.
    """
    paragraphs = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        # Same self-or-descendant handling that _has_table applies to tables.
        candidates = [node] if node.name == "p" else node.select("p")
        paragraphs.extend(normalize_ws(p.get_text(" ", strip=True)) for p in candidates)
    return [p for p in paragraphs if p]
def _nodes_list_items(nodes: list) -> list[str]:
    """Return the text of every ``<li>`` below the Tag nodes, minus empty and "расписание" entries."""
    texts = [
        normalize_ws(li.get_text(" ", strip=True))
        for node in nodes
        if isinstance(node, Tag)
        for li in node.select("li")
    ]
    return [text for text in texts if text and "расписание" not in text.lower()]
def _infer_section_type(title: str, nodes: list) -> str:
    """Classify a section; the first matching rule wins.

    Order: contains a table -> "table"; title mentions "публикац" ->
    "publications"; title mentions "учебные курсы" -> "courses_by_year"; has
    list items -> "list"; has paragraphs -> "paragraphs"; else "generic".
    """
    title_lower = title.lower()
    # Rules are evaluated lazily so later checks run only when earlier ones fail.
    rules = (
        ("table", lambda: _has_table(nodes)),
        ("publications", lambda: "публикац" in title_lower),
        ("courses_by_year", lambda: "учебные курсы" in title_lower),
        ("list", lambda: _nodes_list_items(nodes)),
        ("paragraphs", lambda: _nodes_paragraphs(nodes)),
    )
    for section_type, applies in rules:
        if applies():
            return section_type
    return "generic"
def _has_table(nodes: list) -> bool:
    """Tell whether any Tag in *nodes* is a ``<table>`` or contains one."""
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        if node.name == "table" or node.find("table"):
            return True
    return False
def _parse_table(nodes: list, source_url: str) -> dict:
    """Parse the first table found in *nodes* into ``{"headers": [...], "rows": [...]}``.

    Headers are the text of every ``<th>``. Each row with at least one
    ``<td>`` becomes ``{"cells": [...], "link_url": ...}``, where ``link_url``
    is the row's first link made absolute, or None. Empty headers and rows
    are returned when no table exists.
    """

    def _first_table():
        for node in nodes:
            if not isinstance(node, Tag):
                continue
            candidate = node if node.name == "table" else node.find("table")
            if candidate:
                return candidate
        return None

    table = _first_table()
    if not table:
        return {"headers": [], "rows": []}

    headers = [normalize_ws(th.get_text(" ", strip=True)) for th in table.select("th")]
    rows = []
    for tr in table.select("tr"):
        cells = [normalize_ws(td.get_text(" ", strip=True)) for td in tr.select("td")]
        if not cells:
            # Header-only rows have no <td> cells and are not data rows.
            continue
        link = tr.find("a", href=True)
        link_url = urljoin(source_url, link["href"]) if link else None
        rows.append({"cells": cells, "link_url": link_url})
    return {"headers": headers, "rows": rows}
def _parse_publications(title: str, nodes: list, source_url: str) -> tuple[int | None, list[dict]]:
    """Parse a publications section into ``(declared_count, publications)``.

    ``declared_count`` is the number at the very end of *title*, or None.
    Publications come from the ``<li>`` items of the first node that yields
    any; later nodes are not inspected. If no list items exist, each
    non-empty line of the section's raw text becomes a publication with no
    URL.
    """
    publications = []
    for node in nodes:
        if not isinstance(node, Tag):
            continue
        for li in node.select("li"):
            text = normalize_ws(li.get_text(" ", strip=True))
            if not text:
                continue
            anchor = li.find("a", href=True)
            if anchor:
                entry = {
                    "title": normalize_ws(anchor.get_text(" ", strip=True)),
                    "url": urljoin(source_url, anchor["href"]),
                    "text": text,
                }
            else:
                entry = {"title": text, "url": None, "text": text}
            publications.append(entry)
        if publications:
            break

    if not publications:
        raw_lines = _nodes_raw_text(nodes).split("\n")
        publications = [{"title": line, "url": None, "text": line} for line in raw_lines if line]

    declared = re.search(r"(\d+)\s*$", title)
    declared_count = int(declared.group(1)) if declared else None
    return declared_count, publications
def _parse_courses(title: str, nodes: list, source_url: str) -> tuple[str | None, list[dict]]:
    """Parse a courses section into ``(academic_year, courses)``.

    The academic year is the first ``YYYY/YYYY`` fragment in *title*, if any.
    Each ``<li>`` yields one course: the link text and resolved URL when the
    item has a link, otherwise the item text with a ``None`` URL. Duplicate
    course dicts are removed, keeping first occurrences.
    """
    collected = []
    for tag in (node for node in nodes if isinstance(node, Tag)):
        for item in tag.select("li"):
            link = item.find("a", href=True)
            text_source = link if link else item
            name = normalize_ws(text_source.get_text(" ", strip=True))
            if not name:
                continue
            url = urljoin(source_url, link["href"]) if link else None
            collected.append({"title": name, "url": url})
    academic_year = re.search(r"(\d{4}/\d{4})", title)
    year_label = academic_year.group(1) if academic_year else None
    return year_label, _dedupe_dicts(collected)
def _parse_year_entries(nodes: list, source_url: str) -> list[dict]:
entries = []
for node in nodes:
if not isinstance(node, Tag):
continue
for year_node in node.select(".person-list-hangover"):
year_match = re.search(r"(19\d{2}|20\d{2})", year_node.get_text(" ", strip=True))
parent = year_node.parent
if parent:
entries.append(
{
"year": int(year_match.group(1)) if year_match else None,
"text": normalize_ws(parent.get_text(" ", strip=True)),
"links": _extract_links(parent, source_url),
}
)
return entries
def _parse_vkr_items(nodes: list) -> list[str]:
items = []
for node in nodes:
if isinstance(node, Tag):
items.extend(normalize_ws(li.get_text(" ", strip=True)) for li in node.select("li"))
return [item for item in dict.fromkeys(items) if item]
def _slugify(value: str) -> str:
cleaned = re.sub(r"[^\w\s-]", "", value.lower(), flags=re.UNICODE)
return re.sub(r"[-\s]+", "_", cleaned).strip("_") or "section"
def _dedupe_tabs(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = (item.get("title"), item.get("href"))
if key not in seen:
seen.add(key)
unique.append(item)
return unique
def _dedupe_dicts(items: list[dict]) -> list[dict]:
seen = set()
unique = []
for item in items:
key = tuple(sorted(item.items()))
if key not in seen:
seen.add(key)
unique.append(item)
return unique

46
app/parser/profile_url.py Normal file
View File

@@ -0,0 +1,46 @@
import re
from urllib.parse import urljoin, urlsplit, urlunsplit
# Base against which relative hrefs are resolved before matching.
BASE_URL = "https://www.hse.ru"
# Path of a numeric person page, e.g. /org/persons/123 (optional trailing slash); group 1 is the id.
_ORG_PATTERN = re.compile(r"^/org/persons/(\d+)/?$")
# Path of a named staff page, e.g. /staff/name (optional trailing slash); group 1 is the slug.
_STAFF_PATTERN = re.compile(r"^/staff/([^/?#]+)/?$")
def normalize_profile_url(href: str | None) -> str | None:
    """Return the canonical profile URL for *href*, or None if it is not a profile.

    Relative hrefs are resolved against ``BASE_URL``. Query string, fragment
    and trailing slashes are dropped, and the result is always rebuilt on
    ``https://www.hse.ru``.
    """
    if not href:
        return None
    absolute = urljoin(BASE_URL + "/", href.strip())
    # Strip every trailing slash, then add exactly one back for the patterns.
    probe = urlsplit(absolute).path.rstrip("/") + "/"
    # NOTE(review): the host of `absolute` is never checked, so a matching path
    # on any other host is also rewritten to www.hse.ru — confirm this is intended.
    for pattern, prefix in ((_ORG_PATTERN, "/org/persons/"), (_STAFF_PATTERN, "/staff/")):
        hit = pattern.match(probe)
        if hit:
            return urlunsplit(("https", "www.hse.ru", f"{prefix}{hit.group(1)}", "", ""))
    return None
def parse_profile_identity(profile_url: str) -> tuple[str | None, str | None]:
    """Split a profile URL into ``(profile_type, profile_id)``.

    The type is ``"org_person"`` for ``/org/persons/<digits>`` and ``"staff"``
    for ``/staff/<slug>``. URLs that do not normalize to a profile give
    ``(None, None)``.
    """
    canonical = normalize_profile_url(profile_url)
    if not canonical:
        return None, None
    probe = urlsplit(canonical).path.rstrip("/") + "/"
    for label, pattern in (("org_person", _ORG_PATTERN), ("staff", _STAFF_PATTERN)):
        found = pattern.match(probe)
        if found:
            return label, found.group(1)
    return None, None
def profile_key(profile_url: str) -> str | None:
    """Return the ``"<type>:<id>"`` key for a profile URL, or None if unrecognized."""
    kind, ident = parse_profile_identity(profile_url)
    if kind and ident:
        return f"{kind}:{ident}"
    return None

52
app/security.py Normal file
View File

@@ -0,0 +1,52 @@
import base64
import hashlib
import hmac
import json
import time
from fastapi import HTTPException, Request, status
from app.config import Settings
SESSION_COOKIE = "miem_admin_session"
def verify_admin(username: str, password: str, settings: Settings) -> bool:
    """Check submitted admin credentials against the configured ones.

    Args:
        username: Login submitted by the user.
        password: Password submitted by the user.
        settings: Provides ``admin_username`` and ``admin_password``.

    Returns:
        True only when both the username and the password match.
    """
    # hmac.compare_digest() raises TypeError for str values containing
    # non-ASCII characters, so compare the UTF-8 encoded bytes instead.
    username_ok = hmac.compare_digest(username.encode("utf-8"), settings.admin_username.encode("utf-8"))
    password_ok = hmac.compare_digest(password.encode("utf-8"), settings.admin_password.encode("utf-8"))
    # Both comparisons always run: a wrong username must not skip the
    # password check and thereby reveal itself through response timing.
    return username_ok and password_ok
def sign_session(username: str, settings: Settings) -> str:
    """Build a signed session token of the form ``<payload>.<signature>``.

    The payload is URL-safe base64 of compact JSON ``{"sub", "iat"}``; the
    signature is the hex HMAC-SHA256 of the payload text keyed with
    ``settings.session_secret``.
    """
    claims = {"sub": username, "iat": int(time.time())}
    raw_json = json.dumps(claims, separators=(",", ":")).encode("utf-8")
    payload = base64.urlsafe_b64encode(raw_json).decode("ascii")
    mac = hmac.new(settings.session_secret.encode("utf-8"), payload.encode("ascii"), hashlib.sha256)
    return payload + "." + mac.hexdigest()
def read_session(token: str | None, settings: Settings) -> str | None:
if not token or "." not in token:
return None
payload, signature = token.rsplit(".", 1)
expected = hmac.new(settings.session_secret.encode("utf-8"), payload.encode("ascii"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(signature, expected):
return None
try:
data = json.loads(base64.urlsafe_b64decode(payload.encode("ascii")))
except Exception:
return None
return data.get("sub")
def require_admin(request: Request, settings: Settings) -> str:
    """Return the logged-in admin name, or redirect to the login page.

    Raises:
        HTTPException: 303 with ``Location: /admin/login`` when the session
            cookie is missing or invalid.
    """
    cookie_value = request.cookies.get(SESSION_COOKIE)
    username = read_session(cookie_value, settings)
    if username:
        return username
    raise HTTPException(status_code=status.HTTP_303_SEE_OTHER, headers={"Location": "/admin/login"})
def require_mcp_token(request: Request, settings: Settings) -> None:
    """Reject the request unless it carries the configured MCP bearer token.

    Args:
        request: Incoming request; its ``Authorization`` header is inspected.
        settings: Provides the expected ``mcp_token``.

    Raises:
        HTTPException: 401 when the header is missing, is not a ``Bearer``
            credential, does not match, or no token is configured.
    """
    expected = settings.mcp_token or ""
    auth = request.headers.get("authorization", "")
    presented = auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else None
    authorized = (
        # Fail closed: an empty configured token would otherwise accept an
        # empty "Bearer " credential.
        bool(expected)
        and presented is not None
        # Compare bytes: compare_digest() raises TypeError on non-ASCII str,
        # which turned a bad header into a 500 instead of a 401.
        and hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))
    )
    if not authorized:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid MCP token")

1
app/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Application services."""

159
app/services/crawler.py Normal file
View File

@@ -0,0 +1,159 @@
import gzip
import hashlib
import json
import time
from datetime import datetime, timezone
import requests
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
from app.models import CrawlError, CrawlRun, Employee, EmployeeSnapshot, ParserSource, ProfileTab
from app.parser.collector import collect_profile_links
from app.parser.profile import parse_person_profile
from app.parser.profile_url import profile_key
# HTTP headers passed to the link collector and the profile parser on every
# crawl; the User-Agent identifies this service as a bot.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; MIEMEmployeesBot/0.1.0; +https://miem.hse.ru/)"
}
def run_crawl(db: Session, settings: Settings) -> CrawlRun:
    """Crawl the configured source once and record the outcome as a CrawlRun.

    Collects profile links, parses and upserts each profile, records
    per-profile failures as CrawlError rows, then marks employees that were
    not listed as dismissed.

    Args:
        db: Open SQLAlchemy session; it is committed repeatedly during the run.
        settings: Provides ``source_url``, ``request_timeout``, ``crawl_limit``,
            ``parser_use_playwright`` and ``request_delay_seconds``.

    Returns:
        The refreshed CrawlRun with status ``"completed"`` or ``"failed"``.
        Failures are stored on the run (``message``) rather than raised.
    """
    source = _ensure_source(db, settings.source_url)
    run = CrawlRun(source_url=source.source_url, status="running")
    db.add(run)
    db.commit()
    db.refresh(run)
    found_keys: set[str] = set()
    parsed_count = 0
    try:
        with requests.Session() as session:
            all_urls = collect_profile_links(session, source.source_url, HEADERS, settings.request_timeout)
            urls = all_urls[: settings.crawl_limit] if settings.crawl_limit else all_urls
            # Remember whether the limit cut the list: a partial crawl must not
            # be used to decide who has left.
            truncated = len(urls) < len(all_urls)
            run.found_count = len(urls)
            db.commit()
            for url in urls:
                key = profile_key(url)
                if key:
                    # Recorded before parsing so a profile that fails to parse
                    # is still treated as present.
                    found_keys.add(key)
                try:
                    parsed = parse_person_profile(
                        session,
                        url,
                        HEADERS,
                        settings.request_timeout,
                        settings.parser_use_playwright,
                    )
                    if not parsed:
                        continue
                    _upsert_employee(db, run, parsed)
                    parsed_count += 1
                    run.parsed_count = parsed_count
                    db.commit()
                except Exception as exc:
                    # A failed flush/commit leaves the session unusable until it
                    # is rolled back; this also discards the half-applied upsert
                    # so it is not committed together with the error row.
                    db.rollback()
                    run.error_count += 1
                    db.add(
                        CrawlError(
                            crawl_run_id=run.id,
                            profile_url=url,
                            error_type=type(exc).__name__,
                            message=str(exc),
                        )
                    )
                    db.commit()
                finally:
                    # Throttle requests to the source site after every profile.
                    time.sleep(settings.request_delay_seconds)
        if truncated or not found_keys:
            # With a truncated or empty link list, every employee outside it
            # would be wrongly marked as dismissed.
            run.dismissed_count = 0
            run.message = "dismissal check skipped: profile list was truncated by crawl_limit or empty"
        else:
            run.dismissed_count = _mark_dismissed(db, found_keys)
        run.status = "completed"
    except Exception as exc:
        # Reset the session so the failure can still be stored on the run below.
        db.rollback()
        run.status = "failed"
        run.message = str(exc)
    finally:
        run.finished_at = datetime.now(timezone.utc)
        db.commit()
        db.refresh(run)
    return run
def _ensure_source(db: Session, source_url: str) -> ParserSource:
    """Return the ParserSource row for *source_url*, creating it if missing."""
    existing = db.scalar(select(ParserSource).where(ParserSource.source_url == source_url))
    if existing:
        return existing
    # First crawl of this URL: persist an enabled source and reload it.
    created = ParserSource(source_url=source_url, enabled=True)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
def _upsert_employee(db: Session, run: CrawlRun, parsed: dict) -> Employee:
    """Create or update the Employee for a parsed profile and store a snapshot.

    The employee's tabs are replaced wholesale and a new EmployeeSnapshot is
    added on every call. Changes are flushed but not committed; the caller
    commits.

    Args:
        db: Open SQLAlchemy session.
        run: Crawl run the snapshot is attached to.
        parsed: Parser output. Reads ``profile_type``, ``profile_id``,
            ``source_url``, ``full_name``, ``parser_version``, ``tabs`` and the
            optional raw page under ``_html``. The dict is not modified.

    Returns:
        The created or updated Employee.

    Raises:
        ValueError: When the parsed profile has no type or id.
    """
    profile_type = parsed.get("profile_type")
    profile_id = parsed.get("profile_id")
    if not profile_type or not profile_id:
        # Without this check the key would be "None:None" and unrelated
        # people would be merged into a single employee row.
        raise ValueError(f"parsed profile has no identity: {parsed.get('source_url')!r}")
    html = parsed.get("_html")
    # Work on a copy without the raw HTML so the caller's dict stays intact and
    # the page source is kept out of the stored JSON and the checksum.
    data = {name: value for name, value in parsed.items() if name != "_html"}
    checksum = _checksum(data)
    key = f"{profile_type}:{profile_id}"
    employee = db.scalar(select(Employee).where(Employee.profile_key == key))
    now = datetime.now(timezone.utc)
    if not employee:
        employee = Employee(
            profile_key=key,
            profile_type=profile_type,
            profile_id=profile_id,
            canonical_url=data["source_url"],
            first_seen_at=now,
        )
        db.add(employee)
    employee.full_name = data.get("full_name")
    # Seeing the profile again reactivates a previously dismissed employee.
    employee.status = "active"
    employee.last_seen_at = now
    employee.dismissed_at = None
    employee.parser_version = data.get("parser_version")
    employee.current_data = data
    employee.current_checksum = checksum
    # Flush so a newly created employee has an id before child rows use it.
    db.flush()
    db.query(ProfileTab).filter(ProfileTab.employee_id == employee.id).delete()
    for tab in data.get("tabs") or []:
        db.add(
            ProfileTab(
                employee_id=employee.id,
                title=tab.get("title") or "",
                href=tab.get("href") or "",
                data_index=tab.get("data_index"),
            )
        )
    db.add(
        EmployeeSnapshot(
            employee_id=employee.id,
            crawl_run_id=run.id,
            parsed_data=data,
            html_snapshot=gzip.compress(html.encode("utf-8")) if html else None,
            checksum=checksum,
            parser_version=data.get("parser_version"),
        )
    )
    return employee
def _mark_dismissed(db: Session, found_keys: set[str]) -> int:
    """Mark active employees absent from *found_keys* as dismissed.

    Commits the change and returns how many employees were dismissed.
    """
    active_employees = db.scalars(select(Employee).where(Employee.status == "active")).all()
    timestamp = datetime.now(timezone.utc)
    missing = [person for person in active_employees if person.profile_key not in found_keys]
    for person in missing:
        person.status = "dismissed"
        person.dismissed_at = timestamp
    db.commit()
    return len(missing)
def _checksum(data: dict) -> str:
payload = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode("utf-8")).hexdigest()

153
app/static/admin.css Normal file
View File

@@ -0,0 +1,153 @@
/* Admin UI stylesheet. Class names follow a block__element--modifier scheme. */

/* Page shell: applied to <body>. */
.admin {
margin: 0;
min-height: 100vh;
color: #1f2937;
background: #f6f7f9;
font-family: Arial, sans-serif;
}
/* Top bar holding the brand and the navigation. */
.admin__header {
display: flex;
align-items: center;
justify-content: space-between;
gap: 24px;
padding: 18px 32px;
background: #ffffff;
border-bottom: 1px solid #d9dee7;
}
.admin__brand {
margin: 0;
font-size: 20px;
}
.admin__nav {
display: flex;
align-items: center;
gap: 14px;
}
.admin__link {
color: #0f766e;
text-decoration: none;
font-weight: 700;
}
/* Centered content column, capped at 1180px with 16px side gutters. */
.admin__main {
width: min(1180px, calc(100% - 32px));
margin: 28px auto;
}
.admin__footer {
padding: 20px 32px;
color: #6b7280;
border-top: 1px solid #d9dee7;
background: #ffffff;
}
/* Responsive grid for the metric cards. */
.admin__grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 16px;
}
/* Metric card: small label above a large value. */
.metric {
padding: 18px;
background: #ffffff;
border: 1px solid #d9dee7;
border-radius: 8px;
}
.metric__label {
color: #6b7280;
font-size: 13px;
}
.metric__value {
margin-top: 8px;
font-size: 28px;
font-weight: 700;
}
/* Panel: white card wrapping a titled section. */
.panel {
margin-top: 22px;
padding: 20px;
background: #ffffff;
border: 1px solid #d9dee7;
border-radius: 8px;
}
.panel__title {
margin: 0 0 16px;
font-size: 18px;
}
/* Data tables. */
.table {
width: 100%;
border-collapse: collapse;
}
.table__cell,
.table__head {
padding: 10px 8px;
border-bottom: 1px solid #e5e7eb;
text-align: left;
vertical-align: top;
}
/* Status pill; the --dismissed modifier switches it to red. */
.badge {
display: inline-block;
padding: 3px 8px;
border-radius: 999px;
background: #e0f2fe;
color: #075985;
font-size: 12px;
}
.badge--dismissed {
background: #fee2e2;
color: #991b1b;
}
/* Forms: single-column grid of controls. */
.form {
display: grid;
gap: 12px;
max-width: 380px;
}
.form__label {
display: grid;
gap: 6px;
font-weight: 700;
}
.form__input,
.form__select {
padding: 10px 12px;
border: 1px solid #cbd5e1;
border-radius: 6px;
}
/* Buttons; the --ghost modifier removes the filled background. */
.button {
padding: 10px 14px;
border: 0;
border-radius: 6px;
color: #ffffff;
background: #0f766e;
font-weight: 700;
cursor: pointer;
}
.button--ghost {
color: #0f766e;
background: transparent;
}
/* Dark preformatted block; scrolls horizontally and wraps long lines. */
.code {
overflow-x: auto;
padding: 14px;
background: #111827;
color: #f9fafb;
border-radius: 8px;
white-space: pre-wrap;
}

28
app/templates/base.html Normal file
View File

@@ -0,0 +1,28 @@
<!doctype html>
{# Shared admin layout: header with navigation and logout, a "content" block
   for child templates, and a footer showing backend_version / frontend_version. #}
<html lang="ru">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{% block title %}MIEM Employees{% endblock %}</title>
<link rel="stylesheet" href="/static/admin.css">
</head>
<body class="admin">
<header class="admin__header">
<h1 class="admin__brand">MIEM Employees</h1>
<nav class="admin__nav">
<a class="admin__link" href="/admin">Dashboard</a>
<a class="admin__link" href="/admin/employees">Employees</a>
<a class="admin__link" href="/admin/runs">Runs</a>
{# Logout is a POST form rather than a link. #}
<form method="post" action="/admin/logout">
<button class="button button--ghost" type="submit">Logout</button>
</form>
</nav>
</header>
<main class="admin__main">
{% block content %}{% endblock %}
</main>
<footer class="admin__footer">
Backend {{ backend_version }} · Frontend {{ frontend_version }}
</footer>
</body>
</html>

View File

@@ -0,0 +1,21 @@
{% extends "base.html" %}
{# Dashboard: four counters from `counts` and a table of the latest `runs`. #}
{% block title %}Dashboard · MIEM Employees{% endblock %}
{% block content %}
{# Summary counters: counts.active, counts.dismissed, counts.runs, counts.errors. #}
<section class="admin__grid">
<div class="metric"><div class="metric__label">Active</div><div class="metric__value">{{ counts.active }}</div></div>
<div class="metric"><div class="metric__label">Dismissed</div><div class="metric__value">{{ counts.dismissed }}</div></div>
<div class="metric"><div class="metric__label">Runs</div><div class="metric__value">{{ counts.runs }}</div></div>
<div class="metric"><div class="metric__label">Errors</div><div class="metric__value">{{ counts.errors }}</div></div>
</section>
<section class="panel">
<h2 class="panel__title">Latest runs</h2>
<table class="table">
<thead><tr><th class="table__head">ID</th><th class="table__head">Status</th><th class="table__head">Parsed</th><th class="table__head">Errors</th><th class="table__head">Started</th></tr></thead>
<tbody>
{# One row per crawl run, in the order supplied by the view. #}
{% for run in runs %}
<tr><td class="table__cell">{{ run.id }}</td><td class="table__cell">{{ run.status }}</td><td class="table__cell">{{ run.parsed_count }}</td><td class="table__cell">{{ run.error_count }}</td><td class="table__cell">{{ run.started_at }}</td></tr>
{% endfor %}
</tbody>
</table>
</section>
{% endblock %}

View File

@@ -0,0 +1,28 @@
{% extends "base.html" %}
{# Employee detail: status, profile link, tabs, current parsed data and snapshot history. #}
{# The title uses the same fallback as the heading so a missing name does not render as "None". #}
{% block title %}{{ employee.full_name or employee.profile_key }} · MIEM Employees{% endblock %}
{% block content %}
<section class="panel">
  <h2 class="panel__title">{{ employee.full_name or employee.profile_key }}</h2>
  <p><span class="badge {% if employee.status == "dismissed" %}badge--dismissed{% endif %}">{{ employee.status }}</span></p>
  <p><a class="admin__link" href="{{ employee.canonical_url }}">{{ employee.canonical_url }}</a></p>
  <h3>Tabs</h3>
  <ul>
    {% for tab in employee.tabs %}
    <li><a class="admin__link" href="{{ tab.href }}">{{ tab.title }}</a></li>
    {% endfor %}
  </ul>
  <h3>Current data</h3>
  {# Pretty-printed JSON of the latest parsed profile. #}
  <pre class="code">{{ employee.current_data | tojson(indent=2) }}</pre>
</section>
<section class="panel">
  <h2 class="panel__title">Snapshots</h2>
  <table class="table">
    <thead><tr><th class="table__head">Captured</th><th class="table__head">Checksum</th><th class="table__head">Parser</th></tr></thead>
    <tbody>
      {% for snapshot in snapshots %}
      <tr><td class="table__cell">{{ snapshot.captured_at }}</td><td class="table__cell">{{ snapshot.checksum }}</td><td class="table__cell">{{ snapshot.parser_version }}</td></tr>
      {% endfor %}
    </tbody>
  </table>
</section>
{% endblock %}

View File

@@ -0,0 +1,29 @@
{% extends "base.html" %}
{# Employee list: GET search form (text query `q` plus `status` filter) above the results table. #}
{% block title %}Employees · MIEM Employees{% endblock %}
{% block content %}
<section class="panel">
<h2 class="panel__title">Employees</h2>
{# The form resubmits to this page; the current q/status values are echoed back into the controls. #}
<form class="form" method="get" action="/admin/employees">
<input class="form__input" name="q" value="{{ q }}" placeholder="Name or URL">
<select class="form__select" name="status">
<option value="" {% if not status %}selected{% endif %}>All</option>
<option value="active" {% if status == "active" %}selected{% endif %}>Active</option>
<option value="dismissed" {% if status == "dismissed" %}selected{% endif %}>Dismissed</option>
</select>
<button class="button" type="submit">Search</button>
</form>
<table class="table">
<thead><tr><th class="table__head">Name</th><th class="table__head">Status</th><th class="table__head">Last seen</th><th class="table__head">Profile</th></tr></thead>
<tbody>
{% for employee in employees %}
<tr>
{# The name links to the detail page and falls back to profile_key when full_name is empty. #}
<td class="table__cell"><a class="admin__link" href="/admin/employees/{{ employee.id }}">{{ employee.full_name or employee.profile_key }}</a></td>
<td class="table__cell"><span class="badge {% if employee.status == "dismissed" %}badge--dismissed{% endif %}">{{ employee.status }}</span></td>
<td class="table__cell">{{ employee.last_seen_at }}</td>
<td class="table__cell"><a class="admin__link" href="{{ employee.canonical_url }}">{{ employee.canonical_url }}</a></td>
</tr>
{% endfor %}
</tbody>
</table>
</section>
{% endblock %}

25
app/templates/login.html Normal file
View File

@@ -0,0 +1,25 @@
<!doctype html>
{# Standalone login page: it does not extend base.html, so the admin navigation is not shown. #}
<html lang="ru">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Login · MIEM Employees</title>
  <link rel="stylesheet" href="/static/admin.css">
</head>
<body class="admin">
  <main class="admin__main">
    <section class="panel">
      <h1 class="panel__title">Admin login</h1>
      {% if error %}<p>{{ error }}</p>{% endif %}
      {# Both fields are mandatory on the server, so mark them required to stop empty submissions in the browser. #}
      <form class="form" method="post" action="/admin/login">
        <label class="form__label">Login <input class="form__input" name="username" autocomplete="username" required></label>
        <label class="form__label">Password <input class="form__input" name="password" type="password" autocomplete="current-password" required></label>
        <button class="button" type="submit">Sign in</button>
      </form>
    </section>
  </main>
  <footer class="admin__footer">
    Backend {{ backend_version }} · Frontend {{ frontend_version }}
  </footer>
</body>
</html>

27
app/templates/runs.html Normal file
View File

@@ -0,0 +1,27 @@
{% extends "base.html" %}
{# Crawl runs page: button to start a crawl, table of `runs`, and table of recent `errors`. #}
{% block title %}Runs · MIEM Employees{% endblock %}
{% block content %}
<section class="panel">
<h2 class="panel__title">Crawl runs</h2>
{# Starting a crawl is a POST to the same URL that lists the runs. #}
<form method="post" action="/admin/runs"><button class="button" type="submit">Start crawl</button></form>
<table class="table">
<thead><tr><th class="table__head">ID</th><th class="table__head">Status</th><th class="table__head">Found</th><th class="table__head">Parsed</th><th class="table__head">Errors</th><th class="table__head">Dismissed</th></tr></thead>
<tbody>
{% for run in runs %}
<tr><td class="table__cell">{{ run.id }}</td><td class="table__cell">{{ run.status }}</td><td class="table__cell">{{ run.found_count }}</td><td class="table__cell">{{ run.parsed_count }}</td><td class="table__cell">{{ run.error_count }}</td><td class="table__cell">{{ run.dismissed_count }}</td></tr>
{% endfor %}
</tbody>
</table>
</section>
<section class="panel">
<h2 class="panel__title">Recent errors</h2>
<table class="table">
<thead><tr><th class="table__head">Run</th><th class="table__head">Profile</th><th class="table__head">Error</th></tr></thead>
<tbody>
{# Each row shows the run id, the failing profile URL and "ErrorType: message". #}
{% for error in errors %}
<tr><td class="table__cell">{{ error.crawl_run_id }}</td><td class="table__cell">{{ error.profile_url }}</td><td class="table__cell">{{ error.error_type }}: {{ error.message }}</td></tr>
{% endfor %}
</tbody>
</table>
</section>
{% endblock %}

3
app/version.py Normal file
View File

@@ -0,0 +1,3 @@
# Version strings for the service as a whole and for its two halves.
# The admin module imports BACKEND_VERSION and FRONTEND_VERSION from here.
APP_VERSION = "0.1.0"
FRONTEND_VERSION = "0.1.0"
BACKEND_VERSION = "0.1.0"

45
app/worker.py Normal file
View File

@@ -0,0 +1,45 @@
import logging
import signal
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app.config import get_settings
from app.db import SessionLocal, init_db
from app.services.crawler import run_crawl
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
def crawl_once() -> None:
    """Run one crawl in its own database session and log the result."""
    settings = get_settings()
    with SessionLocal() as db:
        finished = run_crawl(db, settings)
        summary = (finished.id, finished.status, finished.parsed_count, finished.error_count)
        logger.info("crawl finished: id=%s status=%s parsed=%s errors=%s", *summary)
def main() -> None:
    """Start the scheduled crawler and block until SIGTERM or SIGINT.

    Initializes the database, schedules ``crawl_once`` on the cron expression
    from ``settings.crawl_cron`` (evaluated in Europe/Moscow), then idles until
    a termination signal arrives and shuts the scheduler down.
    """
    init_db()
    settings = get_settings()
    schedule_tz = "Europe/Moscow"
    scheduler = BackgroundScheduler(timezone=schedule_tz)
    # A ready-made trigger does not inherit the scheduler's timezone; without an
    # explicit one the cron expression is evaluated in the host's local zone.
    trigger = CronTrigger.from_crontab(settings.crawl_cron, timezone=schedule_tz)
    scheduler.add_job(crawl_once, trigger, id="weekly_miem_crawl", replace_existing=True)
    scheduler.start()
    logger.info("worker started with cron=%s", settings.crawl_cron)
    stop = False

    def _stop(*_: object) -> None:
        # Signal handler: only flips the flag; the main loop does the shutdown.
        nonlocal stop
        stop = True

    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)
    # Jobs run on the scheduler's background thread; the main thread only waits.
    while not stop:
        time.sleep(1)
    scheduler.shutdown()
if __name__ == "__main__":
main()

74
migrations/001_init.sql Normal file
View File

@@ -0,0 +1,74 @@
-- Initial schema for the MIEM employees service.
-- Every statement uses IF NOT EXISTS, so the script can be re-run safely.

-- Pages that list employee profiles; one row per source URL.
CREATE TABLE IF NOT EXISTS parser_sources (
id SERIAL PRIMARY KEY,
source_url TEXT NOT NULL UNIQUE,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- One row per crawl, with its status and counters.
CREATE TABLE IF NOT EXISTS crawl_runs (
id SERIAL PRIMARY KEY,
source_url TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'running',
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
finished_at TIMESTAMPTZ,
found_count INTEGER NOT NULL DEFAULT 0,
parsed_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
dismissed_count INTEGER NOT NULL DEFAULT 0,
message TEXT
);
-- Current state of each employee, identified by the unique profile_key.
CREATE TABLE IF NOT EXISTS employees (
id SERIAL PRIMARY KEY,
profile_key VARCHAR(255) NOT NULL UNIQUE,
profile_type VARCHAR(50),
profile_id VARCHAR(255),
canonical_url TEXT NOT NULL,
full_name TEXT,
status VARCHAR(32) NOT NULL DEFAULT 'active',
first_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
dismissed_at TIMESTAMPTZ,
parser_version VARCHAR(32),
current_data JSONB,
current_checksum VARCHAR(64),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- NOTE(review): no trigger in this script maintains updated_at; confirm the application sets it.
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_employees_full_name ON employees (full_name);
CREATE INDEX IF NOT EXISTS ix_employees_status ON employees (status);
-- Parsed data (and optionally the raw page) captured for an employee during a crawl.
CREATE TABLE IF NOT EXISTS employee_snapshots (
id SERIAL PRIMARY KEY,
employee_id INTEGER NOT NULL REFERENCES employees(id),
crawl_run_id INTEGER REFERENCES crawl_runs(id),
parsed_data JSONB NOT NULL,
html_snapshot BYTEA,
checksum VARCHAR(64) NOT NULL,
parser_version VARCHAR(32),
captured_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_employee_snapshots_employee_id ON employee_snapshots (employee_id);
-- Per-profile failures recorded during a crawl run.
CREATE TABLE IF NOT EXISTS crawl_errors (
id SERIAL PRIMARY KEY,
crawl_run_id INTEGER NOT NULL REFERENCES crawl_runs(id),
profile_url TEXT,
error_type VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_crawl_errors_run_id ON crawl_errors (crawl_run_id);
-- Navigation tabs found on an employee's profile page.
CREATE TABLE IF NOT EXISTS profile_tabs (
id SERIAL PRIMARY KEY,
employee_id INTEGER NOT NULL REFERENCES employees(id),
title TEXT NOT NULL,
href TEXT NOT NULL,
data_index VARCHAR(64)
);
CREATE INDEX IF NOT EXISTS ix_profile_tabs_employee_id ON profile_tabs (employee_id);

23
tests/conftest.py Normal file
View File

@@ -0,0 +1,23 @@
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.db import Base
@pytest.fixture()
def db_session():
    """Yield a session bound to a fresh in-memory SQLite database.

    StaticPool keeps a single connection so the in-memory database is shared
    by everything using the engine. Tables are created before the test and
    dropped afterwards.
    """
    memory_engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(memory_engine)
    make_session = sessionmaker(bind=memory_engine)
    active_session = make_session()
    try:
        yield active_session
    finally:
        active_session.close()
        Base.metadata.drop_all(memory_engine)

107
tests/test_api_mcp.py Normal file
View File

@@ -0,0 +1,107 @@
from datetime import datetime, timezone
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.config import Settings, get_settings
from app.db import Base, get_db
from app.main import app
from app.models import Employee
def test_health_returns_versions():
    """The health endpoint answers 200 and reports the backend version."""
    health = TestClient(app).get("/api/health")
    assert health.status_code == 200
    body = health.json()
    assert body["backend_version"] == "0.1.0"
def test_mcp_requires_token_and_lists_tools():
    """/mcp rejects requests without the bearer token and lists tools with it."""
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    def override_db():
        session = Session()
        try:
            yield session
        finally:
            session.close()

    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: Settings(mcp_token="secret", session_secret="session-secret")
    try:
        client = TestClient(app)
        request_body = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
        unauthorized = client.post("/mcp", json=request_body)
        authorized = client.post(
            "/mcp",
            headers={"Authorization": "Bearer secret"},
            json=request_body,
        )
        assert unauthorized.status_code == 401
        assert authorized.status_code == 200
        assert authorized.json()["result"]["tools"][0]["name"] == "search_employees"
    finally:
        # Always undo the overrides: a failing assertion must not leak them
        # into tests that run afterwards.
        app.dependency_overrides.clear()
def test_mcp_search_employees_returns_matching_employee():
    """The search_employees MCP tool returns an employee matching the query."""
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    # Seed one active employee for the search to find.
    session = Session()
    session.add(
        Employee(
            profile_key="staff:avsergeev",
            profile_type="staff",
            profile_id="avsergeev",
            canonical_url="https://www.hse.ru/staff/avsergeev",
            full_name="Сергеев Алексей Викторович",
            status="active",
            first_seen_at=datetime.now(timezone.utc),
            last_seen_at=datetime.now(timezone.utc),
            current_data={"sections": []},
        )
    )
    session.commit()
    session.close()

    def override_db():
        db = Session()
        try:
            yield db
        finally:
            db.close()

    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: Settings(mcp_token="secret", session_secret="session-secret")
    try:
        client = TestClient(app)
        response = client.post(
            "/mcp",
            headers={"Authorization": "Bearer secret"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/call",
                "params": {"name": "search_employees", "arguments": {"query": "Сергеев"}},
            },
        )
        assert response.status_code == 200
        assert "Сергеев Алексей Викторович" in response.json()["result"]["content"][0]["text"]
    finally:
        # Always undo the overrides: a failing assertion must not leak them
        # into tests that run afterwards.
        app.dependency_overrides.clear()

34
tests/test_crawler.py Normal file
View File

@@ -0,0 +1,34 @@
from datetime import datetime, timezone
from app.models import Employee
from app.services.crawler import _mark_dismissed
def test_mark_dismissed_only_marks_missing_active_employees(db_session):
    """Only the active employee missing from the found keys becomes dismissed."""
    for slug in ("kept", "gone"):
        db_session.add(
            Employee(
                profile_key=f"staff:{slug}",
                canonical_url=f"https://www.hse.ru/staff/{slug}",
                status="active",
                first_seen_at=datetime.now(timezone.utc),
                last_seen_at=datetime.now(timezone.utc),
            )
        )
    db_session.commit()

    dismissed_total = _mark_dismissed(db_session, {"staff:kept"})

    def _by_key(key):
        return db_session.query(Employee).filter_by(profile_key=key).one()

    assert dismissed_total == 1
    assert _by_key("staff:kept").status == "active"
    departed = _by_key("staff:gone")
    assert departed.status == "dismissed"
    assert departed.dismissed_at is not None

28
tests/test_parser.py Normal file
View File

@@ -0,0 +1,28 @@
from bs4 import BeautifulSoup
from app.parser.profile import extract_person_tabs
from app.parser.profile_url import normalize_profile_url, parse_profile_identity
def test_normalize_profile_url_supports_staff_and_org_persons():
    """Staff and org-person links normalize to canonical URLs and identities."""
    staff_url = normalize_profile_url("/staff/avsergeev#sci")
    org_url = normalize_profile_url("https://www.hse.ru/org/persons/123/")
    identity = parse_profile_identity("https://www.hse.ru/staff/avsergeev")
    assert staff_url == "https://www.hse.ru/staff/avsergeev"
    assert org_url == "https://www.hse.ru/org/persons/123"
    assert identity == ("staff", "avsergeev")
def test_extract_person_tabs_prefers_person_menu_addition():
    """Tabs come from the person menu; a profile link outside it is ignored."""
    markup = """
<div class="person-menu is-desktop small person-menu-addition">
<a href="#main">Домашняя страница</a>
<a href="#sci" data-index="1">Публикации</a>
</div>
<a href="/org/persons/999">Other person</a>
"""
    page = BeautifulSoup(markup, "html.parser")
    tabs = extract_person_tabs(page, "https://www.hse.ru/staff/avsergeev")
    titles = [tab["title"] for tab in tabs]
    assert titles == ["Домашняя страница", "Публикации"]
    assert tabs[1]["href"] == "https://www.hse.ru/staff/avsergeev#sci"