Files
miem_workers/tests/test_api_mcp.py

343 lines
11 KiB
Python

import time
from datetime import datetime, timezone
from types import SimpleNamespace
import jwt
from fastapi.testclient import TestClient
from cryptography.hazmat.primitives.asymmetric import rsa
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import app.security as security
from app.config import Settings, get_settings
from app.db import Base, get_db
from app.main import app
from app.models import CrawlRun, Employee
from app.security import SESSION_COOKIE, sign_session
def test_health_returns_versions():
client = TestClient(app)
response = client.get("/api/health")
assert response.status_code == 200
assert response.json()["backend_version"] == "0.3.0"
def test_mcp_requires_token_and_lists_tools():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
def override_db():
session = Session()
try:
yield session
finally:
session.close()
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: Settings(
mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
)
client = TestClient(app)
unauthorized = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}})
authorized = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert unauthorized.status_code == 401
assert authorized.status_code == 200
assert authorized.json()["result"]["tools"][0]["name"] == "search_employees"
app.dependency_overrides.clear()
def test_mcp_search_employees_returns_matching_employee():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
session.add(
Employee(
profile_key="staff:avsergeev",
profile_type="staff",
profile_id="avsergeev",
canonical_url="https://www.hse.ru/staff/avsergeev",
full_name="Сергеев Алексей Викторович",
status="active",
first_seen_at=datetime.now(timezone.utc),
last_seen_at=datetime.now(timezone.utc),
current_data={"sections": []},
)
)
session.commit()
session.close()
def override_db():
db = Session()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: Settings(
mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
)
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "search_employees", "arguments": {"query": "Сергеев"}},
},
)
assert response.status_code == 200
assert "Сергеев Алексей Викторович" in response.json()["result"]["content"][0]["text"]
app.dependency_overrides.clear()
def test_mcp_oauth_rejects_static_token():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
def override_db():
session = Session()
try:
yield session
finally:
session.close()
settings = Settings(
mcp_auth_mode="oauth",
mcp_token="secret",
session_secret="session-secret",
mcp_oauth_issuer="https://auth.example.com",
mcp_oauth_audience="miem-mcp",
mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
)
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: settings
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": "Bearer secret"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 401
assert response.headers["www-authenticate"] == (
'Bearer resource_metadata="http://localhost:8001/.well-known/oauth-protected-resource"'
)
app.dependency_overrides.clear()
def test_mcp_oauth_missing_auth_returns_metadata_challenge():
settings = Settings(
mcp_auth_mode="oauth",
mcp_resource_url="https://api.example.com/mcp",
mcp_oauth_issuer="https://auth.example.com",
mcp_oauth_audience="miem-mcp",
mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
)
app.dependency_overrides[get_settings] = lambda: settings
client = TestClient(app)
response = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}})
assert response.status_code == 401
assert response.headers["www-authenticate"] == (
'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"'
)
app.dependency_overrides.clear()
def test_mcp_accepts_valid_oauth_jwt(monkeypatch):
public_key, token = _oauth_key_and_token()
monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key))
app.dependency_overrides[get_settings] = lambda: _oauth_settings()
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": f"Bearer {token}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 200
assert response.json()["result"]["tools"][0]["name"] == "search_employees"
app.dependency_overrides.clear()
def test_mcp_rejects_invalid_oauth_jwts(monkeypatch):
public_key, expired_token = _oauth_key_and_token(exp=int(time.time()) - 60)
_, wrong_issuer_token = _oauth_key_and_token(issuer="https://other.example.com")
_, wrong_audience_token = _oauth_key_and_token(audience="other-audience")
_, bad_signature_token = _oauth_key_and_token(public_key=public_key)
monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key))
app.dependency_overrides[get_settings] = lambda: _oauth_settings()
client = TestClient(app)
for token in [expired_token, wrong_issuer_token, wrong_audience_token, bad_signature_token]:
response = client.post(
"/mcp",
headers={"Authorization": f"Bearer {token}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 401
app.dependency_overrides.clear()
def test_mcp_rejects_oauth_jwt_without_required_scope(monkeypatch):
public_key, token = _oauth_key_and_token(scope="profile")
monkeypatch.setattr(security, "_get_mcp_oauth_signing_key", lambda _token, _settings: SimpleNamespace(key=public_key))
app.dependency_overrides[get_settings] = lambda: _oauth_settings()
client = TestClient(app)
response = client.post(
"/mcp",
headers={"Authorization": f"Bearer {token}"},
json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
)
assert response.status_code == 403
app.dependency_overrides.clear()
def test_mcp_protected_resource_metadata_uses_settings():
settings = Settings(
mcp_resource_url="https://api.example.com/mcp",
mcp_oauth_issuer="https://auth.example.com/",
mcp_oauth_required_scope="mcp:tools",
)
app.dependency_overrides[get_settings] = lambda: settings
client = TestClient(app)
response = client.get("/.well-known/oauth-protected-resource")
assert response.status_code == 200
assert response.json() == {
"resource": "https://api.example.com/mcp",
"authorization_servers": ["https://auth.example.com"],
"bearer_methods_supported": ["header"],
"scopes_supported": ["mcp:tools"],
"resource_documentation": "https://api.example.com/mcp",
}
app.dependency_overrides.clear()
def test_api_employees_and_stats_require_admin_session():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
db = Session()
db.add(
Employee(
profile_key="staff:alpha",
profile_type="staff",
profile_id="alpha",
canonical_url="https://www.hse.ru/staff/alpha",
full_name="Alpha Person",
status="active",
first_seen_at=datetime.now(timezone.utc),
last_seen_at=datetime.now(timezone.utc),
current_data={"contacts": {"emails": ["alpha@hse.ru"]}, "sections": []},
)
)
db.add(CrawlRun(source_url="https://miem.hse.ru/persons", status="completed", new_count=1))
db.commit()
db.close()
settings = Settings(admin_username="admin", admin_password="password", session_secret="session-secret")
def override_db():
session = Session()
try:
yield session
finally:
session.close()
app.dependency_overrides[get_db] = override_db
app.dependency_overrides[get_settings] = lambda: settings
client = TestClient(app)
client.cookies.set(SESSION_COOKIE, sign_session("admin", settings))
employees = client.get("/api/employees", params={"q": "Alpha", "has_email": True})
stats = client.get("/api/stats")
assert employees.status_code == 200
assert employees.json()["total"] == 1
assert stats.status_code == 200
assert stats.json()["new_in_last_run"] == 1
app.dependency_overrides.clear()
def _oauth_settings() -> Settings:
return Settings(
mcp_auth_mode="oauth",
mcp_resource_url="https://api.example.com/mcp",
mcp_oauth_issuer="https://auth.example.com",
mcp_oauth_audience="miem-mcp",
mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
session_secret="session-secret",
)
def _oauth_key_and_token(
*,
issuer: str = "https://auth.example.com",
audience: str = "miem-mcp",
scope: str = "mcp:tools",
exp: int | None = None,
public_key=None,
):
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
claims = {
"iss": issuer,
"aud": audience,
"scope": scope,
"sub": "mcp-client",
"iat": int(time.time()),
"exp": exp or int(time.time()) + 300,
}
token = jwt.encode(claims, private_key, algorithm="RS256", headers={"kid": "test-key"})
return public_key or private_key.public_key(), token