Skip to content

Instantly share code, notes, and snippets.

@rksk
Last active March 20, 2025 06:35
Show Gist options
  • Select an option

  • Save rksk/4dfec7d3e176f0dbd86f7087b8968d0b to your computer and use it in GitHub Desktop.

Select an option

Save rksk/4dfec7d3e176f0dbd86f7087b8968d0b to your computer and use it in GitHub Desktop.
Asgardeo/WSO2IS App Native Login Client (generated with CursorAI)
import requests
import json
import getpass
from typing import Dict, Any, Optional
import logging
from datetime import datetime
# Logging setup: emit INFO and above on the root logger, prefixing every
# record with a timestamp and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)

# Identity provider settings.
# Before running this script:
#   1. Create a Standard-Based OIDC application on Asgardeo or WSO2 IS 7.x.
#   2. In the Protocol tab, enable the code grant, add a dummy URL to the
#      Authorized redirect URLs, and enable public client mode.
#   3. Enable App-Native Authentication in the Advanced tab.
#   4. Fill in the values below so they match that application.
ASGARDEO_CONFIG = dict(
    HOSTNAME='api.asgardeo.io',    # Asgardeo server hostname
    TENANT_DOMAIN='',              # Tenant domain
    CLIENT_ID='',                  # OAuth client ID
    REDIRECT_URI='',               # Redirect URI
    SCOPE='openid profile email',  # Optional scope
)
class AsgardeoClient:
    """Console client for the Asgardeo / WSO2 IS app-native authentication flow.

    The client starts a flow at the ``/authorize`` endpoint, then keeps
    prompting the user on the console for whatever the server asks for and
    submits the answers to ``/authn`` until the flow reports
    ``SUCCESS_COMPLETED`` or reaches a step this client cannot handle.

    Every request and response is written to the log. Values of parameters
    that the server metadata marks as confidential are masked in the request
    log so passwords and one-time codes do not end up in log output.
    """

    # Step types the interactive loop in ``authenticate`` knows how to drive.
    _SUPPORTED_STEP_TYPES = ('AUTHENTICATOR_PROMPT', 'MULTI_OPTIONS_PROMPT')
    # Placeholder written to the log in place of a confidential value.
    _REDACTED = '********'

    def __init__(self, hostname: str, tenant_domain: str, client_id: str,
                 timeout: Optional[float] = 30.0):
        """
        Initialize the Asgardeo client.
        Args:
            hostname: The Asgardeo server hostname (e.g., 'localhost:9443')
            tenant_domain: The tenant domain (e.g., 'carbon.super')
            client_id: The OAuth client ID
            timeout: Seconds to wait for each HTTP call before giving up;
                None disables the timeout
        """
        self.base_url = f"https://{hostname}/t/{tenant_domain}/oauth2"
        self.client_id = client_id
        self.timeout = timeout
        self.session = requests.Session()
        self.flow_id = None
        # Names of parameters collected with hidden input; used only to mask
        # their values when the request body is logged.
        self._confidential_params = set()
        # Add logging interceptor to session
        self.session.hooks['response'].append(self._log_response)

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "AsgardeoClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP session when leaving a ``with`` block."""
        self.close()

    def _log_request(self, method: str, url: str, **kwargs) -> None:
        """Log the request details (query parameters and JSON body, if given)."""
        logging.info(f"\n{'='*50}")
        logging.info(f"REQUEST: {method} {url}")
        if 'params' in kwargs:
            logging.info(f"Query Parameters: {json.dumps(kwargs['params'], indent=2)}")
        if 'json' in kwargs:
            logging.info(f"Request Body: {json.dumps(kwargs['json'], indent=2)}")
        logging.info(f"{'='*50}\n")

    def _log_response(self, response: "requests.Response", *args, **kwargs) -> None:
        """Log the response status and body; registered as a session response hook."""
        logging.info(f"\n{'='*50}")
        logging.info(f"RESPONSE: {response.status_code} {response.reason}")
        try:
            body = json.dumps(response.json(), indent=2)
        except ValueError:
            # Every JSON decode error (stdlib json, simplejson, requests' own)
            # derives from ValueError; fall back to the raw text so a non-JSON
            # body never raises from inside the hook.
            body = response.text
        logging.info(f"Response Body: {body}")
        logging.info(f"{'='*50}\n")

    def _redact_for_log(self, auth_request: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *auth_request* that is safe to write to the log.

        Values of parameters previously collected as confidential are replaced
        with a placeholder. The request passed in is never modified.
        """
        if not isinstance(auth_request, dict):
            return auth_request
        selected = auth_request.get('selectedAuthenticator')
        if not isinstance(selected, dict) or not isinstance(selected.get('params'), dict):
            return auth_request
        masked_params = {
            name: self._REDACTED if name in self._confidential_params else value
            for name, value in selected['params'].items()
        }
        return {**auth_request, 'selectedAuthenticator': {**selected, 'params': masked_params}}

    def initiate_auth(self, redirect_uri: str, scope: Optional[str] = None) -> Dict[str, Any]:
        """
        Initiate the authentication flow.
        Args:
            redirect_uri: The redirect URI registered with the application
            scope: Optional scope string
        Returns:
            The initial authentication response
        Raises:
            requests.exceptions.RequestException: If the call fails, times
                out, or returns an HTTP error status
        """
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'response_mode': 'direct',
            'redirect_uri': redirect_uri
        }
        if scope:
            params['scope'] = scope
        url = f"{self.base_url}/authorize"
        self._log_request('GET', url, params=params)
        response = self.session.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()
        auth_response = response.json()
        # Remember the flow identifier; every follow-up request must carry it.
        self.flow_id = auth_response.get('flowId')
        return auth_response

    def handle_authentication(self, auth_request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle the authentication request.
        Args:
            auth_request: The authentication request body
        Returns:
            The authentication response
        Raises:
            requests.exceptions.RequestException: If the call fails, times
                out, or returns an HTTP error status
        """
        url = f"{self.base_url}/authn"
        # Log a masked copy; the real request is sent unchanged.
        self._log_request('POST', url, json=self._redact_for_log(auth_request))
        response = self.session.post(url, json=auth_request, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def get_user_input(self, param: Dict[str, Any]) -> str:
        """
        Get user input for a parameter, handling confidential inputs appropriately.
        Args:
            param: Parameter metadata from the API
        Returns:
            The user's input value
        """
        # Fall back to the raw parameter name when no display name is usable.
        param_name = param.get('displayName') or param.get('param', '')
        prompt = f"Enter {param_name}: "
        if param.get('confidential', False):
            # Hidden input for secrets such as passwords.
            return getpass.getpass(prompt)
        return input(prompt)

    def handle_authenticator_prompt(self, authenticator: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle authenticator prompt by collecting required parameters from the user.
        Args:
            authenticator: Authenticator metadata from the API
        Returns:
            Authentication request with user inputs
        """
        known_params = (authenticator.get('metadata') or {}).get('params') or []
        params = {}
        for param in authenticator.get('requiredParams') or []:
            # Use the server-provided metadata for this parameter when present,
            # otherwise prompt with the bare parameter name.
            param_metadata = next(
                (p for p in known_params if p.get('param') == param),
                {'param': param}
            )
            if param_metadata.get('confidential', False):
                self._confidential_params.add(param)
            params[param] = self.get_user_input(param_metadata)
        return {
            'flowId': self.flow_id,
            'selectedAuthenticator': {
                'authenticatorId': authenticator['authenticatorId'],
                'params': params
            }
        }

    def _choose_authenticator(self, authenticators: list) -> Dict[str, Any]:
        """List the available authenticators and return the one the user picks."""
        logging.info("\nAvailable authentication options:")
        for i, auth in enumerate(authenticators, 1):
            logging.info(f"{i}. {auth.get('authenticator')}")
        # Keep asking until the user enters a number within range.
        while True:
            try:
                selection = int(input("\nSelect an option (enter the number): "))
            except ValueError:
                logging.error("Please enter a valid number.")
                continue
            if 1 <= selection <= len(authenticators):
                return authenticators[selection - 1]
            logging.error("Invalid selection. Please try again.")

    def authenticate(self, redirect_uri: str, scope: Optional[str] = None) -> Dict[str, Any]:
        """
        Perform the complete authentication flow.
        Args:
            redirect_uri: The redirect URI registered with the application
            scope: Optional scope string
        Returns:
            The final authentication response: the successful one, or the
            response at which the flow could not be continued
        """
        # Step 1: Initiate authentication
        logging.info("Initiating authentication...")
        auth_response = self.initiate_auth(redirect_uri, scope)
        while True:
            flow_status = auth_response.get('flowStatus')
            if flow_status == 'SUCCESS_COMPLETED':
                logging.info("Authentication successful!")
                return auth_response

            # Treat a missing or null nextStep as an empty step.
            next_step = auth_response.get('nextStep') or {}
            if flow_status == 'FAIL_INCOMPLETE':
                # Report server-side errors, then retry the step it offers.
                for message in next_step.get('messages') or []:
                    if message.get('type') == 'ERROR':
                        logging.error(f"Authentication error: {message.get('message')}")
                logging.info("Continuing with authentication flow...")

            step_type = next_step.get('stepType')
            if step_type not in self._SUPPORTED_STEP_TYPES:
                logging.info(f"Unsupported step type: {step_type}")
                return auth_response

            authenticators = next_step.get('authenticators') or []
            if not authenticators:
                logging.info("No authenticators available!")
                return auth_response

            if step_type == 'AUTHENTICATOR_PROMPT':
                # For simplicity, use the first authenticator
                authenticator = authenticators[0]
                logging.info(f"\nAuthenticating with: {authenticator.get('authenticator')}")
                auth_request = self.handle_authenticator_prompt(authenticator)
            else:
                authenticator = self._choose_authenticator(authenticators)
                logging.info(f"\nSelected: {authenticator.get('authenticator')}")
                if authenticator.get('requiredParams'):
                    auth_request = self.handle_authenticator_prompt(authenticator)
                else:
                    # Nothing to collect: just tell the server which option was chosen.
                    auth_request = {
                        'flowId': self.flow_id,
                        'selectedAuthenticator': {
                            'authenticatorId': authenticator['authenticatorId']
                        }
                    }

            # Submit authentication request
            auth_response = self.handle_authentication(auth_request)
def main():
    """Run the interactive authentication flow described by ``ASGARDEO_CONFIG``.

    Logs the configuration in use, stops early when a required setting is
    still empty, then drives the flow and logs either the authorization code
    or the details of the failure. Nothing is returned.
    """
    # Get configuration from constants
    logging.info("Asgardeo Authentication Client")
    logging.info("-----------------------------")
    logging.info("Using configuration:")
    logging.info(f"Hostname: {ASGARDEO_CONFIG['HOSTNAME']}")
    logging.info(f"Tenant Domain: {ASGARDEO_CONFIG['TENANT_DOMAIN']}")
    logging.info(f"Client ID: {ASGARDEO_CONFIG['CLIENT_ID']}")
    logging.info(f"Redirect URI: {ASGARDEO_CONFIG['REDIRECT_URI']}")
    logging.info(f"Scope: {ASGARDEO_CONFIG['SCOPE']}")
    logging.info("-----------------------------")

    # An empty value here would only produce a malformed URL or a rejected
    # request, so fail fast with a message that names what is missing.
    required_keys = ('HOSTNAME', 'TENANT_DOMAIN', 'CLIENT_ID', 'REDIRECT_URI')
    missing = [key for key in required_keys if not ASGARDEO_CONFIG.get(key)]
    if missing:
        logging.error(f"Missing required configuration value(s): {', '.join(missing)}")
        return

    # Create client and authenticate
    client = AsgardeoClient(
        ASGARDEO_CONFIG['HOSTNAME'],
        ASGARDEO_CONFIG['TENANT_DOMAIN'],
        ASGARDEO_CONFIG['CLIENT_ID']
    )
    try:
        result = client.authenticate(
            ASGARDEO_CONFIG['REDIRECT_URI'],
            ASGARDEO_CONFIG['SCOPE']
        )
    except requests.exceptions.RequestException as e:
        logging.error(f"\nError during authentication: {str(e)}")
        return
    except (KeyboardInterrupt, EOFError):
        # The user aborted at a prompt (Ctrl+C / end of input).
        logging.error("\nAuthentication cancelled by user.")
        return
    finally:
        # Release the HTTP connections held by the client's session.
        client.session.close()

    if result.get('flowStatus') == 'SUCCESS_COMPLETED':
        logging.info("\nAuthentication successful!")
        logging.info(f"Authorization code: {result.get('authData', {}).get('code')}")
    else:
        logging.error("\nAuthentication failed!")
        logging.error(f"Error details: {json.dumps(result, indent=2)}")
# Script entry point: run the interactive flow only when executed directly,
# not when this module is imported.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment