Last active
March 20, 2025 06:35
-
-
Save rksk/4dfec7d3e176f0dbd86f7087b8968d0b to your computer and use it in GitHub Desktop.
Asgardeo/WSO2IS App Native Login Client (generated with CursorAI)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import json | |
| import getpass | |
| from typing import Dict, Any, Optional | |
| import logging | |
| from datetime import datetime | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| # IdP Configuration | |
| # 1. Create an Standard-Based OIDC Application on Asgardeo or WSO2 IS 7.x | |
| # 2. In the Protocol tab, enable code grant, add a dummy url to Authorized redirect URLs and enable the public client mode | |
| # 3. Enable App-Native Authentication in the Advanced tab | |
| # 4. Finally, fill the following config params accordingly | |
| ASGARDEO_CONFIG = { | |
| 'HOSTNAME': 'api.asgardeo.io', # Asgardeo server hostname | |
| 'TENANT_DOMAIN': '', # Tenant domain | |
| 'CLIENT_ID': '', # OAuth client ID | |
| 'REDIRECT_URI': '', # Redirect URI | |
| 'SCOPE': 'openid profile email' # Optional scope | |
| } | |
| class AsgardeoClient: | |
| def __init__(self, hostname: str, tenant_domain: str, client_id: str): | |
| """ | |
| Initialize the Asgardeo client. | |
| Args: | |
| hostname: The Asgardeo server hostname (e.g., 'localhost:9443') | |
| tenant_domain: The tenant domain (e.g., 'carbon.super') | |
| client_id: The OAuth client ID | |
| """ | |
| self.base_url = f"https://{hostname}/t/{tenant_domain}/oauth2" | |
| self.client_id = client_id | |
| self.session = requests.Session() | |
| self.flow_id = None | |
| # Add logging interceptor to session | |
| self.session.hooks['response'].append(self._log_response) | |
| def _log_request(self, method: str, url: str, **kwargs) -> None: | |
| """Log the request details.""" | |
| logging.info(f"\n{'='*50}") | |
| logging.info(f"REQUEST: {method} {url}") | |
| if 'params' in kwargs: | |
| logging.info(f"Query Parameters: {json.dumps(kwargs['params'], indent=2)}") | |
| if 'json' in kwargs: | |
| logging.info(f"Request Body: {json.dumps(kwargs['json'], indent=2)}") | |
| logging.info(f"{'='*50}\n") | |
| def _log_response(self, response: requests.Response, *args, **kwargs) -> None: | |
| """Log the response details.""" | |
| logging.info(f"\n{'='*50}") | |
| logging.info(f"RESPONSE: {response.status_code} {response.reason}") | |
| try: | |
| logging.info(f"Response Body: {json.dumps(response.json(), indent=2)}") | |
| except json.JSONDecodeError: | |
| logging.info(f"Response Body: {response.text}") | |
| logging.info(f"{'='*50}\n") | |
| def initiate_auth(self, redirect_uri: str, scope: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Initiate the authentication flow. | |
| Args: | |
| redirect_uri: The redirect URI registered with the application | |
| scope: Optional scope string | |
| Returns: | |
| The initial authentication response | |
| """ | |
| params = { | |
| 'response_type': 'code', | |
| 'client_id': self.client_id, | |
| 'response_mode': 'direct', | |
| 'redirect_uri': redirect_uri | |
| } | |
| if scope: | |
| params['scope'] = scope | |
| url = f"{self.base_url}/authorize" | |
| self._log_request('GET', url, params=params) | |
| response = self.session.get(url, params=params) | |
| response.raise_for_status() | |
| auth_response = response.json() | |
| self.flow_id = auth_response.get('flowId') | |
| return auth_response | |
| def handle_authentication(self, auth_request: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Handle the authentication request. | |
| Args: | |
| auth_request: The authentication request body | |
| Returns: | |
| The authentication response | |
| """ | |
| url = f"{self.base_url}/authn" | |
| self._log_request('POST', url, json=auth_request) | |
| response = self.session.post(url, json=auth_request) | |
| response.raise_for_status() | |
| return response.json() | |
| def get_user_input(self, param: Dict[str, Any]) -> str: | |
| """ | |
| Get user input for a parameter, handling confidential inputs appropriately. | |
| Args: | |
| param: Parameter metadata from the API | |
| Returns: | |
| The user's input value | |
| """ | |
| param_name = param.get('displayName', param.get('param', '')) | |
| is_confidential = param.get('confidential', False) | |
| prompt = f"Enter {param_name}: " | |
| if is_confidential: | |
| return getpass.getpass(prompt) | |
| return input(prompt) | |
| def handle_authenticator_prompt(self, authenticator: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Handle authenticator prompt by collecting required parameters from the user. | |
| Args: | |
| authenticator: Authenticator metadata from the API | |
| Returns: | |
| Authentication request with user inputs | |
| """ | |
| params = {} | |
| for param in authenticator.get('requiredParams', []): | |
| param_metadata = next( | |
| (p for p in authenticator.get('metadata', {}).get('params', []) | |
| if p.get('param') == param), | |
| {'param': param} | |
| ) | |
| params[param] = self.get_user_input(param_metadata) | |
| return { | |
| 'flowId': self.flow_id, | |
| 'selectedAuthenticator': { | |
| 'authenticatorId': authenticator['authenticatorId'], | |
| 'params': params | |
| } | |
| } | |
| def authenticate(self, redirect_uri: str, scope: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Perform the complete authentication flow. | |
| Args: | |
| redirect_uri: The redirect URI registered with the application | |
| scope: Optional scope string | |
| Returns: | |
| The final authentication response | |
| """ | |
| # Step 1: Initiate authentication | |
| logging.info("Initiating authentication...") | |
| auth_response = self.initiate_auth(redirect_uri, scope) | |
| while True: | |
| flow_status = auth_response.get('flowStatus') | |
| if flow_status == 'SUCCESS_COMPLETED': | |
| logging.info("Authentication successful!") | |
| return auth_response | |
| if flow_status == 'FAIL_INCOMPLETE': | |
| # Get error messages from nextStep if available | |
| next_step = auth_response.get('nextStep', {}) | |
| messages = next_step.get('messages', []) | |
| for message in messages: | |
| if message.get('type') == 'ERROR': | |
| logging.error(f"Authentication error: {message.get('message')}") | |
| # Continue with the flow instead of returning | |
| logging.info("Continuing with authentication flow...") | |
| # Handle next step | |
| next_step = auth_response.get('nextStep', {}) | |
| step_type = next_step.get('stepType') | |
| if step_type == 'AUTHENTICATOR_PROMPT': | |
| authenticators = next_step.get('authenticators', []) | |
| if not authenticators: | |
| logging.info("No authenticators available!") | |
| return auth_response | |
| # For simplicity, use the first authenticator | |
| authenticator = authenticators[0] | |
| logging.info(f"\nAuthenticating with: {authenticator.get('authenticator')}") | |
| # Get user input for required parameters | |
| auth_request = self.handle_authenticator_prompt(authenticator) | |
| # Submit authentication request | |
| auth_response = self.handle_authentication(auth_request) | |
| elif step_type == 'MULTI_OPTIONS_PROMPT': | |
| authenticators = next_step.get('authenticators', []) | |
| if not authenticators: | |
| logging.info("No authenticators available!") | |
| return auth_response | |
| # Display available options | |
| logging.info("\nAvailable authentication options:") | |
| for i, auth in enumerate(authenticators, 1): | |
| logging.info(f"{i}. {auth.get('authenticator')}") | |
| # Get user selection | |
| while True: | |
| try: | |
| selection = int(input("\nSelect an option (enter the number): ")) | |
| if 1 <= selection <= len(authenticators): | |
| break | |
| logging.error("Invalid selection. Please try again.") | |
| except ValueError: | |
| logging.error("Please enter a valid number.") | |
| # Get the selected authenticator | |
| selected_authenticator = authenticators[selection - 1] | |
| logging.info(f"\nSelected: {selected_authenticator.get('authenticator')}") | |
| # Check if there are required parameters | |
| required_params = selected_authenticator.get('requiredParams', []) | |
| if required_params: | |
| # Get user input for required parameters | |
| auth_request = self.handle_authenticator_prompt(selected_authenticator) | |
| else: | |
| # If no required parameters, just submit the selection | |
| auth_request = { | |
| 'flowId': self.flow_id, | |
| 'selectedAuthenticator': { | |
| 'authenticatorId': selected_authenticator['authenticatorId'] | |
| } | |
| } | |
| # Submit authentication request | |
| auth_response = self.handle_authentication(auth_request) | |
| else: | |
| logging.info(f"Unsupported step type: {step_type}") | |
| return auth_response | |
| def main(): | |
| # Get configuration from constants | |
| logging.info("Asgardeo Authentication Client") | |
| logging.info("-----------------------------") | |
| logging.info(f"Using configuration:") | |
| logging.info(f"Hostname: {ASGARDEO_CONFIG['HOSTNAME']}") | |
| logging.info(f"Tenant Domain: {ASGARDEO_CONFIG['TENANT_DOMAIN']}") | |
| logging.info(f"Client ID: {ASGARDEO_CONFIG['CLIENT_ID']}") | |
| logging.info(f"Redirect URI: {ASGARDEO_CONFIG['REDIRECT_URI']}") | |
| logging.info(f"Scope: {ASGARDEO_CONFIG['SCOPE']}") | |
| logging.info("-----------------------------") | |
| # Create client and authenticate | |
| client = AsgardeoClient( | |
| ASGARDEO_CONFIG['HOSTNAME'], | |
| ASGARDEO_CONFIG['TENANT_DOMAIN'], | |
| ASGARDEO_CONFIG['CLIENT_ID'] | |
| ) | |
| try: | |
| result = client.authenticate( | |
| ASGARDEO_CONFIG['REDIRECT_URI'], | |
| ASGARDEO_CONFIG['SCOPE'] | |
| ) | |
| if result.get('flowStatus') == 'SUCCESS_COMPLETED': | |
| logging.info("\nAuthentication successful!") | |
| logging.info(f"Authorization code: {result.get('authData', {}).get('code')}") | |
| else: | |
| logging.error("\nAuthentication failed!") | |
| logging.error(f"Error details: {json.dumps(result, indent=2)}") | |
| except requests.exceptions.RequestException as e: | |
| logging.error(f"\nError during authentication: {str(e)}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment