Skip to content

Instantly share code, notes, and snippets.

@knjname
Created February 16, 2026 13:03
Show Gist options
  • Select an option

  • Save knjname/8cd36b3c0d2fc63baf79afd1a866b382 to your computer and use it in GitHub Desktop.

Select an option

Save knjname/8cd36b3c0d2fc63baf79afd1a866b382 to your computer and use it in GitHub Desktop.
MCP server with Google OIDC authentication
# /// script
# requires-python = ">=3.10"
# dependencies = ["mcp[cli]", "httpx"]
# ///
from __future__ import annotations
import os
import secrets
import time
from urllib.parse import urlencode
import httpx
from mcp.server.auth.middleware.auth_context import get_access_token
from mcp.server.auth.provider import (
AccessToken,
AuthorizationCode,
AuthorizationParams,
OAuthAuthorizationServerProvider,
RefreshToken,
construct_redirect_uri,
)
from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions
from mcp.server.fastmcp import FastMCP
from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
from pydantic import AnyUrl
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse, Response
# ---------------------------------------------------------------------------
# Google OAuth settings
# ---------------------------------------------------------------------------
# Google's OAuth 2.0 / OpenID Connect endpoints.
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"

# Credentials of the Google OAuth client, read from the environment.
# Both fall back to an empty string when the variable is not set.
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")

# Public base URL of this server and the path Google redirects back to.
# Joined together they form the redirect URI sent to Google.
SERVER_URL = "http://localhost:8000"
GOOGLE_CALLBACK_PATH = "/google/callback"
# ---------------------------------------------------------------------------
# Extended AccessToken that carries user info
# ---------------------------------------------------------------------------
class UserAccessToken(AccessToken):
    """Access token that additionally carries the authenticated user's profile."""

    # Profile data kept alongside the token. In this file it is filled with
    # the JSON returned by Google's userinfo endpoint and read back by the
    # `guess_me` tool.
    user_info: dict = {}
# ---------------------------------------------------------------------------
# GoogleOAuthProvider – implements OAuthAuthorizationServerProvider
# ---------------------------------------------------------------------------
class GoogleOAuthProvider:
    """MCP OAuth authorization server that delegates authentication to Google.

    All state (registered clients, pending Google round-trips, authorization
    codes and issued tokens) lives in in-memory dictionaries on the instance,
    so it is lost when the process exits.
    """

    # Lifetimes, in seconds, of what this server issues or keeps track of.
    ACCESS_TOKEN_TTL = 3600
    REFRESH_TOKEN_TTL = 86400
    GOOGLE_STATE_TTL = 600

    def __init__(self) -> None:
        # Dynamic client registration store: client_id -> OAuthClientInformationFull
        self.clients: dict[str, OAuthClientInformationFull] = {}
        # Authorization codes: code -> {"code_obj": AuthorizationCode, "user_info": dict}
        self.auth_codes: dict[str, dict] = {}
        # Access tokens: token_string -> UserAccessToken
        self.access_tokens: dict[str, UserAccessToken] = {}
        # Refresh tokens: token_string -> {"token_obj": RefreshToken, "user_info": dict}
        self.refresh_tokens: dict[str, dict] = {}
        # Google OAuth state -> MCP client params (redirect_uri, state, code_challenge, etc.)
        self.google_states: dict[str, dict] = {}
        # Bidirectional link between an access token and the refresh token
        # issued with it, so that revoking one can also revoke the other.
        self._token_pairs: dict[str, str] = {}

    # -- Client registration --------------------------------------------------

    async def get_client(self, client_id: str) -> OAuthClientInformationFull | None:
        """Return the registered client with ``client_id``, or ``None``."""
        return self.clients.get(client_id)

    async def register_client(self, client_info: OAuthClientInformationFull) -> None:
        """Store ``client_info`` under its ``client_id``, replacing any previous entry."""
        self.clients[client_info.client_id] = client_info

    # -- Authorization ---------------------------------------------------------

    async def authorize(
        self, client: OAuthClientInformationFull, params: AuthorizationParams
    ) -> str:
        """Start the Google login and return the Google authorization URL.

        The MCP client's authorization parameters are saved under a fresh
        random ``state`` value so the Google callback can resume the flow.
        Entries older than ``GOOGLE_STATE_TTL`` are discarded first, so
        abandoned logins do not accumulate forever.
        """
        now = time.time()
        self._purge_stale_google_states(now)

        google_state = secrets.token_urlsafe(32)
        # Save MCP client params so we can resume the flow in the callback
        self.google_states[google_state] = {
            "mcp_client_id": client.client_id,
            "redirect_uri": str(params.redirect_uri),
            "state": params.state,
            "code_challenge": params.code_challenge,
            "scopes": params.scopes,
            "redirect_uri_provided_explicitly": params.redirect_uri_provided_explicitly,
            "created_at": now,
        }
        google_params = {
            "client_id": GOOGLE_CLIENT_ID,
            "redirect_uri": f"{SERVER_URL}{GOOGLE_CALLBACK_PATH}",
            "response_type": "code",
            "scope": "openid email profile",
            "state": google_state,
            "access_type": "offline",
            "prompt": "consent",
        }
        return f"{GOOGLE_AUTH_URL}?{urlencode(google_params)}"

    # -- Authorization code loading / exchange ---------------------------------

    async def load_authorization_code(
        self, client: OAuthClientInformationFull, authorization_code: str
    ) -> AuthorizationCode | None:
        """Return the stored authorization code, or ``None`` if it is unusable.

        ``None`` is returned when the code is unknown, has expired (in which
        case it is also removed from the store), or was issued to a client
        other than ``client``.
        """
        entry = self.auth_codes.get(authorization_code)
        if entry is None:
            return None
        code_obj = entry["code_obj"]
        if code_obj.expires_at < time.time():
            del self.auth_codes[authorization_code]
            return None
        if code_obj.client_id != client.client_id:
            return None
        return code_obj

    async def exchange_authorization_code(
        self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode
    ) -> OAuthToken:
        """Consume ``authorization_code`` and issue an access/refresh token pair.

        Raises:
            ValueError: if the code is not (or no longer) in the store.
        """
        # pop() makes the code single-use.
        entry = self.auth_codes.pop(authorization_code.code, None)
        if entry is None:
            raise ValueError("Invalid authorization code")
        return self._issue_token_pair(
            client.client_id, authorization_code.scopes, entry["user_info"]
        )

    # -- Access token loading --------------------------------------------------

    async def load_access_token(self, token: str) -> UserAccessToken | None:
        """Return the access token for ``token``, or ``None`` if unknown or expired.

        An expired token is removed from the store as a side effect.
        """
        at = self.access_tokens.get(token)
        if at is None:
            return None
        if at.expires_at and at.expires_at < time.time():
            del self.access_tokens[token]
            self._unlink(token)
            return None
        return at

    # -- Refresh token loading / exchange --------------------------------------

    async def load_refresh_token(
        self, client: OAuthClientInformationFull, refresh_token: str
    ) -> RefreshToken | None:
        """Return the stored refresh token, or ``None`` if it is unusable.

        ``None`` is returned when the token is unknown, has expired (in which
        case it is also removed from the store), or belongs to a client other
        than ``client``.
        """
        entry = self.refresh_tokens.get(refresh_token)
        if entry is None:
            return None
        token_obj = entry["token_obj"]
        if token_obj.expires_at and token_obj.expires_at < time.time():
            del self.refresh_tokens[refresh_token]
            self._unlink(refresh_token)
            return None
        if token_obj.client_id != client.client_id:
            return None
        return token_obj

    async def exchange_refresh_token(
        self,
        client: OAuthClientInformationFull,
        refresh_token: RefreshToken,
        scopes: list[str],
    ) -> OAuthToken:
        """Rotate ``refresh_token`` and issue a new access/refresh token pair.

        The new tokens carry ``scopes`` when it is non-empty and the old
        refresh token's scopes otherwise. The access token issued together
        with the old refresh token is left untouched.

        Raises:
            ValueError: if the refresh token is not (or no longer) in the store.
        """
        # pop() invalidates the old refresh token (rotation).
        entry = self.refresh_tokens.pop(refresh_token.token, None)
        if entry is None:
            raise ValueError("Invalid refresh token")
        self._unlink(refresh_token.token)
        return self._issue_token_pair(
            client.client_id, scopes or refresh_token.scopes, entry["user_info"]
        )

    # -- Revocation ------------------------------------------------------------

    async def revoke_token(
        self, token: UserAccessToken | RefreshToken
    ) -> None:
        """Revoke ``token`` and the token that was issued together with it.

        Works for access and refresh tokens alike; unknown or already revoked
        tokens are ignored.
        """
        token_str = token.token
        partner = self._unlink(token_str)
        # Token strings are random and unique across both stores, so it is
        # safe to try both without inspecting the token's type.
        for value in (token_str, partner):
            if value is not None:
                self.access_tokens.pop(value, None)
                self.refresh_tokens.pop(value, None)

    # -- Internal helpers ------------------------------------------------------

    def _issue_token_pair(
        self, client_id: str, scopes: list[str], user_info: dict
    ) -> OAuthToken:
        """Create, store and link a new access token and refresh token."""
        now = int(time.time())
        access_token_str = secrets.token_urlsafe(48)
        refresh_token_str = secrets.token_urlsafe(48)

        self.access_tokens[access_token_str] = UserAccessToken(
            token=access_token_str,
            client_id=client_id,
            scopes=scopes,
            expires_at=now + self.ACCESS_TOKEN_TTL,
            user_info=user_info,
        )
        self.refresh_tokens[refresh_token_str] = {
            "token_obj": RefreshToken(
                token=refresh_token_str,
                client_id=client_id,
                scopes=scopes,
                expires_at=now + self.REFRESH_TOKEN_TTL,
            ),
            "user_info": user_info,
        }
        self._token_pairs[access_token_str] = refresh_token_str
        self._token_pairs[refresh_token_str] = access_token_str

        return OAuthToken(
            access_token=access_token_str,
            token_type="Bearer",
            expires_in=self.ACCESS_TOKEN_TTL,
            refresh_token=refresh_token_str,
        )

    def _unlink(self, token_str: str) -> str | None:
        """Remove the pairing of ``token_str`` in both directions; return its partner."""
        partner = self._token_pairs.pop(token_str, None)
        if partner is not None:
            self._token_pairs.pop(partner, None)
        return partner

    def _purge_stale_google_states(self, now: float) -> None:
        """Drop pending Google states created more than ``GOOGLE_STATE_TTL`` ago."""
        cutoff = now - self.GOOGLE_STATE_TTL
        # Entries without a timestamp are treated as fresh and kept.
        stale = [
            state
            for state, saved in self.google_states.items()
            if saved.get("created_at", now) < cutoff
        ]
        for state in stale:
            del self.google_states[state]
# ---------------------------------------------------------------------------
# Provider instance & FastMCP setup
# ---------------------------------------------------------------------------
# One provider instance is shared by the FastMCP auth layer and the Google
# callback route, because both need to see the same in-memory stores.
provider = GoogleOAuthProvider()

# This server acts as both the OAuth issuer and the protected resource, so
# the same URL is used for both. Dynamic client registration is switched on.
_auth_settings = AuthSettings(
    issuer_url=AnyUrl(SERVER_URL),
    resource_server_url=AnyUrl(SERVER_URL),
    client_registration_options=ClientRegistrationOptions(enabled=True),
)

mcp = FastMCP("add-server", auth_server_provider=provider, auth=_auth_settings)
# ---------------------------------------------------------------------------
# Google callback route
# ---------------------------------------------------------------------------
@mcp.custom_route(GOOGLE_CALLBACK_PATH, methods=["GET"])
async def google_callback(request: Request) -> Response:
    """Handle the redirect from Google after the user authenticates.

    Steps:
      1. Validate the query parameters and look up the pending flow saved
         under ``state`` (each state can be used only once).
      2. Exchange Google's authorization code for Google tokens.
      3. Fetch the user's profile from Google's userinfo endpoint.
      4. Mint this server's own authorization code, remember it together
         with the profile, and redirect the browser to the MCP client's
         redirect URI with that code.

    Returns:
        A 302 redirect to the MCP client on success; a JSON error response
        with status 400 for a bad request, or 502 when talking to Google
        fails.
    """
    query = request.query_params
    code = query.get("code")
    state = query.get("state")
    error = query.get("error")

    if error:
        # The flow is over, so do not keep its pending state around.
        if state:
            provider.google_states.pop(state, None)
        return JSONResponse({"error": error}, status_code=400)
    if not code or not state:
        return JSONResponse({"error": "Missing code or state"}, status_code=400)

    saved = provider.google_states.pop(state, None)
    if saved is None:
        return JSONResponse({"error": "Invalid state"}, status_code=400)

    try:
        async with httpx.AsyncClient() as client:
            # Exchange Google auth code for tokens
            token_resp = await client.post(
                GOOGLE_TOKEN_URL,
                data={
                    "code": code,
                    "client_id": GOOGLE_CLIENT_ID,
                    "client_secret": GOOGLE_CLIENT_SECRET,
                    "redirect_uri": f"{SERVER_URL}{GOOGLE_CALLBACK_PATH}",
                    "grant_type": "authorization_code",
                },
            )
            if token_resp.status_code != 200:
                return JSONResponse(
                    {"error": "Failed to exchange code with Google", "detail": token_resp.text},
                    status_code=502,
                )
            google_tokens = token_resp.json()
            google_access_token = (
                google_tokens.get("access_token")
                if isinstance(google_tokens, dict)
                else None
            )
            if not google_access_token:
                # A 200 reply without an access token cannot be used.
                return JSONResponse(
                    {"error": "Failed to exchange code with Google"},
                    status_code=502,
                )

            # Fetch user info from Google
            userinfo_resp = await client.get(
                GOOGLE_USERINFO_URL,
                headers={"Authorization": f"Bearer {google_access_token}"},
            )
            if userinfo_resp.status_code != 200:
                return JSONResponse(
                    {"error": "Failed to fetch user info from Google"},
                    status_code=502,
                )
            user_info = userinfo_resp.json()
    except (httpx.HTTPError, ValueError):
        # httpx.HTTPError covers transport failures; ValueError covers a
        # response body that is not valid JSON.
        return JSONResponse(
            {"error": "Failed to communicate with Google"},
            status_code=502,
        )

    # Generate our own authorization code for the MCP client
    our_code = secrets.token_urlsafe(32)
    provider.auth_codes[our_code] = {
        "code_obj": AuthorizationCode(
            code=our_code,
            scopes=saved.get("scopes") or [],
            expires_at=time.time() + 300,
            client_id=saved["mcp_client_id"],
            code_challenge=saved["code_challenge"],
            redirect_uri=AnyUrl(saved["redirect_uri"]),
            redirect_uri_provided_explicitly=saved["redirect_uri_provided_explicitly"],
        ),
        "user_info": user_info,
    }

    # Redirect back to MCP client's redirect_uri with our auth code
    redirect_url = construct_redirect_uri(
        saved["redirect_uri"],
        code=our_code,
        state=saved.get("state"),
    )
    return RedirectResponse(url=redirect_url, status_code=302)
