Skip to content

Instantly share code, notes, and snippets.

@knjname
Created February 16, 2026 13:11
Show Gist options
  • Select an option

  • Save knjname/3843efa669705f5ebb983550c35c2a64 to your computer and use it in GitHub Desktop.

Select an option

Save knjname/3843efa669705f5ebb983550c35c2a64 to your computer and use it in GitHub Desktop.
MCP server with Google OIDC authentication
# /// script
# requires-python = ">=3.10"
# dependencies = ["mcp[cli]", "httpx"]
# ///
from __future__ import annotations
import os
import secrets
import time
from urllib.parse import urlencode
import httpx
from mcp.server.auth.middleware.auth_context import get_access_token
from mcp.server.auth.provider import (
AccessToken,
AuthorizationCode,
AuthorizationParams,
OAuthAuthorizationServerProvider,
RefreshToken,
construct_redirect_uri,
)
from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions
from mcp.server.fastmcp import FastMCP
from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
from pydantic import AnyUrl
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse, Response
# ---------------------------------------------------------------------------
# Server settings
# ---------------------------------------------------------------------------
SERVER_URL = "http://localhost:8000"
OIDC_CALLBACK_PATH = "/google/callback"
# ---------------------------------------------------------------------------
# Google OAuth settings (defaults)
# ---------------------------------------------------------------------------
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
# ---------------------------------------------------------------------------
# Extended AccessToken that carries user info
# ---------------------------------------------------------------------------
class UserAccessToken(AccessToken):
user_info: dict = {}
# ---------------------------------------------------------------------------
# OIDCOAuthProvider – implements OAuthAuthorizationServerProvider
# ---------------------------------------------------------------------------
class OIDCOAuthProvider:
"""MCP OAuth authorization server that delegates authentication to an external OIDC provider."""
def __init__(
self,
*,
server_url: str,
callback_path: str,
oidc_client_id: str,
oidc_client_secret: str,
oidc_auth_url: str,
oidc_token_url: str,
oidc_userinfo_url: str,
oidc_scope: str = "openid email profile",
oidc_extra_auth_params: dict[str, str] | None = None,
) -> None:
self.server_url = server_url
self.callback_path = callback_path
self.oidc_client_id = oidc_client_id
self.oidc_client_secret = oidc_client_secret
self.oidc_auth_url = oidc_auth_url
self.oidc_token_url = oidc_token_url
self.oidc_userinfo_url = oidc_userinfo_url
self.oidc_scope = oidc_scope
self.oidc_extra_auth_params = oidc_extra_auth_params or {}
# Dynamic client registration store: client_id -> OAuthClientInformationFull
self.clients: dict[str, OAuthClientInformationFull] = {}
# Authorization codes: code -> AuthorizationCode + user_info
self.auth_codes: dict[str, dict] = {}
# Access tokens: token_string -> UserAccessToken
self.access_tokens: dict[str, UserAccessToken] = {}
# Refresh tokens: token_string -> RefreshToken + user_info
self.refresh_tokens: dict[str, dict] = {}
# OIDC provider state -> MCP client params (redirect_uri, state, code_challenge, etc.)
self.oidc_states: dict[str, dict] = {}
# -- Client registration --------------------------------------------------
async def get_client(self, client_id: str) -> OAuthClientInformationFull | None:
return self.clients.get(client_id)
async def register_client(self, client_info: OAuthClientInformationFull) -> None:
self.clients[client_info.client_id] = client_info
# -- Authorization ---------------------------------------------------------
async def authorize(
self, client: OAuthClientInformationFull, params: AuthorizationParams
) -> str:
"""Redirect to the OIDC provider's authorization endpoint."""
state = secrets.token_urlsafe(32)
# Save MCP client params so we can resume the flow in the callback
self.oidc_states[state] = {
"mcp_client_id": client.client_id,
"redirect_uri": str(params.redirect_uri),
"state": params.state,
"code_challenge": params.code_challenge,
"scopes": params.scopes,
"redirect_uri_provided_explicitly": params.redirect_uri_provided_explicitly,
}
oidc_params = {
"client_id": self.oidc_client_id,
"redirect_uri": f"{self.server_url}{self.callback_path}",
"response_type": "code",
"scope": self.oidc_scope,
"state": state,
**self.oidc_extra_auth_params,
}
return f"{self.oidc_auth_url}?{urlencode(oidc_params)}"
# -- Authorization code loading / exchange ---------------------------------
async def load_authorization_code(
self, client: OAuthClientInformationFull, authorization_code: str
) -> AuthorizationCode | None:
entry = self.auth_codes.get(authorization_code)
if entry is None:
return None
return entry["code_obj"]
async def exchange_authorization_code(
self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode
) -> OAuthToken:
entry = self.auth_codes.pop(authorization_code.code, None)
if entry is None:
raise ValueError("Invalid authorization code")
user_info: dict = entry["user_info"]
# Issue access token
access_token_str = secrets.token_urlsafe(48)
self.access_tokens[access_token_str] = UserAccessToken(
token=access_token_str,
client_id=client.client_id,
scopes=authorization_code.scopes,
expires_at=int(time.time()) + 3600,
user_info=user_info,
)
# Issue refresh token
refresh_token_str = secrets.token_urlsafe(48)
self.refresh_tokens[refresh_token_str] = {
"token_obj": RefreshToken(
token=refresh_token_str,
client_id=client.client_id,
scopes=authorization_code.scopes,
expires_at=int(time.time()) + 86400,
),
"user_info": user_info,
}
return OAuthToken(
access_token=access_token_str,
token_type="Bearer",
expires_in=3600,
refresh_token=refresh_token_str,
)
# -- Access token loading --------------------------------------------------
async def load_access_token(self, token: str) -> UserAccessToken | None:
at = self.access_tokens.get(token)
if at is None:
return None
if at.expires_at and at.expires_at < time.time():
del self.access_tokens[token]
return None
return at
# -- Refresh token loading / exchange --------------------------------------
async def load_refresh_token(
self, client: OAuthClientInformationFull, refresh_token: str
) -> RefreshToken | None:
entry = self.refresh_tokens.get(refresh_token)
if entry is None:
return None
return entry["token_obj"]
async def exchange_refresh_token(
self,
client: OAuthClientInformationFull,
refresh_token: RefreshToken,
scopes: list[str],
) -> OAuthToken:
entry = self.refresh_tokens.pop(refresh_token.token, None)
if entry is None:
raise ValueError("Invalid refresh token")
user_info: dict = entry["user_info"]
# Issue new access token
access_token_str = secrets.token_urlsafe(48)
self.access_tokens[access_token_str] = UserAccessToken(
token=access_token_str,
client_id=client.client_id,
scopes=scopes or refresh_token.scopes,
expires_at=int(time.time()) + 3600,
user_info=user_info,
)
# Issue new refresh token
new_refresh_token_str = secrets.token_urlsafe(48)
self.refresh_tokens[new_refresh_token_str] = {
"token_obj": RefreshToken(
token=new_refresh_token_str,
client_id=client.client_id,
scopes=scopes or refresh_token.scopes,
expires_at=int(time.time()) + 86400,
),
"user_info": user_info,
}
return OAuthToken(
access_token=access_token_str,
token_type="Bearer",
expires_in=3600,
refresh_token=new_refresh_token_str,
)
# -- Revocation ------------------------------------------------------------
async def revoke_token(
self, token: UserAccessToken | RefreshToken
) -> None:
if isinstance(token, UserAccessToken):
self.access_tokens.pop(token.token, None)
else:
self.refresh_tokens.pop(token.token, None)
# ---------------------------------------------------------------------------
# Provider instance & FastMCP setup
# ---------------------------------------------------------------------------
provider = OIDCOAuthProvider(
server_url=SERVER_URL,
callback_path=OIDC_CALLBACK_PATH,
oidc_client_id=GOOGLE_CLIENT_ID,
oidc_client_secret=GOOGLE_CLIENT_SECRET,
oidc_auth_url=GOOGLE_AUTH_URL,
oidc_token_url=GOOGLE_TOKEN_URL,
oidc_userinfo_url=GOOGLE_USERINFO_URL,
oidc_extra_auth_params={"access_type": "offline", "prompt": "consent"},
)
mcp = FastMCP(
"add-server",
auth_server_provider=provider,
auth=AuthSettings(
issuer_url=AnyUrl(SERVER_URL),
resource_server_url=AnyUrl(SERVER_URL),
client_registration_options=ClientRegistrationOptions(enabled=True),
),
)
# ---------------------------------------------------------------------------
# OIDC callback route
# ---------------------------------------------------------------------------
@mcp.custom_route(OIDC_CALLBACK_PATH, methods=["GET"])
async def oidc_callback(request: Request) -> Response:
"""Handle the redirect from the OIDC provider after user authenticates."""
code = request.query_params.get("code")
state = request.query_params.get("state")
error = request.query_params.get("error")
if error:
return JSONResponse({"error": error}, status_code=400)
if not code or not state:
return JSONResponse({"error": "Missing code or state"}, status_code=400)
saved = provider.oidc_states.pop(state, None)
if saved is None:
return JSONResponse({"error": "Invalid state"}, status_code=400)
# Exchange auth code for tokens with the OIDC provider
async with httpx.AsyncClient() as client:
token_resp = await client.post(
provider.oidc_token_url,
data={
"code": code,
"client_id": provider.oidc_client_id,
"client_secret": provider.oidc_client_secret,
"redirect_uri": f"{provider.server_url}{provider.callback_path}",
"grant_type": "authorization_code",
},
)
if token_resp.status_code != 200:
return JSONResponse(
{"error": "Failed to exchange code with OIDC provider", "detail": token_resp.text},
status_code=502,
)
oidc_tokens = token_resp.json()
# Fetch user info from the OIDC provider
userinfo_resp = await client.get(
provider.oidc_userinfo_url,
headers={"Authorization": f"Bearer {oidc_tokens['access_token']}"},
)
if userinfo_resp.status_code != 200:
return JSONResponse(
{"error": "Failed to fetch user info from OIDC provider"},
status_code=502,
)
user_info = userinfo_resp.json()
# Generate our own authorization code for the MCP client
our_code = secrets.token_urlsafe(32)
provider.auth_codes[our_code] = {
"code_obj": AuthorizationCode(
code=our_code,
scopes=saved.get("scopes") or [],
expires_at=time.time() + 300,
client_id=saved["mcp_client_id"],
code_challenge=saved["code_challenge"],
redirect_uri=AnyUrl(saved["redirect_uri"]),
redirect_uri_provided_explicitly=saved["redirect_uri_provided_explicitly"],
),
"user_info": user_info,
}
# Redirect back to MCP client's redirect_uri with our auth code
redirect_url = construct_redirect_uri(
saved["redirect_uri"],
code=our_code,
state=saved.get("state"),
)
return RedirectResponse(url=redirect_url, status_code=302)
# ---------------------------------------------------------------------------
# MCP Tools
# ---------------------------------------------------------------------------
@mcp.tool()
def add(a: float, b: float) -> float:
"""2つの数値を足し算します 新しい世界の足し算です"""
return ((b - a) / 10 + 1) * (a + b)
@mcp.tool()
async def guess_me() -> dict:
"""認証済みユーザーの Google アカウント情報を返します"""
access_token = get_access_token()
if access_token is None:
return {"error": "Not authenticated"}
if isinstance(access_token, UserAccessToken):
return {
"name": access_token.user_info.get("name", ""),
"email": access_token.user_info.get("email", ""),
"picture": access_token.user_info.get("picture", ""),
"locale": access_token.user_info.get("locale", ""),
}
return {"error": "No user info available"}
if __name__ == "__main__":
mcp.run(transport="sse")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment