Created
February 16, 2026 13:11
-
-
Save knjname/3843efa669705f5ebb983550c35c2a64 to your computer and use it in GitHub Desktop.
MCP server with Google OIDC authentication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = ["mcp[cli]", "httpx"] | |
| # /// | |
| from __future__ import annotations | |
| import os | |
| import secrets | |
| import time | |
| from urllib.parse import urlencode | |
| import httpx | |
| from mcp.server.auth.middleware.auth_context import get_access_token | |
| from mcp.server.auth.provider import ( | |
| AccessToken, | |
| AuthorizationCode, | |
| AuthorizationParams, | |
| OAuthAuthorizationServerProvider, | |
| RefreshToken, | |
| construct_redirect_uri, | |
| ) | |
| from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions | |
| from mcp.server.fastmcp import FastMCP | |
| from mcp.shared.auth import OAuthClientInformationFull, OAuthToken | |
| from pydantic import AnyUrl | |
| from starlette.requests import Request | |
| from starlette.responses import JSONResponse, RedirectResponse, Response | |
| # --------------------------------------------------------------------------- | |
| # Server settings | |
| # --------------------------------------------------------------------------- | |
# Public base URL of this server. It is used as the OAuth issuer / resource
# server URL and as the prefix of the redirect URI sent to the upstream OIDC
# provider. Set MCP_SERVER_URL to override the local default; an unset or empty
# variable falls back to it. A trailing slash is stripped so that
# f"{SERVER_URL}{OIDC_CALLBACK_PATH}" never contains "//".
SERVER_URL = (os.environ.get("MCP_SERVER_URL") or "http://localhost:8000").rstrip("/")
# Path (on this server) that the OIDC provider redirects the browser back to.
OIDC_CALLBACK_PATH = "/google/callback"
# ---------------------------------------------------------------------------
# Google OAuth settings (defaults)
# ---------------------------------------------------------------------------
# Client credentials are read from the environment; both default to an empty
# string when the variable is not set.
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
# Google endpoints used for the authorization redirect, the code-for-token
# exchange, and the user-info lookup respectively.
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
| # --------------------------------------------------------------------------- | |
| # Extended AccessToken that carries user info | |
| # --------------------------------------------------------------------------- | |
class UserAccessToken(AccessToken):
    """Access token that additionally carries the signed-in user's profile data."""

    # Payload obtained from the OIDC provider's user-info endpoint at login;
    # an empty dict when nothing was supplied.
    user_info: dict = {}
| # --------------------------------------------------------------------------- | |
| # OIDCOAuthProvider – implements OAuthAuthorizationServerProvider | |
| # --------------------------------------------------------------------------- | |
class OIDCOAuthProvider:
    """MCP OAuth authorization server that delegates authentication to an external OIDC provider.

    All state (registered clients, pending logins, authorization codes and
    issued tokens) is kept in plain in-memory dictionaries on the instance, so
    it is lost when the process exits.
    """

    def __init__(
        self,
        *,
        server_url: str,
        callback_path: str,
        oidc_client_id: str,
        oidc_client_secret: str,
        oidc_auth_url: str,
        oidc_token_url: str,
        oidc_userinfo_url: str,
        oidc_scope: str = "openid email profile",
        oidc_extra_auth_params: dict[str, str] | None = None,
        access_token_ttl: int = 3600,
        refresh_token_ttl: int = 86400,
        oidc_state_ttl: int = 600,
    ) -> None:
        """Store the configuration and create the empty in-memory stores.

        Args:
            server_url: Base URL of this server; prefix of the OIDC redirect URI.
            callback_path: Path on this server the OIDC provider redirects to.
            oidc_client_id: Client id registered with the OIDC provider.
            oidc_client_secret: Client secret registered with the OIDC provider.
            oidc_auth_url: OIDC provider authorization endpoint.
            oidc_token_url: OIDC provider token endpoint.
            oidc_userinfo_url: OIDC provider user-info endpoint.
            oidc_scope: Space-separated scopes requested from the OIDC provider.
            oidc_extra_auth_params: Extra query parameters merged into the
                authorization URL (they win over the built-in ones on clashes).
            access_token_ttl: Lifetime in seconds of issued access tokens.
            refresh_token_ttl: Lifetime in seconds of issued refresh tokens.
            oidc_state_ttl: Seconds after which a pending login that never
                reached the callback is discarded.
        """
        self.server_url = server_url
        self.callback_path = callback_path
        self.oidc_client_id = oidc_client_id
        self.oidc_client_secret = oidc_client_secret
        self.oidc_auth_url = oidc_auth_url
        self.oidc_token_url = oidc_token_url
        self.oidc_userinfo_url = oidc_userinfo_url
        self.oidc_scope = oidc_scope
        self.oidc_extra_auth_params = oidc_extra_auth_params or {}
        self.access_token_ttl = access_token_ttl
        self.refresh_token_ttl = refresh_token_ttl
        self.oidc_state_ttl = oidc_state_ttl
        # Dynamic client registration store: client_id -> OAuthClientInformationFull
        self.clients: dict[str, OAuthClientInformationFull] = {}
        # Authorization codes: code -> {"code_obj": AuthorizationCode, "user_info": dict}
        self.auth_codes: dict[str, dict] = {}
        # Access tokens: token_string -> UserAccessToken
        self.access_tokens: dict[str, UserAccessToken] = {}
        # Refresh tokens: token_string -> {"token_obj": RefreshToken, "user_info": dict}
        self.refresh_tokens: dict[str, dict] = {}
        # OIDC provider state -> MCP client params (redirect_uri, state, code_challenge, etc.)
        self.oidc_states: dict[str, dict] = {}

    # -- Client registration --------------------------------------------------

    async def get_client(self, client_id: str) -> OAuthClientInformationFull | None:
        """Return the registered client with this id, or ``None`` if unknown."""
        return self.clients.get(client_id)

    async def register_client(self, client_info: OAuthClientInformationFull) -> None:
        """Store (or overwrite) a client registration keyed by its ``client_id``."""
        self.clients[client_info.client_id] = client_info

    # -- Authorization ---------------------------------------------------------

    def _prune_oidc_states(self, now: float) -> None:
        """Drop pending logins older than ``oidc_state_ttl`` seconds."""
        cutoff = now - self.oidc_state_ttl
        # Entries without a timestamp are treated as fresh and kept.
        stale = [
            key
            for key, saved in self.oidc_states.items()
            if saved.get("created_at", now) < cutoff
        ]
        for key in stale:
            del self.oidc_states[key]

    async def authorize(
        self, client: OAuthClientInformationFull, params: AuthorizationParams
    ) -> str:
        """Return the OIDC provider authorization URL to redirect the user to.

        The MCP client's own request parameters are parked under a freshly
        generated ``state`` value so the callback can resume the flow.
        """
        now = time.time()
        # Abandoned logins would otherwise accumulate forever.
        self._prune_oidc_states(now)
        state = secrets.token_urlsafe(32)
        # Save MCP client params so we can resume the flow in the callback
        self.oidc_states[state] = {
            "mcp_client_id": client.client_id,
            "redirect_uri": str(params.redirect_uri),
            "state": params.state,
            "code_challenge": params.code_challenge,
            "scopes": params.scopes,
            "redirect_uri_provided_explicitly": params.redirect_uri_provided_explicitly,
            "created_at": now,
        }
        oidc_params = {
            "client_id": self.oidc_client_id,
            "redirect_uri": f"{self.server_url}{self.callback_path}",
            "response_type": "code",
            "scope": self.oidc_scope,
            "state": state,
            **self.oidc_extra_auth_params,
        }
        return f"{self.oidc_auth_url}?{urlencode(oidc_params)}"

    # -- Authorization code loading / exchange ---------------------------------

    async def load_authorization_code(
        self, client: OAuthClientInformationFull, authorization_code: str
    ) -> AuthorizationCode | None:
        """Return the stored code object, or ``None`` if it cannot be used.

        ``None`` is returned when the code is unknown, was issued to a
        different client, or has expired (expired codes are also deleted).
        """
        entry = self.auth_codes.get(authorization_code)
        if entry is None:
            return None
        code_obj = entry["code_obj"]
        if code_obj.client_id != client.client_id:
            # Leave the entry alone so another client cannot burn it.
            return None
        if code_obj.expires_at < time.time():
            del self.auth_codes[authorization_code]
            return None
        return code_obj

    def _issue_token_pair(
        self, client_id: str, scopes: list[str], user_info: dict
    ) -> OAuthToken:
        """Mint, store and return a fresh access/refresh token pair."""
        now = int(time.time())
        access_token_str = secrets.token_urlsafe(48)
        self.access_tokens[access_token_str] = UserAccessToken(
            token=access_token_str,
            client_id=client_id,
            scopes=scopes,
            expires_at=now + self.access_token_ttl,
            user_info=user_info,
        )
        refresh_token_str = secrets.token_urlsafe(48)
        self.refresh_tokens[refresh_token_str] = {
            "token_obj": RefreshToken(
                token=refresh_token_str,
                client_id=client_id,
                scopes=scopes,
                expires_at=now + self.refresh_token_ttl,
            ),
            # Kept next to the refresh token so refreshed access tokens can
            # carry the same user info.
            "user_info": user_info,
        }
        return OAuthToken(
            access_token=access_token_str,
            token_type="Bearer",
            expires_in=self.access_token_ttl,
            refresh_token=refresh_token_str,
        )

    async def exchange_authorization_code(
        self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode
    ) -> OAuthToken:
        """Consume an authorization code and issue tokens for ``client``.

        Raises:
            ValueError: If the code is not (or no longer) in the store.
        """
        # pop() makes the code single-use.
        entry = self.auth_codes.pop(authorization_code.code, None)
        if entry is None:
            raise ValueError("Invalid authorization code")
        return self._issue_token_pair(
            client.client_id, authorization_code.scopes, entry["user_info"]
        )

    # -- Access token loading --------------------------------------------------

    async def load_access_token(self, token: str) -> UserAccessToken | None:
        """Return the stored access token, or ``None`` if unknown or expired.

        An expired token is removed from the store as a side effect.
        """
        at = self.access_tokens.get(token)
        if at is None:
            return None
        if at.expires_at and at.expires_at < time.time():
            del self.access_tokens[token]
            return None
        return at

    # -- Refresh token loading / exchange --------------------------------------

    async def load_refresh_token(
        self, client: OAuthClientInformationFull, refresh_token: str
    ) -> RefreshToken | None:
        """Return the stored refresh token, or ``None`` if it cannot be used.

        ``None`` is returned when the token is unknown, belongs to a different
        client, or has expired (expired tokens are also deleted).
        """
        entry = self.refresh_tokens.get(refresh_token)
        if entry is None:
            return None
        token_obj = entry["token_obj"]
        if token_obj.client_id != client.client_id:
            return None
        if token_obj.expires_at is not None and token_obj.expires_at < time.time():
            del self.refresh_tokens[refresh_token]
            return None
        return token_obj

    async def exchange_refresh_token(
        self,
        client: OAuthClientInformationFull,
        refresh_token: RefreshToken,
        scopes: list[str],
    ) -> OAuthToken:
        """Rotate a refresh token: consume it and issue a new token pair.

        When ``scopes`` is empty the scopes of the old refresh token are reused.

        Raises:
            ValueError: If the refresh token is not (or no longer) in the store.
        """
        # pop() invalidates the old refresh token.
        entry = self.refresh_tokens.pop(refresh_token.token, None)
        if entry is None:
            raise ValueError("Invalid refresh token")
        return self._issue_token_pair(
            client.client_id, scopes or refresh_token.scopes, entry["user_info"]
        )

    # -- Revocation ------------------------------------------------------------

    async def revoke_token(
        self, token: UserAccessToken | RefreshToken
    ) -> None:
        """Remove the given token from its store; unknown tokens are ignored."""
        if isinstance(token, UserAccessToken):
            self.access_tokens.pop(token.token, None)
        else:
            self.refresh_tokens.pop(token.token, None)
| # --------------------------------------------------------------------------- | |
| # Provider instance & FastMCP setup | |
| # --------------------------------------------------------------------------- | |
# Single provider instance wired to the Google endpoints and credentials
# defined above. The extra parameters are appended to the query string of the
# Google authorization URL.
provider = OIDCOAuthProvider(
    server_url=SERVER_URL,
    callback_path=OIDC_CALLBACK_PATH,
    oidc_auth_url=GOOGLE_AUTH_URL,
    oidc_token_url=GOOGLE_TOKEN_URL,
    oidc_userinfo_url=GOOGLE_USERINFO_URL,
    oidc_client_id=GOOGLE_CLIENT_ID,
    oidc_client_secret=GOOGLE_CLIENT_SECRET,
    oidc_extra_auth_params=dict(access_type="offline", prompt="consent"),
)

# FastMCP application named "add-server". This server is both the OAuth issuer
# and the protected resource, and dynamic client registration is switched on.
mcp = FastMCP(
    "add-server",
    auth=AuthSettings(
        client_registration_options=ClientRegistrationOptions(enabled=True),
        resource_server_url=AnyUrl(SERVER_URL),
        issuer_url=AnyUrl(SERVER_URL),
    ),
    auth_server_provider=provider,
)
| # --------------------------------------------------------------------------- | |
| # OIDC callback route | |
| # --------------------------------------------------------------------------- | |
@mcp.custom_route(OIDC_CALLBACK_PATH, methods=["GET"])
async def oidc_callback(request: Request) -> Response:
    """Handle the redirect from the OIDC provider after the user authenticates.

    Steps: validate the ``state`` against the pending logins, exchange the
    provider's ``code`` for provider tokens, fetch the user info, mint this
    server's own authorization code, and redirect the browser back to the MCP
    client's ``redirect_uri`` with that code.

    Returns:
        A 302 redirect on success; a JSON error with status 400 for bad or
        missing query parameters, or 502 when talking to the OIDC provider
        fails.
    """
    query = request.query_params
    code = query.get("code")
    state = query.get("state")
    error = query.get("error")
    if error:
        # The login attempt is over; forget its pending state so failed or
        # cancelled logins do not accumulate in memory.
        if state:
            provider.oidc_states.pop(state, None)
        return JSONResponse({"error": error}, status_code=400)
    if not code or not state:
        return JSONResponse({"error": "Missing code or state"}, status_code=400)
    # pop() makes each state single-use.
    saved = provider.oidc_states.pop(state, None)
    if saved is None:
        return JSONResponse({"error": "Invalid state"}, status_code=400)

    try:
        async with httpx.AsyncClient() as client:
            # Exchange auth code for tokens with the OIDC provider
            token_resp = await client.post(
                provider.oidc_token_url,
                data={
                    "code": code,
                    "client_id": provider.oidc_client_id,
                    "client_secret": provider.oidc_client_secret,
                    "redirect_uri": f"{provider.server_url}{provider.callback_path}",
                    "grant_type": "authorization_code",
                },
            )
            if token_resp.status_code != 200:
                return JSONResponse(
                    {"error": "Failed to exchange code with OIDC provider", "detail": token_resp.text},
                    status_code=502,
                )
            oidc_tokens = token_resp.json()
            oidc_access_token = (
                oidc_tokens.get("access_token") if isinstance(oidc_tokens, dict) else None
            )
            if not oidc_access_token:
                # A 200 response without a usable token would otherwise
                # surface as an unhandled KeyError.
                return JSONResponse(
                    {
                        "error": "Failed to exchange code with OIDC provider",
                        "detail": "Token response did not contain an access_token",
                    },
                    status_code=502,
                )
            # Fetch user info from the OIDC provider
            userinfo_resp = await client.get(
                provider.oidc_userinfo_url,
                headers={"Authorization": f"Bearer {oidc_access_token}"},
            )
            if userinfo_resp.status_code != 200:
                return JSONResponse(
                    {"error": "Failed to fetch user info from OIDC provider"},
                    status_code=502,
                )
            user_info = userinfo_resp.json()
    except (httpx.HTTPError, ValueError):
        # httpx.HTTPError: transport-level failure (connect, timeout, ...).
        # ValueError: a response body that is not valid JSON.
        return JSONResponse(
            {"error": "Failed to communicate with OIDC provider"},
            status_code=502,
        )
    if not isinstance(user_info, dict):
        # Downstream code calls .get() on the user info, so it must be an object.
        return JSONResponse(
            {"error": "Failed to fetch user info from OIDC provider"},
            status_code=502,
        )

    # Generate our own authorization code for the MCP client
    our_code = secrets.token_urlsafe(32)
    provider.auth_codes[our_code] = {
        "code_obj": AuthorizationCode(
            code=our_code,
            scopes=saved.get("scopes") or [],
            expires_at=time.time() + 300,  # valid for five minutes
            client_id=saved["mcp_client_id"],
            code_challenge=saved["code_challenge"],
            redirect_uri=AnyUrl(saved["redirect_uri"]),
            redirect_uri_provided_explicitly=saved["redirect_uri_provided_explicitly"],
        ),
        "user_info": user_info,
    }
    # Redirect back to MCP client's redirect_uri with our auth code
    redirect_url = construct_redirect_uri(
        saved["redirect_uri"],
        code=our_code,
        state=saved.get("state"),
    )
    return RedirectResponse(url=redirect_url, status_code=302)
