"""HTTP-level tests for the health, MCP and admin API endpoints.

Each test talks to the FastAPI ``app`` through ``TestClient`` and swaps the
``get_db`` / ``get_settings`` dependencies for test doubles.  Overrides are
installed through ``_app_client`` so they are always removed again, even when
an assertion fails part-way through a test.
"""

import time
from contextlib import contextmanager
from datetime import datetime, timezone
from types import SimpleNamespace

import jwt
from fastapi.testclient import TestClient
from cryptography.hazmat.primitives.asymmetric import rsa
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

import app.security as security
from app.config import Settings, get_settings
from app.db import Base, get_db
from app.main import app
from app.models import CrawlRun, CrawlRunEmployeeChange, Employee
from app.security import SESSION_COOKIE, sign_session

# Header carrying the static token configured by ``_token_settings``.
_STATIC_TOKEN_HEADERS = {"Authorization": "Bearer secret"}


def _jsonrpc(method: str, params: dict | None = None) -> dict:
    """Build a JSON-RPC 2.0 request body with id 1 for the ``/mcp`` endpoint."""
    return {"jsonrpc": "2.0", "id": 1, "method": method, "params": params or {}}


def _tool_call(name: str, arguments: dict) -> dict:
    """Build a ``tools/call`` JSON-RPC request for the tool *name*."""
    return _jsonrpc("tools/call", {"name": name, "arguments": arguments})


def _memory_session_factory():
    """Create the schema in a fresh in-memory SQLite DB and return a session factory.

    ``StaticPool`` plus ``check_same_thread=False`` keeps every session on the
    single underlying connection, so data seeded by the test is visible to the
    sessions opened by the request handlers.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    return sessionmaker(bind=engine)


def _db_override(session_factory):
    """Return a ``get_db`` replacement that yields sessions from *session_factory*."""

    def override_db():
        session = session_factory()
        try:
            yield session
        finally:
            session.close()

    return override_db


@contextmanager
def _app_client(settings: Settings, session_factory=None):
    """Yield a ``TestClient`` with dependency overrides installed for its lifetime.

    ``get_settings`` is always overridden to return *settings*; ``get_db`` is
    overridden only when *session_factory* is given.  The overrides are cleared
    in a ``finally`` block so a failing assertion cannot leak them into the
    next test.
    """
    if session_factory is not None:
        app.dependency_overrides[get_db] = _db_override(session_factory)
    app.dependency_overrides[get_settings] = lambda: settings
    try:
        yield TestClient(app)
    finally:
        app.dependency_overrides.clear()


def _token_settings() -> Settings:
    """Settings for static-token MCP auth with the token ``secret``."""
    return Settings(mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret")


def _make_employee(profile_id: str, full_name: str, **extra) -> Employee:
    """Build an active ``staff`` employee whose key and URL derive from *profile_id*."""
    now = datetime.now(timezone.utc)
    return Employee(
        profile_key=f"staff:{profile_id}",
        profile_type="staff",
        profile_id=profile_id,
        canonical_url=f"https://www.hse.ru/staff/{profile_id}",
        full_name=full_name,
        status="active",
        first_seen_at=now,
        last_seen_at=now,
        **extra,
    )


def _seed_run_with_new_employee(session_factory, employee: Employee) -> int:
    """Persist *employee*, a completed crawl run and a ``new`` change linking them.

    Returns the id of the created crawl run.
    """
    session = session_factory()
    try:
        run = CrawlRun(source_url="https://miem.hse.ru/persons", status="completed", new_count=1)
        session.add_all([run, employee])
        # Commit first so both rows have primary keys for the change record.
        session.commit()
        session.add(
            CrawlRunEmployeeChange(
                crawl_run_id=run.id,
                employee_id=employee.id,
                profile_key=employee.profile_key,
                profile_url=employee.canonical_url,
                full_name=employee.full_name,
                change_type="new",
                profile_available=True,
                message="added",
            )
        )
        session.commit()
        return run.id
    finally:
        session.close()


def _patch_signing_key(monkeypatch, public_key) -> None:
    """Make ``security._get_mcp_oauth_signing_key`` return *public_key* for any token."""
    monkeypatch.setattr(
        security,
        "_get_mcp_oauth_signing_key",
        lambda _token, _settings: SimpleNamespace(key=public_key),
    )


def test_health_returns_versions():
    """``/api/health`` answers 200 and reports the backend version."""
    client = TestClient(app)

    response = client.get("/api/health")

    assert response.status_code == 200
    assert response.json()["backend_version"] == "0.4.4"


def test_mcp_requires_token_and_lists_tools():
    """Token mode: no header gives 401, the configured token lists the tools."""
    with _app_client(_token_settings(), _memory_session_factory()) as client:
        unauthorized = client.post("/mcp", json=_jsonrpc("tools/list"))
        authorized = client.post("/mcp", headers=_STATIC_TOKEN_HEADERS, json=_jsonrpc("tools/list"))

        assert unauthorized.status_code == 401
        assert authorized.status_code == 200
        assert authorized.json()["result"]["tools"][0]["name"] == "search_employees"
        assert any(tool["name"] == "get_crawl_run_details" for tool in authorized.json()["result"]["tools"])


def test_mcp_search_employees_returns_matching_employee():
    """The ``search_employees`` tool returns a seeded employee matching the query."""
    session_factory = _memory_session_factory()
    session = session_factory()
    try:
        session.add(
            _make_employee(
                "avsergeev",
                "Сергеев Алексей Викторович",
                current_data={"sections": []},
            )
        )
        session.commit()
    finally:
        session.close()

    with _app_client(_token_settings(), session_factory) as client:
        response = client.post(
            "/mcp",
            headers=_STATIC_TOKEN_HEADERS,
            json=_tool_call("search_employees", {"query": "Сергеев"}),
        )

        assert response.status_code == 200
        assert "Сергеев Алексей Викторович" in response.json()["result"]["content"][0]["text"]


def test_mcp_get_crawl_run_details_returns_changes():
    """The ``get_crawl_run_details`` tool includes the run's per-employee changes."""
    session_factory = _memory_session_factory()
    run_id = _seed_run_with_new_employee(session_factory, _make_employee("new", "New Person"))

    with _app_client(_token_settings(), session_factory) as client:
        response = client.post(
            "/mcp",
            headers=_STATIC_TOKEN_HEADERS,
            json=_tool_call("get_crawl_run_details", {"run_id": run_id}),
        )

        assert response.status_code == 200
        text = response.json()["result"]["content"][0]["text"]
        assert "New Person" in text
        assert "changes_detail_available" in text


def test_mcp_oauth_rejects_static_token():
    """OAuth mode: the static token is refused with a resource-metadata challenge."""
    settings = Settings(
        mcp_auth_mode="oauth",
        mcp_token="secret",
        session_secret="session-secret",
        mcp_oauth_issuer="https://auth.example.com",
        mcp_oauth_audience="miem-mcp",
        mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
    )

    with _app_client(settings, _memory_session_factory()) as client:
        response = client.post("/mcp", headers=_STATIC_TOKEN_HEADERS, json=_jsonrpc("tools/list"))

        assert response.status_code == 401
        assert response.headers["www-authenticate"] == (
            'Bearer resource_metadata="http://localhost:8001/.well-known/oauth-protected-resource"'
        )


def test_mcp_oauth_missing_auth_returns_metadata_challenge():
    """OAuth mode: a request without credentials gets a 401 pointing at the metadata URL."""
    settings = Settings(
        mcp_auth_mode="oauth",
        mcp_resource_url="https://api.example.com/mcp",
        mcp_oauth_issuer="https://auth.example.com",
        mcp_oauth_audience="miem-mcp",
        mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
    )

    with _app_client(settings) as client:
        response = client.post("/mcp", json=_jsonrpc("tools/list"))

        assert response.status_code == 401
        assert response.headers["www-authenticate"] == (
            'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"'
        )


def test_mcp_accepts_valid_oauth_jwt(monkeypatch):
    """OAuth mode: a correctly signed token with the expected claims is accepted."""
    public_key, token = _oauth_key_and_token()
    _patch_signing_key(monkeypatch, public_key)

    with _app_client(_oauth_settings()) as client:
        response = client.post(
            "/mcp",
            headers={"Authorization": f"Bearer {token}"},
            json=_jsonrpc("tools/list"),
        )

        assert response.status_code == 200
        assert response.json()["result"]["tools"][0]["name"] == "search_employees"


def test_mcp_rejects_invalid_oauth_jwts(monkeypatch):
    """OAuth mode: expired, wrong-issuer, wrong-audience and mis-signed tokens get 401."""
    public_key, expired_token = _oauth_key_and_token(exp=int(time.time()) - 60)
    _, wrong_issuer_token = _oauth_key_and_token(issuer="https://other.example.com")
    _, wrong_audience_token = _oauth_key_and_token(audience="other-audience")
    # Signed with a fresh private key, so it cannot verify against ``public_key``.
    _, bad_signature_token = _oauth_key_and_token(public_key=public_key)
    _patch_signing_key(monkeypatch, public_key)

    with _app_client(_oauth_settings()) as client:
        for token in [expired_token, wrong_issuer_token, wrong_audience_token, bad_signature_token]:
            response = client.post(
                "/mcp",
                headers={"Authorization": f"Bearer {token}"},
                json=_jsonrpc("tools/list"),
            )
            assert response.status_code == 401


def test_mcp_rejects_oauth_jwt_without_required_scope(monkeypatch):
    """OAuth mode: a valid token whose scope is only ``profile`` gets 403."""
    public_key, token = _oauth_key_and_token(scope="profile")
    _patch_signing_key(monkeypatch, public_key)

    with _app_client(_oauth_settings()) as client:
        response = client.post(
            "/mcp",
            headers={"Authorization": f"Bearer {token}"},
            json=_jsonrpc("tools/list"),
        )

        assert response.status_code == 403


def test_mcp_protected_resource_metadata_uses_settings():
    """The protected-resource metadata document is built from the active settings."""
    settings = Settings(
        mcp_resource_url="https://api.example.com/mcp",
        mcp_oauth_issuer="https://auth.example.com/",
        mcp_oauth_required_scope="mcp:tools",
    )

    with _app_client(settings) as client:
        response = client.get("/.well-known/oauth-protected-resource")

        assert response.status_code == 200
        assert response.json() == {
            "resource": "https://api.example.com/mcp",
            "authorization_servers": ["https://auth.example.com"],
            "bearer_methods_supported": ["header"],
            "scopes_supported": ["mcp:tools"],
            "resource_documentation": "https://api.example.com/mcp",
        }


def test_api_employees_and_stats_require_admin_session():
    """With a signed admin session cookie the employee, stats and run endpoints answer 200."""
    session_factory = _memory_session_factory()
    employee = _make_employee(
        "alpha",
        "Alpha Person",
        current_data={"contacts": {"emails": ["alpha@hse.ru"]}, "sections": []},
    )
    # The change row references the employee's real primary key rather than a literal 1.
    run_id = _seed_run_with_new_employee(session_factory, employee)
    settings = Settings(admin_username="admin", admin_password="password", session_secret="session-secret")

    with _app_client(settings, session_factory) as client:
        client.cookies.set(SESSION_COOKIE, sign_session("admin", settings))

        employees = client.get("/api/employees", params={"q": "Alpha", "has_email": True})
        stats = client.get("/api/stats")
        run_details = client.get(f"/api/crawl-runs/{run_id}")

        assert employees.status_code == 200
        assert employees.json()["total"] == 1
        assert stats.status_code == 200
        assert stats.json()["new_in_last_run"] == 1
        assert run_details.status_code == 200
        assert run_details.json()["changes"]["new"][0]["full_name"] == "Alpha Person"


def _oauth_settings() -> Settings:
    """Settings for OAuth-protected MCP matching the defaults of ``_oauth_key_and_token``."""
    return Settings(
        mcp_auth_mode="oauth",
        mcp_resource_url="https://api.example.com/mcp",
        mcp_oauth_issuer="https://auth.example.com",
        mcp_oauth_audience="miem-mcp",
        mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
        session_secret="session-secret",
    )


def _oauth_key_and_token(
    *,
    issuer: str = "https://auth.example.com",
    audience: str = "miem-mcp",
    scope: str = "mcp:tools",
    exp: int | None = None,
    public_key=None,
):
    """Sign an RS256 JWT with a freshly generated RSA key.

    Args:
        issuer: Value of the ``iss`` claim.
        audience: Value of the ``aud`` claim.
        scope: Value of the ``scope`` claim.
        exp: Expiry as a Unix timestamp; defaults to 300 seconds from now.
        public_key: Key to return instead of the one matching the signing key.
            Passing another key yields a token that does not verify against it.

    Returns:
        A ``(public_key, token)`` tuple.
    """
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    now = int(time.time())
    claims = {
        "iss": issuer,
        "aud": audience,
        "scope": scope,
        "sub": "mcp-client",
        "iat": now,
        # Compare against None so an explicit exp=0 is honoured, not replaced.
        "exp": exp if exp is not None else now + 300,
    }
    token = jwt.encode(claims, private_key, algorithm="RS256", headers={"kid": "test-key"})
    return public_key or private_key.public_key(), token