|
import httpx |
|
|
|
|
|
class CookieAuthProvider(AuthProvider): |
|
""" |
|
Authenticates requests using a session cookie stored by the shared auth service. |
|
|
|
Reads the session cookie from the request and forwards it to the passport |
|
endpoint directly via httpx. Useful for server-side flows where a bearer |
|
token is not present but a session cookie is. |
|
|
|
Args: |
|
passport_url: Full URL of the passport endpoint, e.g. |
|
"https://iam.internal/v1/passport" |
|
cookie_name: Name of the session cookie to look for. Defaults to "session". |
|
|
|
Example: |
|
cookie_auth_provider = CookieAuthProvider( |
|
passport_url="https://iam.internal/v1/passport", |
|
cookie_name="anaconda_session", |
|
) |
|
""" |
|
|
|
def __init__(self, passport_url: str, cookie_name: str = "session"): |
|
self.passport_url = passport_url |
|
self.cookie_name = cookie_name |
|
self._client: Optional[httpx.AsyncClient] = None |
|
|
|
async def initialize(self) -> None: |
|
log.info(f"Initializing Cookie Auth Provider (cookie: '{self.cookie_name}')") |
|
self._client = httpx.AsyncClient() |
|
|
|
@property |
|
def client(self) -> httpx.AsyncClient: |
|
if self._client is None: |
|
raise RuntimeError("CookieAuthProvider not initialized — call initialize() first") |
|
return self._client |
|
|
|
def can_handle(self, request: Request) -> bool: |
|
return self.cookie_name in request.cookies |
|
|
|
async def handle_request(self, request: Request) -> Passport: |
|
cookie_value = request.cookies.get(self.cookie_name) |
|
if not cookie_value: |
|
raise AuthInvalid("Missing session cookie") |
|
|
|
try: |
|
response = await self.client.get( |
|
self.passport_url, |
|
cookies={self.cookie_name: cookie_value}, |
|
) |
|
response.raise_for_status() |
|
except httpx.HTTPStatusError as e: |
|
log.warning(f"Passport endpoint returned {e.response.status_code}") |
|
raise AuthInvalid("Session cookie rejected by auth service") |
|
except httpx.RequestError as e: |
|
log.warning(f"Failed to reach passport endpoint: {e}") |
|
raise AuthInvalid("Unable to reach auth service") |
|
|
|
try: |
|
return Passport(**response.json()) |
|
except Exception as e: |
|
log.warning(f"Failed to parse passport response: {e}") |
|
raise AuthInvalid("Invalid passport response from auth service") |