Created
February 16, 2026 13:03
-
-
Save knjname/8cd36b3c0d2fc63baf79afd1a866b382 to your computer and use it in GitHub Desktop.
MCP server with Google OIDC authentication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = ["mcp[cli]", "httpx"] | |
| # /// | |
| from __future__ import annotations | |
| import os | |
| import secrets | |
| import time | |
| from urllib.parse import urlencode | |
| import httpx | |
| from mcp.server.auth.middleware.auth_context import get_access_token | |
| from mcp.server.auth.provider import ( | |
| AccessToken, | |
| AuthorizationCode, | |
| AuthorizationParams, | |
| OAuthAuthorizationServerProvider, | |
| RefreshToken, | |
| construct_redirect_uri, | |
| ) | |
| from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions | |
| from mcp.server.fastmcp import FastMCP | |
| from mcp.shared.auth import OAuthClientInformationFull, OAuthToken | |
| from pydantic import AnyUrl | |
| from starlette.requests import Request | |
| from starlette.responses import JSONResponse, RedirectResponse, Response | |
| # --------------------------------------------------------------------------- | |
| # Google OAuth settings | |
| # --------------------------------------------------------------------------- | |
| GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "") | |
| GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "") | |
| GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" | |
| GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" | |
| GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo" | |
| SERVER_URL = "http://localhost:8000" | |
| GOOGLE_CALLBACK_PATH = "/google/callback" | |
| # --------------------------------------------------------------------------- | |
| # Extended AccessToken that carries user info | |
| # --------------------------------------------------------------------------- | |
| class UserAccessToken(AccessToken): | |
| user_info: dict = {} | |
| # --------------------------------------------------------------------------- | |
| # GoogleOAuthProvider – implements OAuthAuthorizationServerProvider | |
| # --------------------------------------------------------------------------- | |
| class GoogleOAuthProvider: | |
| """MCP OAuth authorization server that delegates authentication to Google.""" | |
| def __init__(self) -> None: | |
| # Dynamic client registration store: client_id -> OAuthClientInformationFull | |
| self.clients: dict[str, OAuthClientInformationFull] = {} | |
| # Authorization codes: code -> AuthorizationCode + user_info | |
| self.auth_codes: dict[str, dict] = {} | |
| # Access tokens: token_string -> UserAccessToken | |
| self.access_tokens: dict[str, UserAccessToken] = {} | |
| # Refresh tokens: token_string -> RefreshToken + user_info | |
| self.refresh_tokens: dict[str, dict] = {} | |
| # Google OAuth state -> MCP client params (redirect_uri, state, code_challenge, etc.) | |
| self.google_states: dict[str, dict] = {} | |
| # -- Client registration -------------------------------------------------- | |
| async def get_client(self, client_id: str) -> OAuthClientInformationFull | None: | |
| return self.clients.get(client_id) | |
| async def register_client(self, client_info: OAuthClientInformationFull) -> None: | |
| self.clients[client_info.client_id] = client_info | |
| # -- Authorization --------------------------------------------------------- | |
| async def authorize( | |
| self, client: OAuthClientInformationFull, params: AuthorizationParams | |
| ) -> str: | |
| """Redirect to Google's authorization endpoint.""" | |
| google_state = secrets.token_urlsafe(32) | |
| # Save MCP client params so we can resume the flow in the callback | |
| self.google_states[google_state] = { | |
| "mcp_client_id": client.client_id, | |
| "redirect_uri": str(params.redirect_uri), | |
| "state": params.state, | |
| "code_challenge": params.code_challenge, | |
| "scopes": params.scopes, | |
| "redirect_uri_provided_explicitly": params.redirect_uri_provided_explicitly, | |
| } | |
| google_params = { | |
| "client_id": GOOGLE_CLIENT_ID, | |
| "redirect_uri": f"{SERVER_URL}{GOOGLE_CALLBACK_PATH}", | |
| "response_type": "code", | |
| "scope": "openid email profile", | |
| "state": google_state, | |
| "access_type": "offline", | |
| "prompt": "consent", | |
| } | |
| return f"{GOOGLE_AUTH_URL}?{urlencode(google_params)}" | |
| # -- Authorization code loading / exchange --------------------------------- | |
| async def load_authorization_code( | |
| self, client: OAuthClientInformationFull, authorization_code: str | |
| ) -> AuthorizationCode | None: | |
| entry = self.auth_codes.get(authorization_code) | |
| if entry is None: | |
| return None | |
| return entry["code_obj"] | |
| async def exchange_authorization_code( | |
| self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode | |
| ) -> OAuthToken: | |
| entry = self.auth_codes.pop(authorization_code.code, None) | |
| if entry is None: | |
| raise ValueError("Invalid authorization code") | |
| user_info: dict = entry["user_info"] | |
| # Issue access token | |
| access_token_str = secrets.token_urlsafe(48) | |
| self.access_tokens[access_token_str] = UserAccessToken( | |
| token=access_token_str, | |
| client_id=client.client_id, | |
| scopes=authorization_code.scopes, | |
| expires_at=int(time.time()) + 3600, | |
| user_info=user_info, | |
| ) | |
| # Issue refresh token | |
| refresh_token_str = secrets.token_urlsafe(48) | |
| self.refresh_tokens[refresh_token_str] = { | |
| "token_obj": RefreshToken( | |
| token=refresh_token_str, | |
| client_id=client.client_id, | |
| scopes=authorization_code.scopes, | |
| expires_at=int(time.time()) + 86400, | |
| ), | |
| "user_info": user_info, | |
| } | |
| return OAuthToken( | |
| access_token=access_token_str, | |
| token_type="Bearer", | |
| expires_in=3600, | |
| refresh_token=refresh_token_str, | |
| ) | |
| # -- Access token loading -------------------------------------------------- | |
| async def load_access_token(self, token: str) -> UserAccessToken | None: | |
| at = self.access_tokens.get(token) | |
| if at is None: | |
| return None | |
| if at.expires_at and at.expires_at < time.time(): | |
| del self.access_tokens[token] | |
| return None | |
| return at | |
| # -- Refresh token loading / exchange -------------------------------------- | |
| async def load_refresh_token( | |
| self, client: OAuthClientInformationFull, refresh_token: str | |
| ) -> RefreshToken | None: | |
| entry = self.refresh_tokens.get(refresh_token) | |
| if entry is None: | |
| return None | |
| return entry["token_obj"] | |
| async def exchange_refresh_token( | |
| self, | |
| client: OAuthClientInformationFull, | |
| refresh_token: RefreshToken, | |
| scopes: list[str], | |
| ) -> OAuthToken: | |
| entry = self.refresh_tokens.pop(refresh_token.token, None) | |
| if entry is None: | |
| raise ValueError("Invalid refresh token") | |
| user_info: dict = entry["user_info"] | |
| # Issue new access token | |
| access_token_str = secrets.token_urlsafe(48) | |
| self.access_tokens[access_token_str] = UserAccessToken( | |
| token=access_token_str, | |
| client_id=client.client_id, | |
| scopes=scopes or refresh_token.scopes, | |
| expires_at=int(time.time()) + 3600, | |
| user_info=user_info, | |
| ) | |
| # Issue new refresh token | |
| new_refresh_token_str = secrets.token_urlsafe(48) | |
| self.refresh_tokens[new_refresh_token_str] = { | |
| "token_obj": RefreshToken( | |
| token=new_refresh_token_str, | |
| client_id=client.client_id, | |
| scopes=scopes or refresh_token.scopes, | |
| expires_at=int(time.time()) + 86400, | |
| ), | |
| "user_info": user_info, | |
| } | |
| return OAuthToken( | |
| access_token=access_token_str, | |
| token_type="Bearer", | |
| expires_in=3600, | |
| refresh_token=new_refresh_token_str, | |
| ) | |
| # -- Revocation ------------------------------------------------------------ | |
| async def revoke_token( | |
| self, token: UserAccessToken | RefreshToken | |
| ) -> None: | |
| if isinstance(token, UserAccessToken): | |
| self.access_tokens.pop(token.token, None) | |
| else: | |
| self.refresh_tokens.pop(token.token, None) | |
| # --------------------------------------------------------------------------- | |
| # Provider instance & FastMCP setup | |
| # --------------------------------------------------------------------------- | |
| provider = GoogleOAuthProvider() | |
| mcp = FastMCP( | |
| "add-server", | |
| auth_server_provider=provider, | |
| auth=AuthSettings( | |
| issuer_url=AnyUrl(SERVER_URL), | |
| resource_server_url=AnyUrl(SERVER_URL), | |
| client_registration_options=ClientRegistrationOptions(enabled=True), | |
| ), | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Google callback route | |
| # --------------------------------------------------------------------------- | |
| @mcp.custom_route(GOOGLE_CALLBACK_PATH, methods=["GET"]) | |
| async def google_callback(request: Request) -> Response: | |
| """Handle the redirect from Google after user authenticates.""" | |
| code = request.query_params.get("code") | |
| state = request.query_params.get("state") | |
| error = request.query_params.get("error") | |
| if error: | |
| return JSONResponse({"error": error}, status_code=400) | |
| if not code or not state: | |
| return JSONResponse({"error": "Missing code or state"}, status_code=400) | |
| saved = provider.google_states.pop(state, None) | |
| if saved is None: | |
| return JSONResponse({"error": "Invalid state"}, status_code=400) | |
| # Exchange Google auth code for tokens | |
| async with httpx.AsyncClient() as client: | |
| token_resp = await client.post( | |
| GOOGLE_TOKEN_URL, | |
| data={ | |
| "code": code, | |
| "client_id": GOOGLE_CLIENT_ID, | |
| "client_secret": GOOGLE_CLIENT_SECRET, | |
| "redirect_uri": f"{SERVER_URL}{GOOGLE_CALLBACK_PATH}", | |
| "grant_type": "authorization_code", | |
| }, | |
| ) | |
| if token_resp.status_code != 200: | |
| return JSONResponse( | |
| {"error": "Failed to exchange code with Google", "detail": token_resp.text}, | |
| status_code=502, | |
| ) | |
| google_tokens = token_resp.json() | |
| # Fetch user info from Google | |
| userinfo_resp = await client.get( | |
| GOOGLE_USERINFO_URL, | |
| headers={"Authorization": f"Bearer {google_tokens['access_token']}"}, | |
| ) | |
| if userinfo_resp.status_code != 200: | |
| return JSONResponse( | |
| {"error": "Failed to fetch user info from Google"}, | |
| status_code=502, | |
| ) | |
| user_info = userinfo_resp.json() | |
| # Generate our own authorization code for the MCP client | |
| our_code = secrets.token_urlsafe(32) | |
| provider.auth_codes[our_code] = { | |
| "code_obj": AuthorizationCode( | |
| code=our_code, | |
| scopes=saved.get("scopes") or [], | |
| expires_at=time.time() + 300, | |
| client_id=saved["mcp_client_id"], | |
| code_challenge=saved["code_challenge"], | |
| redirect_uri=AnyUrl(saved["redirect_uri"]), | |
| redirect_uri_provided_explicitly=saved["redirect_uri_provided_explicitly"], | |
| ), | |
| "user_info": user_info, | |
| } | |
| # Redirect back to MCP client's redirect_uri with our auth code | |
| redirect_url = construct_redirect_uri( | |
| saved["redirect_uri"], | |
| code=our_code, | |
| state=saved.get("state"), | |
| ) | |
| return RedirectResponse(url=redirect_url, status_code=302) | |
| # --------------------------------------------------------------------------- | |
| # MCP Tools | |
| # --------------------------------------------------------------------------- | |
| @mcp.tool() | |
| def add(a: float, b: float) -> float: | |
| """2つの数値を足し算します 新しい世界の足し算です""" | |
| return ((b - a) / 10 + 1) * (a + b) | |
| @mcp.tool() | |
| async def guess_me() -> dict: | |
| """認証済みユーザーの Google アカウント情報を返します""" | |
| access_token = get_access_token() | |
| if access_token is None: | |
| return {"error": "Not authenticated"} | |
| if isinstance(access_token, UserAccessToken): | |
| return { | |
| "name": access_token.user_info.get("name", ""), | |
| "email": access_token.user_info.get("email", ""), | |
| "picture": access_token.user_info.get("picture", ""), | |
| "locale": access_token.user_info.get("locale", ""), | |
| } | |
| return {"error": "No user info available"} | |
| if __name__ == "__main__": | |
| mcp.run(transport="sse") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment