432 lines
14 KiB
Python
432 lines
14 KiB
Python
import time
|
|
from datetime import datetime, timezone
|
|
from types import SimpleNamespace
|
|
|
|
import jwt
|
|
from fastapi.testclient import TestClient
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
import app.security as security
|
|
from app.config import Settings, get_settings
|
|
from app.db import Base, get_db
|
|
from app.main import app
|
|
from app.models import CrawlRun, CrawlRunEmployeeChange, Employee
|
|
from app.security import SESSION_COOKIE, sign_session
|
|
|
|
|
|
def test_health_returns_versions():
    """GET /api/health answers 200 and reports backend version 0.4.0."""
    health = TestClient(app).get("/api/health")

    assert health.status_code == 200
    body = health.json()
    assert body["backend_version"] == "0.4.0"
|
|
|
|
|
|
def test_mcp_requires_token_and_lists_tools():
    """With token auth, /mcp rejects anonymous calls and lists tools for the bearer token.

    The app is pointed at a fresh in-memory SQLite database and at settings
    with ``mcp_auth_mode="token"`` through ``app.dependency_overrides``.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    def override_db():
        session = Session()
        try:
            yield session
        finally:
            session.close()

    list_tools = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}

    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: Settings(
        mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
    )
    try:
        client = TestClient(app)

        unauthorized = client.post("/mcp", json=list_tools)
        authorized = client.post(
            "/mcp",
            headers={"Authorization": "Bearer secret"},
            json=list_tools,
        )

        assert unauthorized.status_code == 401
        assert authorized.status_code == 200
        tools = authorized.json()["result"]["tools"]
        assert tools[0]["name"] == "search_employees"
        assert any(tool["name"] == "get_crawl_run_details" for tool in tools)
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_search_employees_returns_matching_employee():
    """The ``search_employees`` MCP tool returns the stored employee matching the query.

    One employee is seeded into a fresh in-memory SQLite database; the tool is
    then called over JSON-RPC with the employee's surname as the query.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    session.add(
        Employee(
            profile_key="staff:avsergeev",
            profile_type="staff",
            profile_id="avsergeev",
            canonical_url="https://www.hse.ru/staff/avsergeev",
            full_name="Сергеев Алексей Викторович",
            status="active",
            first_seen_at=datetime.now(timezone.utc),
            last_seen_at=datetime.now(timezone.utc),
            current_data={"sections": []},
        )
    )
    session.commit()
    session.close()

    def override_db():
        db = Session()
        try:
            yield db
        finally:
            db.close()

    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: Settings(
        mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
    )
    try:
        client = TestClient(app)

        response = client.post(
            "/mcp",
            headers={"Authorization": "Bearer secret"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/call",
                "params": {"name": "search_employees", "arguments": {"query": "Сергеев"}},
            },
        )

        assert response.status_code == 200
        assert "Сергеев Алексей Викторович" in response.json()["result"]["content"][0]["text"]
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_get_crawl_run_details_returns_changes():
    """The ``get_crawl_run_details`` MCP tool reports the changes recorded for a run.

    A crawl run, an employee and one "new" change row linking them are seeded
    into a fresh in-memory SQLite database before the tool is called.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    run = CrawlRun(source_url="https://miem.hse.ru/persons", status="completed", new_count=1)
    employee = Employee(
        profile_key="staff:new",
        profile_type="staff",
        profile_id="new",
        canonical_url="https://www.hse.ru/staff/new",
        full_name="New Person",
        status="active",
        first_seen_at=datetime.now(timezone.utc),
        last_seen_at=datetime.now(timezone.utc),
    )
    session.add_all([run, employee])
    # Commit first so both rows have primary keys for the change row below.
    session.commit()
    session.add(
        CrawlRunEmployeeChange(
            crawl_run_id=run.id,
            employee_id=employee.id,
            profile_key=employee.profile_key,
            profile_url=employee.canonical_url,
            full_name=employee.full_name,
            change_type="new",
            profile_available=True,
            message="added",
        )
    )
    session.commit()
    # Read the id while the session is still open; it is needed after close().
    run_id = run.id
    session.close()

    def override_db():
        db = Session()
        try:
            yield db
        finally:
            db.close()

    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: Settings(
        mcp_auth_mode="token", mcp_token="secret", session_secret="session-secret"
    )
    try:
        client = TestClient(app)

        response = client.post(
            "/mcp",
            headers={"Authorization": "Bearer secret"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/call",
                "params": {"name": "get_crawl_run_details", "arguments": {"run_id": run_id}},
            },
        )

        assert response.status_code == 200
        text = response.json()["result"]["content"][0]["text"]
        assert "New Person" in text
        assert "changes_detail_available" in text
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_oauth_rejects_static_token():
    """In OAuth mode the static ``mcp_token`` is not accepted as a bearer credential.

    The response must be 401 and carry a ``WWW-Authenticate`` challenge that
    points at the protected-resource metadata document.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    def override_db():
        session = Session()
        try:
            yield session
        finally:
            session.close()

    settings = Settings(
        mcp_auth_mode="oauth",
        mcp_token="secret",
        session_secret="session-secret",
        mcp_oauth_issuer="https://auth.example.com",
        mcp_oauth_audience="miem-mcp",
        mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
    )
    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: settings
    try:
        client = TestClient(app)

        response = client.post(
            "/mcp",
            headers={"Authorization": "Bearer secret"},
            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
        )

        assert response.status_code == 401
        # No mcp_resource_url is set here, so the expected challenge URL is
        # whatever Settings uses by default (presumably localhost:8001 — verify
        # against app.config if this assertion starts failing).
        assert response.headers["www-authenticate"] == (
            'Bearer resource_metadata="http://localhost:8001/.well-known/oauth-protected-resource"'
        )
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_oauth_missing_auth_returns_metadata_challenge():
    """In OAuth mode a request without credentials gets a 401 metadata challenge.

    The ``WWW-Authenticate`` header is expected to reference the
    protected-resource metadata on the host of the configured
    ``mcp_resource_url``.
    """
    settings = Settings(
        mcp_auth_mode="oauth",
        mcp_resource_url="https://api.example.com/mcp",
        mcp_oauth_issuer="https://auth.example.com",
        mcp_oauth_audience="miem-mcp",
        mcp_oauth_jwks_url="https://auth.example.com/.well-known/jwks.json",
    )
    app.dependency_overrides[get_settings] = lambda: settings
    try:
        client = TestClient(app)

        response = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}})

        assert response.status_code == 401
        assert response.headers["www-authenticate"] == (
            'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"'
        )
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_accepts_valid_oauth_jwt(monkeypatch):
    """A correctly signed JWT with the expected issuer, audience and scope is accepted.

    The signing-key lookup in ``app.security`` is replaced so no JWKS endpoint
    is contacted; it returns the public key matching the test token.
    """
    public_key, token = _oauth_key_and_token()
    monkeypatch.setattr(
        security,
        "_get_mcp_oauth_signing_key",
        lambda _token, _settings: SimpleNamespace(key=public_key),
    )
    app.dependency_overrides[get_settings] = lambda: _oauth_settings()
    try:
        client = TestClient(app)

        response = client.post(
            "/mcp",
            headers={"Authorization": f"Bearer {token}"},
            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
        )

        assert response.status_code == 200
        assert response.json()["result"]["tools"][0]["name"] == "search_employees"
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_rejects_invalid_oauth_jwts(monkeypatch):
    """Expired, wrong-issuer, wrong-audience and badly signed JWTs all get 401.

    ``_oauth_key_and_token`` signs every token with a freshly generated private
    key. Each claim-level case is therefore verified against the public key
    that matches *its own* token, so the signature is valid and the rejection
    has to come from the claim under test. Verifying them all against a single
    key would make every case but one fail on the signature alone (assuming
    ``app.security`` checks the signature before the claims, as PyJWT does).
    Only the "bad signature" case is deliberately paired with an unrelated key.
    """
    unrelated_key, _ = _oauth_key_and_token()
    # label -> (public key handed to the verifier, token sent by the client)
    cases = {
        "expired": _oauth_key_and_token(exp=int(time.time()) - 60),
        "wrong issuer": _oauth_key_and_token(issuer="https://other.example.com"),
        "wrong audience": _oauth_key_and_token(audience="other-audience"),
        "bad signature": _oauth_key_and_token(public_key=unrelated_key),
    }

    app.dependency_overrides[get_settings] = lambda: _oauth_settings()
    try:
        client = TestClient(app)

        for label, (verification_key, token) in cases.items():
            # Bind the key as a default argument to avoid late-binding of the
            # loop variable inside the lambda.
            monkeypatch.setattr(
                security,
                "_get_mcp_oauth_signing_key",
                lambda _token, _settings, _key=verification_key: SimpleNamespace(key=_key),
            )

            response = client.post(
                "/mcp",
                headers={"Authorization": f"Bearer {token}"},
                json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
            )

            assert response.status_code == 401, label
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_rejects_oauth_jwt_without_required_scope(monkeypatch):
    """A validly signed JWT whose scope is only ``profile`` is refused with 403.

    The signing-key lookup in ``app.security`` is replaced so the token's
    signature verifies and the rejection is attributable to the scope claim.
    """
    public_key, token = _oauth_key_and_token(scope="profile")
    monkeypatch.setattr(
        security,
        "_get_mcp_oauth_signing_key",
        lambda _token, _settings: SimpleNamespace(key=public_key),
    )
    app.dependency_overrides[get_settings] = lambda: _oauth_settings()
    try:
        client = TestClient(app)

        response = client.post(
            "/mcp",
            headers={"Authorization": f"Bearer {token}"},
            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
        )

        assert response.status_code == 403
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_mcp_protected_resource_metadata_uses_settings():
    """The protected-resource metadata document is built from the active settings.

    The issuer is configured with a trailing slash while the expected
    ``authorization_servers`` entry has none, so the endpoint is expected to
    strip it.
    """
    settings = Settings(
        mcp_resource_url="https://api.example.com/mcp",
        mcp_oauth_issuer="https://auth.example.com/",
        mcp_oauth_required_scope="mcp:tools",
    )
    app.dependency_overrides[get_settings] = lambda: settings
    try:
        client = TestClient(app)

        response = client.get("/.well-known/oauth-protected-resource")

        assert response.status_code == 200
        assert response.json() == {
            "resource": "https://api.example.com/mcp",
            "authorization_servers": ["https://auth.example.com"],
            "bearer_methods_supported": ["header"],
            "scopes_supported": ["mcp:tools"],
            "resource_documentation": "https://api.example.com/mcp",
        }
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def test_api_employees_and_stats_require_admin_session():
    """With a signed admin session cookie, the employee, stats and crawl-run APIs respond.

    One employee, one completed crawl run and one "new" change row are seeded
    into a fresh in-memory SQLite database; the three endpoints are then
    queried with a session cookie signed for the ``admin`` user.
    """
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    db = Session()
    employee = Employee(
        profile_key="staff:alpha",
        profile_type="staff",
        profile_id="alpha",
        canonical_url="https://www.hse.ru/staff/alpha",
        full_name="Alpha Person",
        status="active",
        first_seen_at=datetime.now(timezone.utc),
        last_seen_at=datetime.now(timezone.utc),
        current_data={"contacts": {"emails": ["alpha@hse.ru"]}, "sections": []},
    )
    db.add(employee)
    run = CrawlRun(source_url="https://miem.hse.ru/persons", status="completed", new_count=1)
    db.add(run)
    # Commit first so both rows have primary keys for the change row below.
    db.commit()
    db.add(
        CrawlRunEmployeeChange(
            crawl_run_id=run.id,
            # Use the id the database actually assigned rather than assuming 1.
            employee_id=employee.id,
            profile_key="staff:alpha",
            profile_url="https://www.hse.ru/staff/alpha",
            full_name="Alpha Person",
            change_type="new",
            profile_available=True,
            message="added",
        )
    )
    db.commit()
    # Read the id while the session is still open; it is needed after close().
    run_id = run.id
    db.close()

    settings = Settings(admin_username="admin", admin_password="password", session_secret="session-secret")

    def override_db():
        session = Session()
        try:
            yield session
        finally:
            session.close()

    app.dependency_overrides[get_db] = override_db
    app.dependency_overrides[get_settings] = lambda: settings
    try:
        client = TestClient(app)
        client.cookies.set(SESSION_COOKIE, sign_session("admin", settings))

        employees = client.get("/api/employees", params={"q": "Alpha", "has_email": True})
        stats = client.get("/api/stats")
        run_details = client.get(f"/api/crawl-runs/{run_id}")

        assert employees.status_code == 200
        assert employees.json()["total"] == 1
        assert stats.status_code == 200
        assert stats.json()["new_in_last_run"] == 1
        assert run_details.status_code == 200
        assert run_details.json()["changes"]["new"][0]["full_name"] == "Alpha Person"
    finally:
        # `app` is shared by every test in the module: restore it even when an
        # assertion above fails, so overrides cannot leak into later tests.
        app.dependency_overrides.clear()
|
|
|
|
|
|
def _oauth_settings() -> Settings:
    """Build the OAuth-mode ``Settings`` shared by the JWT tests.

    The issuer and audience match the defaults of ``_oauth_key_and_token``.
    """
    oauth_config = {
        "mcp_auth_mode": "oauth",
        "mcp_resource_url": "https://api.example.com/mcp",
        "mcp_oauth_issuer": "https://auth.example.com",
        "mcp_oauth_audience": "miem-mcp",
        "mcp_oauth_jwks_url": "https://auth.example.com/.well-known/jwks.json",
        "session_secret": "session-secret",
    }
    return Settings(**oauth_config)
|
|
|
|
|
|
def _oauth_key_and_token(
    *,
    issuer: str = "https://auth.example.com",
    audience: str = "miem-mcp",
    scope: str = "mcp:tools",
    exp: int | None = None,
    public_key=None,
):
    """Create an RS256-signed test JWT and the public key to verify it with.

    A new 2048-bit RSA private key is generated on every call and used to sign
    the token, so tokens from different calls are signed by different keys.

    Args:
        issuer: Value of the ``iss`` claim.
        audience: Value of the ``aud`` claim.
        scope: Value of the ``scope`` claim.
        exp: Expiry as a Unix timestamp. ``None`` means 300 seconds from now;
            any explicit integer, including ``0``, is used as given.
        public_key: Key to return instead of the one matching the signing key.
            Passing an unrelated key yields a token whose signature will not
            verify against the returned key.

    Returns:
        A ``(public_key, token)`` tuple.
    """
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    issued_at = int(time.time())
    claims = {
        "iss": issuer,
        "aud": audience,
        "scope": scope,
        "sub": "mcp-client",
        "iat": issued_at,
        # Compare with None so an explicit exp=0 is not mistaken for "unset".
        "exp": exp if exp is not None else issued_at + 300,
    }
    token = jwt.encode(claims, private_key, algorithm="RS256", headers={"kid": "test-key"})
    if public_key is None:
        public_key = private_key.public_key()
    return public_key, token
|