fix: remove MCP application-level authorization

This commit is contained in:
Anton
2026-05-08 12:14:19 +03:00
parent a4e7388bcf
commit 7593a460c7
11 changed files with 20 additions and 363 deletions

View File

@@ -1,6 +1,4 @@
from functools import lru_cache
from typing import Literal
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -19,13 +17,6 @@ class Settings(BaseSettings):
admin_username: str = "admin"
admin_password: str = "admin"
session_secret: str = Field(default="dev-session-secret", min_length=8)
mcp_token: str = "dev-mcp-token"
mcp_auth_mode: Literal["token", "oauth"] = "oauth"
mcp_resource_url: str = "http://localhost:8001/mcp"
mcp_oauth_issuer: str = ""
mcp_oauth_audience: str = ""
mcp_oauth_jwks_url: str = ""
mcp_oauth_required_scope: str = "mcp:tools"
@field_validator("crawl_limit", mode="before")
@classmethod
@@ -34,15 +25,6 @@ class Settings(BaseSettings):
return None
return value
def oauth_jwks_url(self) -> str:
if self.mcp_oauth_jwks_url:
return self.mcp_oauth_jwks_url
issuer = self.mcp_oauth_issuer.rstrip("/")
if not issuer:
return ""
return f"{issuer}/.well-known/jwks.json"
@lru_cache
def get_settings() -> Settings:
return Settings()

View File

@@ -4,7 +4,6 @@ from fastapi.staticfiles import StaticFiles
from app.admin import router as admin_router
from app.api import router as api_router
from app.db import init_db
from app.mcp import metadata_router as mcp_metadata_router
from app.mcp import router as mcp_router
from app.version import BACKEND_VERSION
@@ -13,7 +12,6 @@ app.mount("/static", StaticFiles(directory="app/static"), name="static")
app.include_router(api_router)
app.include_router(admin_router)
app.include_router(mcp_router)
app.include_router(mcp_metadata_router)
@app.on_event("startup")

View File

@@ -4,15 +4,12 @@ from fastapi import APIRouter, Depends, Request
from sqlalchemy import desc, or_, select
from sqlalchemy.orm import Session
from app.config import Settings, get_settings
from app.db import get_db
from app.models import CrawlRun, Employee
from app.security import mcp_protected_resource_metadata, require_mcp_auth
from app.services.admin_data import run_detail_payload
from app.version import BACKEND_VERSION
router = APIRouter(prefix="/mcp")
metadata_router = APIRouter()
TOOLS = [
@@ -65,9 +62,7 @@ TOOLS = [
async def mcp_http(
request: Request,
db: Session = Depends(get_db),
settings: Settings = Depends(get_settings),
) -> dict:
require_mcp_auth(request, settings)
payload = await request.json()
method = payload.get("method")
request_id = payload.get("id")
@@ -183,8 +178,3 @@ def _run_payload(run: CrawlRun) -> dict:
def _tool_response(data: object) -> dict:
return {"content": [{"type": "text", "text": json.dumps(data, ensure_ascii=False, default=str)}]}
@metadata_router.get("/.well-known/oauth-protected-resource")
def oauth_protected_resource(settings: Settings = Depends(get_settings)) -> dict:
return mcp_protected_resource_metadata(settings)

View File

@@ -3,10 +3,7 @@ import hashlib
import hmac
import json
import time
from functools import lru_cache
import jwt
from jwt import PyJWKClient, PyJWTError
from fastapi import HTTPException, Request, status
from app.config import Settings
@@ -47,93 +44,3 @@ def require_admin(request: Request, settings: Settings) -> str:
if not username:
raise HTTPException(status_code=status.HTTP_303_SEE_OTHER, headers={"Location": "/admin/login"})
return username
def require_mcp_auth(request: Request, settings: Settings) -> None:
auth = request.headers.get("authorization", "")
if not auth.startswith("Bearer "):
raise _mcp_unauthorized(settings, "Missing bearer token")
token = auth.removeprefix("Bearer ").strip()
if _mcp_static_token_allowed(settings) and hmac.compare_digest(token, settings.mcp_token):
return
if _mcp_oauth_allowed(settings):
_validate_mcp_oauth_token(token, settings)
return
raise _mcp_unauthorized(settings, "Invalid MCP token")
def require_mcp_token(request: Request, settings: Settings) -> None:
require_mcp_auth(request, settings)
def mcp_protected_resource_metadata(settings: Settings) -> dict:
authorization_servers = [settings.mcp_oauth_issuer.rstrip("/")] if settings.mcp_oauth_issuer else []
return {
"resource": settings.mcp_resource_url,
"authorization_servers": authorization_servers,
"bearer_methods_supported": ["header"],
"scopes_supported": [settings.mcp_oauth_required_scope],
"resource_documentation": settings.mcp_resource_url,
}
def _mcp_static_token_allowed(settings: Settings) -> bool:
return settings.mcp_auth_mode == "token"
def _mcp_oauth_allowed(settings: Settings) -> bool:
return settings.mcp_auth_mode == "oauth"
def _validate_mcp_oauth_token(token: str, settings: Settings) -> None:
if not settings.mcp_oauth_issuer or not settings.mcp_oauth_audience or not settings.oauth_jwks_url():
raise _mcp_unauthorized(settings, "MCP OAuth is not configured")
try:
signing_key = _get_mcp_oauth_signing_key(token, settings).key
claims = jwt.decode(
token,
signing_key,
algorithms=["RS256", "RS384", "RS512", "ES256", "ES384", "ES512"],
audience=settings.mcp_oauth_audience,
issuer=settings.mcp_oauth_issuer.rstrip("/"),
)
except PyJWTError as exc:
raise _mcp_unauthorized(settings, "Invalid OAuth access token") from exc
if not _claims_have_scope(claims, settings.mcp_oauth_required_scope):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing required MCP OAuth scope")
def _claims_have_scope(claims: dict, required_scope: str) -> bool:
scopes: set[str] = set()
scope = claims.get("scope")
if isinstance(scope, str):
scopes.update(scope.split())
scp = claims.get("scp")
if isinstance(scp, str):
scopes.update(scp.split())
elif isinstance(scp, list):
scopes.update(str(item) for item in scp)
return required_scope in scopes
@lru_cache(maxsize=16)
def _get_jwk_client(jwks_url: str) -> PyJWKClient:
return PyJWKClient(jwks_url)
def _get_mcp_oauth_signing_key(token: str, settings: Settings):
return _get_jwk_client(settings.oauth_jwks_url()).get_signing_key_from_jwt(token)
def _mcp_unauthorized(settings: Settings, detail: str) -> HTTPException:
headers = {}
if _mcp_oauth_allowed(settings):
headers["WWW-Authenticate"] = f'Bearer resource_metadata="{_mcp_metadata_url(settings)}"'
return HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail, headers=headers)
def _mcp_metadata_url(settings: Settings) -> str:
resource_url = settings.mcp_resource_url.rstrip("/")
base_url = resource_url[: -len("/mcp")] if resource_url.endswith("/mcp") else resource_url
return f"{base_url}/.well-known/oauth-protected-resource"

View File

@@ -1,3 +1,3 @@
APP_VERSION = "0.4.4"
FRONTEND_VERSION = "0.4.4"
BACKEND_VERSION = "0.4.4"
APP_VERSION = "0.4.5"
FRONTEND_VERSION = "0.4.5"
BACKEND_VERSION = "0.4.5"