# ---------------------------------------------------------------------------
# MCP Tools
# ---------------------------------------------------------------------------
# MCP tool. FastMCP publishes the docstring as the tool description, so the
# original text is kept verbatim. In English it reads: "Adds two numbers.
# This is addition from a new world."
# The result is deliberately NOT a plain sum: (a + b) is scaled by
# ((b - a) / 10 + 1).
@mcp.tool()
def add(a: float, b: float) -> float:
    """2つの数値を足し算します 新しい世界の足し算です"""
    scale = (b - a) / 10 + 1
    plain_sum = a + b
    return scale * plain_sum
# MCP tool. FastMCP publishes the docstring as the tool description, so the
# original text is kept verbatim. In English it reads: "Returns the Google
# account information of the authenticated user."
@mcp.tool()
async def guess_me() -> dict:
    """認証済みユーザーの Google アカウント情報を返します"""
    token = get_access_token()

    # No token on the current request: the caller is not authenticated.
    if token is None:
        return {"error": "Not authenticated"}
    # Only tokens of our own subclass carry the user's profile.
    if not isinstance(token, UserAccessToken):
        return {"error": "No user info available"}

    profile = token.user_info
    # Missing profile fields are reported as empty strings.
    return {
        field: profile.get(field, "")
        for field in ("name", "email", "picture", "locale")
    }
# Script entry point: start the MCP server over the SSE transport when this
# file is executed directly rather than imported.
if __name__ == "__main__":
    mcp.run(transport="sse")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment