import time from datetime import datetime, timezone from types import SimpleNamespace import jwt from fastapi.testclient import TestClient from cryptography.hazmat.primitives.asymmetric import rsa from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool import app.security as security from app.config import Settings, get_settings from app.db import Base, get_db from app.main import app from app.models import CrawlRun, Employee from app.security import SESSION_COOKIE, sign_session def test_health_returns_versions(): client = TestClient(app) response = client.get("/api/health") assert response.status_code == 200 assert response.json()["backend_version"] == "0.3.0" def test_mcp_requires_token_and_lists_tools(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) def override_db(): session = Session() try: yield session finally: session.close() app.dependency_overrides[get_db] = override_db app.dependency_overrides[get_settings] = lambda: Settings( mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret" ) client = TestClient(app) unauthorized = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}) authorized = client.post( "/mcp", headers={"Authorization": "Bearer secret"}, json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}, ) assert unauthorized.status_code == 401 assert authorized.status_code == 200 assert authorized.json()["result"]["tools"][0]["name"] == "search_employees" app.dependency_overrides.clear() def test_mcp_search_employees_returns_matching_employee(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() session.add( Employee( profile_key="staff:avsergeev", profile_type="staff", profile_id="avsergeev", canonical_url="https://www.hse.ru/staff/avsergeev", full_name="Сергеев Алексей Викторович", status="active", first_seen_at=datetime.now(timezone.utc), last_seen_at=datetime.now(timezone.utc), current_data={"sections": []}, ) ) session.commit() session.close() def override_db(): db = Session() try: yield db finally: db.close() app.dependency_overrides[get_db] = override_db app.dependency_overrides[get_settings] = lambda: Settings( mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret" ) client = TestClient(app) response = client.post( "/mcp", headers={"Authorization": "Bearer secret"}, json={ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": "search_employees", "arguments": {"query": "Сергеев"}}, }, ) assert response.status_code == 200 assert "Сергеев Алексей Викторович" in response.json()["result"]["content"][0]["text"] app.dependency_overrides.clear() def test_mcp_oauth_rejects_static_token(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) def override_db(): session = Session() try: yield session finally: session.close() settings = Settings( mcp_auth_mode="oauth", mcp_token="secret", session_secret="session-secret", mcp_oauth_issuer="https://auth.example.com", mcp_oauth_audience="miem-mcp", mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json", ) app.dependency_overrides[get_db] = override_db app.dependency_overrides[get_settings] = lambda: settings client = TestClient(app) response = client.post( "/mcp", headers={"Authorization": "Bearer secret"}, json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}, ) assert response.status_code == 401 assert response.headers["www-authenticate"] == ( 'Bearer resource_metadata="http://localhost:8001/.well-known/oauth-protected-resource"' ) app.dependency_overrides.clear() def test_mcp_oauth_missing_auth_returns_metadata_challenge(): settings = Settings( mcp_auth_mode="oauth", mcp_resource_url="https://api.example.com/mcp", mcp_oauth_issuer="https://auth.example.com", mcp_oauth_audience="miem-mcp", mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json", ) app.dependency_overrides[get_settings] = lambda: settings client = TestClient(app) response = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}) assert response.status_code == 401 assert response.headers["www-authenticate"] == ( 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ) app.dependency_overrides.clear() def test_mcp_accepts_valid_oauth_jwt(monkeypatch): public_key, token = _oauth_key_and_token() monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key)) app.dependency_overrides[get_settings] = lambda: _oauth_settings() client = TestClient(app) response = client.post( "/mcp", headers={"Authorization": f"Bearer {token}"}, json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}, ) assert response.status_code == 200 assert response.json()["result"]["tools"][0]["name"] == "search_employees" app.dependency_overrides.clear() def test_mcp_rejects_invalid_oauth_jwts(monkeypatch): public_key, expired_token = _oauth_key_and_token(exp=int(time.time()) - 60) _, wrong_issuer_token = _oauth_key_and_token(issuer="https://other.example.com") _, wrong_audience_token = _oauth_key_and_token(audience="other-audience") _, bad_signature_token = _oauth_key_and_token(public_key=public_key) monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key)) app.dependency_overrides[get_settings] = lambda: _oauth_settings() client = TestClient(app) for token in [expired_token, wrong_issuer_token, wrong_audience_token, bad_signature_token]: response = client.post( "/mcp", headers={"Authorization": f"Bearer {token}"}, json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}, ) assert response.status_code == 401 app.dependency_overrides.clear() def test_mcp_rejects_oauth_jwt_without_required_scope(monkeypatch): public_key, token = _oauth_key_and_token(scope="profile") monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key)) app.dependency_overrides[get_settings] = lambda: _oauth_settings() client = TestClient(app) response = client.post( "/mcp", headers={"Authorization": f"Bearer {token}"}, json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}, ) assert response.status_code == 403 app.dependency_overrides.clear() def test_mcp_protected_resource_metadata_uses_settings(): settings = Settings( mcp_resource_url="https://api.example.com/mcp", mcp_oauth_issuer="https://auth.example.com/", mcp_oauth_required_scope="mcp:tools", ) app.dependency_overrides[get_settings] = lambda: settings client = TestClient(app) response = client.get("/.well-known/oauth-protected-resource") assert response.status_code == 200 assert response.json() == { "resource": "https://api.example.com/mcp", "authorization_servers": ["https://auth.example.com"], "bearer_methods_supported": ["header"], "scopes_supported": ["mcp:tools"], "resource_documentation": "https://api.example.com/mcp", } app.dependency_overrides.clear() def test_api_employees_and_stats_require_admin_session(): engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) db = Session() db.add( Employee( profile_key="staff:alpha", profile_type="staff", profile_id="alpha", canonical_url="https://www.hse.ru/staff/alpha", full_name="Alpha Person", status="active", first_seen_at=datetime.now(timezone.utc), last_seen_at=datetime.now(timezone.utc), current_data={"contacts": {"emails": ["alpha@hse.ru"]}, "sections": []}, ) ) db.add(CrawlRun(source_url="https://miem.hse.ru/persons", status="completed", new_count=1)) db.commit() db.close() settings = Settings(admin_username="admin", admin_password="password", session_secret="session-secret") def override_db(): session = Session() try: yield session finally: session.close() app.dependency_overrides[get_db] = override_db app.dependency_overrides[get_settings] = lambda: settings client = TestClient(app) client.cookies.set(SESSION_COOKIE, sign_session("admin", settings)) employees = client.get("/api/employees", params={"q": "Alpha", "has_email": True}) stats = client.get("/api/stats") assert employees.status_code == 200 assert employees.json()["total"] == 1 assert stats.status_code == 200 assert stats.json()["new_in_last_run"] == 1 app.dependency_overrides.clear() def _oauth_settings() -> Settings: return Settings( mcp_auth_mode="oauth", mcp_resource_url="https://api.example.com/mcp", mcp_oauth_issuer="https://auth.example.com", mcp_oauth_audience="miem-mcp", mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json", session_secret="session-secret", ) def _oauth_key_and_token( *, issuer: str = "https://auth.example.com", audience: str = "miem-mcp", scope: str = "mcp:tools", exp: int | None = None, public_key=None, ): private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) claims = { "iss": issuer, "aud": audience, "scope": scope, "sub": "mcp-client", "iat": int(time.time()), "exp": exp or int(time.time()) + 300, } token = jwt.encode(claims, private_key, algorithm="RS256", headers={"kid": "test-key"}) return public_key or private_key.public_key(), token