| # --------------------------------------------------------------------------- | |
| # MCP Tools | |
| # --------------------------------------------------------------------------- | |
@mcp.tool()
def add(a: float, b: float) -> float:
    """2つの数値を足し算します 新しい世界の足し算です"""
    # Docstring (kept verbatim because the tool decorator may publish it as the
    # tool description): "Adds two numbers. This is addition for a new world."
    # The result is intentionally NOT the plain sum: a + b is scaled by a
    # factor that depends on the difference between the operands.
    scale = (b - a) / 10 + 1
    return scale * (a + b)
@mcp.tool()
async def guess_me() -> dict:
    """認証済みユーザーの Google アカウント情報を返します"""
    # Docstring (kept verbatim because the tool decorator may publish it as the
    # tool description): "Returns the authenticated user's Google account info."
    token = get_access_token()
    if token is None:
        return {"error": "Not authenticated"}
    if not isinstance(token, UserAccessToken):
        # Only UserAccessToken carries the user-info payload.
        return {"error": "No user info available"}
    profile = token.user_info
    # Missing fields are reported as empty strings.
    return {
        field: profile.get(field, "")
        for field in ("name", "email", "picture", "locale")
    }
if __name__ == "__main__":
    # Start the server with the SSE transport when run as a script.
    mcp.run(transport="sse")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment