Last active
March 12, 2026 15:42
-
-
Save erewok/424080030dcc3621e68f34467b6903b0 to your computer and use it in GitHub Desktop.
Mulligan Funding iso-api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Request an OAuth2 token from the ISO API and print its access token.
# Requires: curl, jq.

export CLIENT_ID="..."
export CLIENT_SECRET="..."

# NOTE: I am using our INTERNAL dev domain here but you would use sandbox
export DOMAIN=https://partner.dev.mulligancloud.com

# Assign first and export afterwards: `export VAR=$(cmd)` would report the
# exit status of `export`, hiding a failed curl call.
# -sS keeps the progress meter off the terminal while still printing errors.
# NOTE(review): the credentials are sent as a raw "id:secret" pair after
# "Basic" (not base64-encoded) -- confirm against the API documentation.
TOKEN_TEST=$(curl -sS -X POST \
  "${DOMAIN}/iso/api/auth/oauth2/token" \
  -H "Authorization: Basic ${CLIENT_ID}:${CLIENT_SECRET}")
export TOKEN_TEST

# Quote the expansion so the JSON reaches jq verbatim (no word splitting or
# glob expansion of the response body).
echo "$TOKEN_TEST" | jq .accessToken
# -> j.w.t
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Shared HTTP client for ISO API operations. | |
| This module provides a unified HTTP client with: | |
| - Authentication handling | |
| - Common CRUD operations for ISO API endpoints | |
| - Rich console logging for requests/responses | |
| - Error handling (failed requests are logged and reported as None or False) | |
| """ | |
| import datetime | |
| import json | |
| import os | |
| import typing | |
| from pathlib import Path | |
| import requests | |
| from .harness import log_error, log_request, log_success, log_warning | |
class APIClient:
    """HTTP client for ISO API operations.

    Wraps a ``requests.Session`` and provides:

    - ``authenticate`` to obtain tokens and install the Bearer header,
    - generic ``get`` / ``post`` / ``put`` / ``delete`` helpers that log every
      request and response through the harness logging functions,
    - convenience methods for applications, documents, offers and webhooks.

    Failed requests are logged and reported through the return value
    (``None`` or ``False``); HTTP error statuses never raise.
    """

    def __init__(
        self,
        domain: str,
        client_id: str,
        client_secret: str,
        use_new_auth_endpoint: bool = False,
        sfid: str = "",
        timeout: float | None = 60.0,
    ):
        """Create a client bound to ``domain``.

        Args:
            domain: Base URL (scheme + host); endpoint paths are appended to it.
            client_id: OAuth client id sent by ``authenticate``.
            client_secret: OAuth client secret sent by ``authenticate``.
            use_new_auth_endpoint: Use ``/iso/api/auth/...`` instead of the
                legacy ``/auth/...`` token path.
            sfid: Stored on the instance as ``self.sfid``; not used by any
                method of this class.
            timeout: Seconds passed to every ``requests`` call so a stalled
                server cannot hang the run forever. ``None`` disables it.
        """
        self.domain = domain
        self.client_id = client_id
        self.client_secret = client_secret
        self.sfid = sfid
        self.timeout = timeout
        self.access_token = ""
        self.refresh_token = ""
        self.use_new_auth_endpoint = use_new_auth_endpoint
        # Make sure user is aware of which authentication flavour is in use.
        if use_new_auth_endpoint:
            log_warning("This client is using the new authentication endpoints!", "Path prefix: /iso/api/auth/...")
        else:
            log_warning("This client is using legacy authentication endpoints!", "Path prefix : /auth/...")
        self.session = requests.Session()

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _url(self, endpoint: str) -> str:
        """Return the absolute URL for ``endpoint``."""
        return f"{self.domain}{endpoint}"

    def _finish_write(self, method: str, endpoint: str, response) -> dict | None:
        """Log a POST/PUT response and convert it to the method's return value.

        Any 2xx status is a success; a body that is not valid JSON (for example
        an empty one) yields ``{}``. Other statuses are logged and yield ``None``.
        """
        log_request(method, endpoint, response.status_code)
        if not 200 <= response.status_code < 300:
            log_error(f"{method} {endpoint}", response.status_code, response.text)
            return None
        try:
            return response.json()
        except ValueError:
            # Some endpoints return empty responses. ValueError is the common
            # base of every JSON decode error requests may raise.
            return {}

    # ------------------------------------------------------------------ #
    # Authentication
    # ------------------------------------------------------------------ #
    def authenticate(self) -> bool:
        """Authenticate and store the access and refresh tokens.

        On success the session's ``Authorization`` header is set to the Bearer
        token for all later requests.

        Returns:
            ``True`` on success; ``False`` if the server answers with a non-200
            status or the body lacks the expected token fields.
        """
        path = "/iso/api/auth/oauth2/token" if self.use_new_auth_endpoint else "/auth/oauth2/token"
        log_request("POST", path)
        response = self.session.post(
            self._url(path),
            headers={"Authorization": f"Basic {self.client_id}:{self.client_secret}"},
            timeout=self.timeout,
        )
        log_request("POST", path, response.status_code)
        if response.status_code != 200:
            log_error("Authentication", response.status_code, response.text)
            return False

        try:
            token_data = response.json()
            access_token = token_data["accessToken"]
            refresh_token = token_data["refreshToken"]
        except (ValueError, KeyError, TypeError) as exc:
            # A 200 with a malformed body is still a failed authentication.
            log_error(
                "Authentication",
                response.status_code,
                f"Unexpected token response ({exc!r}): {response.text}",
            )
            return False

        self.access_token = access_token
        self.refresh_token = refresh_token
        # Set authorization header for future requests
        self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
        # NOTE(review): the tokens are printed in full; acceptable for a
        # smoketest console, but confirm before reusing this elsewhere.
        log_success("Authorization", self.access_token)
        log_success("Refresh Token", self.refresh_token)
        return True

    # ------------------------------------------------------------------ #
    # Generic HTTP verbs
    # ------------------------------------------------------------------ #
    def get(self, endpoint: str) -> dict | None:
        """GET ``endpoint`` with logging.

        Returns:
            The decoded JSON body for a 200 response, ``{}`` when the 200
            response body is empty, or ``None`` for any other status or an
            undecodable body (both are logged as errors).
        """
        log_request("GET", endpoint)
        response = self.session.get(self._url(endpoint), timeout=self.timeout)
        log_request("GET", endpoint, response.status_code)
        if response.status_code != 200:
            log_error(f"GET {endpoint}", response.status_code, response.text)
            return None
        if not response.text.strip():
            return {}  # Return empty dict for empty responses
        try:
            return response.json()
        except ValueError:
            log_error(f"GET {endpoint}", response.status_code, f"Invalid JSON: {response.text}")
            return None

    def post(self, endpoint: str, data: dict | None = None, files: dict | None = None) -> dict | None:
        """POST to ``endpoint`` with logging.

        Args:
            endpoint: Path appended to the client's domain.
            data: Sent as the JSON body, or as form fields when ``files`` is given.
            files: Optional multipart file mapping in ``requests`` format.

        Returns:
            The decoded JSON body (``{}`` if there is none) for a 2xx response,
            otherwise ``None``.
        """
        log_request("POST", endpoint)
        url = self._url(endpoint)
        if files:
            # For multipart uploads, don't set Content-Type (requests will handle it)
            response = self.session.post(url, data=data, files=files, timeout=self.timeout)
        else:
            response = self.session.post(url, json=data, timeout=self.timeout)
        return self._finish_write("POST", endpoint, response)

    def put(self, endpoint: str, data: dict) -> dict | None:
        """PUT ``data`` as JSON to ``endpoint`` with logging.

        Returns:
            The decoded JSON body (``{}`` if there is none) for a 2xx response,
            otherwise ``None``.
        """
        log_request("PUT", endpoint)
        response = self.session.put(self._url(endpoint), json=data, timeout=self.timeout)
        return self._finish_write("PUT", endpoint, response)

    def delete(self, endpoint: str) -> bool:
        """DELETE ``endpoint`` with logging.

        Returns:
            ``True`` for any 2xx response, otherwise ``False`` (after logging
            the error).
        """
        log_request("DELETE", endpoint)
        response = self.session.delete(self._url(endpoint), timeout=self.timeout)
        log_request("DELETE", endpoint, response.status_code)
        if 200 <= response.status_code < 300:
            return True
        log_error(f"DELETE {endpoint}", response.status_code, response.text)
        return False

    # ------------------------------------------------------------------ #
    # Applications
    # ------------------------------------------------------------------ #
    def create_application(self, app_data: dict) -> str | None:
        """Create an application and return its ID.

        ``app_data["application"]["application_signed_date"]`` is overwritten
        in place (the caller's dict is modified) with the date seven days ago.

        Returns:
            The ``application_id`` field of the response, or ``None`` when the
            request failed or returned an empty body.
        """
        # Update signed date to avoid "must be signed within 30 days" error
        signed_date = datetime.date.today() - datetime.timedelta(days=7)
        app_data["application"]["application_signed_date"] = signed_date.strftime("%Y-%m-%d")
        response_data = self.post("/iso/api/v1/application", app_data)
        if not response_data:
            return None
        app_id = response_data.get("application_id")
        log_success("Application Creation", f"Created application with ID: {app_id}")
        return app_id

    def get_application(self, app_id: str) -> dict | None:
        """Get application by ID."""
        return self.get(f"/iso/api/v1/application/{app_id}")

    def update_application(self, app_id: str, app_data: dict) -> dict | None:
        """Update application."""
        return self.put(f"/iso/api/v1/application/{app_id}", app_data)

    # ------------------------------------------------------------------ #
    # Documents
    # ------------------------------------------------------------------ #
    def upload_document(
        self,
        app_id: str,
        document_path: str,
    ) -> str | None:
        """Upload a document as a ``bank_statement`` and return its ID.

        Returns:
            The ``document_id`` field of the response, or ``None`` when the
            path is not an existing file or the upload failed.
        """
        document = Path(document_path)
        if not document.is_file():
            log_error("Document Upload", 404, f"File not found: {document_path}")
            return None

        # Prepare file for upload using original API format
        data = {"type": "bank_statement", "application_id": app_id}
        with document.open("rb") as f:
            # The request must be sent while the file handle is still open.
            files = {"doc": (document.name, f)}
            response_data = self.post("/iso/api/v1/document", data=data, files=files)

        if not response_data:
            return None
        doc_id = response_data.get("document_id")
        log_success("Document Upload", f"Uploaded document with ID: {doc_id}")
        return doc_id

    def get_document(self, doc_id: str) -> dict | None:
        """Get document by ID."""
        return self.get(f"/iso/api/v1/document/{doc_id}")

    def update_document(self, doc_id: str, doc_data: dict) -> dict | None:
        """Update document."""
        return self.put(f"/iso/api/v1/document/{doc_id}", doc_data)

    # ------------------------------------------------------------------ #
    # Offers
    # ------------------------------------------------------------------ #
    def get_offer(self, offer_id: str) -> dict | None:
        """Get offer by ID."""
        return self.get(f"/iso/api/v1/offers/{offer_id}")

    def update_offer(self, offer_id: str, offer_data: dict) -> dict | None:
        """Update offer."""
        return self.put(f"/iso/api/v1/offers/{offer_id}", offer_data)

    # ------------------------------------------------------------------ #
    # Webhooks
    # ------------------------------------------------------------------ #
    def create_webhook(self, webhook_data: dict | None = None) -> str | None:
        """Create a webhook and return its ID.

        When ``webhook_data`` is omitted, a webhook for all events pointing at
        ``https://httpbin.org/post`` is created.

        Returns:
            The ``uuid`` field of the response, or ``None`` when the request
            failed or returned an empty body.
        """
        if webhook_data is None:
            webhook_data = {"webhook_url": "https://httpbin.org/post", "event_type": "all"}
        response_data = self.post("/iso/api/v1/webhooks", webhook_data)
        if not response_data:
            return None
        webhook_id = response_data.get("uuid")
        log_success("Webhook Creation", f"Created webhook with ID: {webhook_id}")
        return webhook_id

    def get_webhook(self, webhook_id: str) -> dict | None:
        """Get webhook by ID."""
        return self.get(f"/iso/api/v1/webhooks/{webhook_id}")

    def update_webhook(self, webhook_id: str, webhook_data: dict) -> dict | None:
        """Update webhook."""
        return self.put(f"/iso/api/v1/webhooks/{webhook_id}", webhook_data)

    def delete_webhook(self, webhook_id: str) -> bool:
        """Delete webhook; ``True`` when the server answered with a 2xx status."""
        return self.delete(f"/iso/api/v1/webhooks/{webhook_id}")

    def test_webhook(self, webhook_id: str) -> bool:
        """Test webhook firing.

        Returns:
            ``True`` when the test request succeeded, otherwise ``False``.
        """
        # Test endpoint doesn't need webhook_id in URL
        response_data = self.post("/iso/api/v1/webhooks/test", data={"webhook_uuid": webhook_id})
        if response_data is None:
            return False
        # An empty dict is still a success (e.g. a 204 with no body).
        log_success("Webhook Test", "Test webhook fired successfully")
        return True
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Shared test harness for ISO API smoketests. | |
| This module provides common functionality for running smoketests including: | |
| - CLI argument parsing | |
| - Environment setup | |
| - Results tracking and rich display | |
| - Authentication handling | |
| """ | |
| import argparse | |
| import datetime | |
| import json | |
| import os | |
| import sys | |
| import typing | |
| from rich import box | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from rich.table import Table | |
# Rich console setup: one shared Console that the logging helpers and the
# summary tables in this module print through.
console = Console()
# Constants
# Base URL used when the "stg" environment is selected.
SANDBOX_DOMAIN = "https://partner.sandbox.mulliganfunding.com"
# Fixture file read when credentials are not supplied by the caller.
DEFAULT_CREDENTIALS_PATH = "./dev_client_id_secret.json"
class TestResult:
    """Outcome of a single smoketest operation.

    Attributes are plain and mutable:
    - operation: name of the operation that was exercised
    - success: whether the operation passed
    - details: optional free-text note shown next to the result
    """

    def __init__(self, operation: str, success: bool, details: str = ""):
        self.operation, self.success, self.details = operation, success, details
class TestHarness:
    """Test harness for coordinating smoketests.

    Collects per-operation results, tracks the start time, and renders
    headers, step titles and result summaries on the shared Rich console.
    """

    # Order in which categories appear in the categorized summary table.
    # "Other" collects results whose operation name matches no keyword, so
    # every recorded result is displayed.
    _CATEGORY_DISPLAY_ORDER = (
        "Authentication",
        "Application",
        "Document",
        "Offer",
        "Webhook",
        "Cleanup",
        "Other",
    )
    # Order in which keywords are tested against an operation name; the first
    # keyword contained in the name wins (e.g. a name containing both
    # "Application" and "Document" is filed under "Application").
    _CATEGORY_MATCH_ORDER = (
        "Application",
        "Document",
        "Offer",
        "Webhook",
        "Authentication",
        "Cleanup",
    )

    def __init__(self, test_name: str, description: str):
        """Start a new run; the duration clock starts now."""
        self.test_name = test_name
        self.description = description
        self.results: list[TestResult] = []
        self.start_time = datetime.datetime.now()
        self.domain = ""
        self.env_display = ""

    def setup_environment(self, env: str) -> str:
        """Configure the target environment and return its domain.

        Only ``"stg"`` is configured here (the sandbox domain). For any other
        value the current ``self.domain`` is returned unchanged, which is an
        empty string unless the caller has set it.
        """
        if env == "stg":
            self.domain = SANDBOX_DOMAIN
            self.env_display = "🏗️ SANDBOX"
        return self.domain

    def show_header(self, extra_info: str = ""):
        """Display the test header panel (name, environment, domain)."""
        header_text = f"[bold]{self.test_name}[/bold]\n"
        header_text += f"Environment: {self.env_display}\n"
        header_text += f"Domain: [blue]{self.domain}[/blue]"
        if extra_info:
            header_text += f"\n\n[dim]{extra_info}[/dim]"
        console.print(Panel(header_text, box=box.DOUBLE, style="cyan"))

    def add_result(self, operation: str, success: bool, details: str = ""):
        """Record the outcome of one operation."""
        self.results.append(TestResult(operation, success, details))

    def show_step(self, step_num: int, title: str, emoji: str = ""):
        """Display a numbered step header, optionally prefixed with an emoji."""
        if emoji:
            title = f"{emoji} {title}"
        console.print(f"\n[bold yellow]{step_num}. {title}[/bold yellow]")

    def has_failures(self) -> bool:
        """Return True if any recorded result failed."""
        return any(not result.success for result in self.results)

    def show_summary(self, categorized: bool = False):
        """Display the results summary, grouped by category when requested."""
        duration = datetime.datetime.now() - self.start_time
        if categorized:
            self._show_categorized_summary(duration)
        else:
            self._show_simple_summary(duration)

    @staticmethod
    def _status_markup(success: bool) -> str:
        """Return the PASS/FAIL cell markup for a result."""
        return "[green]✅ PASS[/green]" if success else "[red]❌ FAIL[/red]"

    @classmethod
    def _categorize(cls, operation: str) -> str:
        """Return the category for ``operation`` ("Other" if none matches)."""
        for keyword in cls._CATEGORY_MATCH_ORDER:
            if keyword in operation:
                return keyword
        return "Other"

    def _show_simple_summary(self, duration: datetime.timedelta):
        """Show one table row per result, followed by the overall verdict."""
        table = Table(title=f"{self.test_name} Results Summary", box=box.ROUNDED)
        table.add_column("Operation", style="cyan", no_wrap=True)
        table.add_column("Status", justify="center")
        table.add_column("Details", style="dim")
        for result in self.results:
            table.add_row(result.operation, self._status_markup(result.success), result.details)
        console.print("\n")
        console.print(table)

        # Final status
        elapsed = f"{duration.total_seconds():.1f}s"
        if self.has_failures():
            console.print(f"\n[bold red]❌ TEST FAILED[/bold red] - Duration: {elapsed}")
            console.print("[red]One or more operations failed. Check the details above.[/red]")
        else:
            console.print(f"\n[bold green]✅ ALL TESTS PASSED[/bold green] - Duration: {elapsed}")
            console.print("[green]All endpoints are working correctly![/green]")

    def _show_categorized_summary(self, duration: datetime.timedelta):
        """Show results grouped by category, then statistics and the verdict."""
        # Group results by category, keeping insertion order within each group.
        categories: dict[str, list] = {name: [] for name in self._CATEGORY_DISPLAY_ORDER}
        for result in self.results:
            categories[self._categorize(result.operation)].append(result)

        table = Table(title=f"{self.test_name} Results Summary", box=box.ROUNDED)
        table.add_column("Category", style="bold cyan", no_wrap=True)
        table.add_column("Operation", style="cyan")
        table.add_column("Status", justify="center")
        table.add_column("Details", style="dim")
        for category, results in categories.items():
            for i, result in enumerate(results):
                # Print the category name only on the first row of its group.
                cat_display = category if i == 0 else ""
                table.add_row(cat_display, result.operation, self._status_markup(result.success), result.details)
        console.print("\n")
        console.print(table)

        # Statistics
        total_tests = len(self.results)
        passed_tests = sum(1 for result in self.results if result.success)
        failed_tests = total_tests - passed_tests
        stats_table = Table(box=box.SIMPLE)
        stats_table.add_column("Metric", style="bold")
        stats_table.add_column("Count", justify="right")
        stats_table.add_row("Total Tests", str(total_tests))
        stats_table.add_row("[green]Passed[/green]", f"[green]{passed_tests}[/green]")
        if failed_tests > 0:
            stats_table.add_row("[red]Failed[/red]", f"[red]{failed_tests}[/red]")
        stats_table.add_row("Duration", f"{duration.total_seconds():.1f}s")
        console.print(stats_table)

        # Final status
        if self.has_failures():
            console.print("\n[bold red]❌ COMPREHENSIVE TEST FAILED[/bold red]")
            console.print("[red]One or more endpoint tests failed. Check the details above.[/red]")
        else:
            console.print("\n[bold green]✅ ALL COMPREHENSIVE TESTS PASSED[/bold green]")
            console.print("[green]All endpoints are working correctly![/green]")
            console.print("[dim]Your API is ready for customer use! 🎆[/dim]")

    def exit_on_failure(self):
        """Exit the process with status 1 if any result failed; otherwise return."""
        if self.has_failures():
            sys.exit(1)
def log_request(method: str, endpoint: str, status_code: typing.Optional[int] = None):
    """Print one console line describing an HTTP request or its response.

    Without a status code the line announces an outgoing request (dimmed).
    With a status code the line shows the outcome: green for 2xx, red for
    anything else.
    """
    if status_code is None:
        console.print(f"📤 [bold blue]{method}[/bold blue] {endpoint}", style="dim")
        return

    succeeded = 200 <= status_code < 300
    if succeeded:
        line = f"✅ [bold green]{method}[/bold green] {endpoint} → [green]{status_code}[/green]"
    else:
        line = f"❌ [bold red]{method}[/bold red] {endpoint} → [red]{status_code}[/red]"
    console.print(line)
def log_error(operation: str, status_code: int, response_text: str):
    """Print a red panel describing a failed operation.

    Args:
        operation: Name shown in the panel title.
        status_code: Status code shown in the panel body.
        response_text: Raw response text. It is escaped before rendering so
            that square brackets in server output are shown literally instead
            of being parsed as Rich markup (which could garble the output or
            raise a markup error on an unmatched closing tag).
    """
    # Local import: keeps this fix self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    body = (
        f"[red]Status Code:[/red] {status_code}\n"
        f"[red]Response:[/red] {escape(str(response_text))}"
    )
    error_panel = Panel(
        body,
        title=f"[bold red]❌ {operation} Failed[/bold red]",
        border_style="red",
        expand=False,
    )
    console.print(error_panel)
def log_success(operation: str, details: str = ""):
    """Print a green success line, followed by dimmed details when given."""
    parts = [f"[bold green]✅ {operation} Successful[/bold green]"]
    if details:
        parts.append(f"[dim]{details}[/dim]")
    console.print("\n".join(parts))
def log_warning(message: str, details: str = ""):
    """Print a yellow warning line, with dimmed details on a tab-indented second line when given."""
    headline = f"[bold yellow]⚠️ WARNING: {message}[/bold yellow]: "
    trailer = f"\n\t[dim]{details}[/dim]" if details else ""
    console.print(headline + trailer)
def load_default_credentials(credentials_path: str = DEFAULT_CREDENTIALS_PATH) -> dict[str, str]:
    """Load default credentials from the fixture file.

    Args:
        credentials_path: Path of the JSON credentials file.

    Returns:
        The parsed JSON content of the file.

    Exits the process with status 1 (after printing an error) when the file
    does not exist.
    """
    # Open directly instead of checking for existence first: this avoids the
    # check-then-open race. JSON is read as UTF-8 regardless of the locale.
    try:
        with open(credentials_path, "r", encoding="utf-8") as fl:
            creds = json.load(fl)
    except FileNotFoundError:
        console.print(f"[bold red]❌ Default credentials file not found:[/bold red] {credentials_path}")
        sys.exit(1)
    console.print(f"🔑 Using default credentials from [cyan]{credentials_path}[/cyan]")
    return creds
def create_cli_parser(
    script_name: str,
    description: str,
    add_document_args: bool = False
) -> argparse.ArgumentParser:
    """Build the argument parser shared by the smoketest scripts.

    Always registers the credential options, the auth-endpoint switch and the
    environment selector. When ``add_document_args`` is true, the application
    payload and document path options are registered as well.
    """
    cli = argparse.ArgumentParser(script_name, description=description)

    # Both credential options are optional strings that default to "".
    credential_options = (
        ("--client-id", "-c", "Client ID (will use fixture if not provided)"),
        ("--client-secret", "-s", "Client Secret (will use fixture if not provided)"),
    )
    for long_flag, short_flag, help_text in credential_options:
        cli.add_argument(long_flag, short_flag, type=str, default="", required=False, help=help_text)

    cli.add_argument("--use-new-auth-endpoint", action="store_true", help="Use the new authentication endpoint")
    cli.add_argument(
        "--env",
        "-e",
        type=str,
        default="stg",
        choices=["dev", "stg"],
        help="Environment (dev or stg)",
    )

    if not add_document_args:
        return cli

    # File-path options, each with a fixture default.
    file_options = (
        ("--application", "-a", "tests/fixtures/app_payload.json", "Application JSON file path"),
        ("--document", "-d", "tests/fixtures/sample # bank; statement bad chars #~ .pdf", "Document file path"),
    )
    for long_flag, short_flag, default_path, help_text in file_options:
        cli.add_argument(long_flag, short_flag, type=str, default=default_path, help=help_text)
    return cli
def resolve_credentials(client_id: str, client_secret: str) -> tuple[str, str, str]:
    """Return ``(client_id, client_secret, sfid)`` for the run.

    When both values are supplied they are returned as-is with an empty third
    element. If either is missing, all three values are read from the default
    credentials fixture (keys ``client_id``, ``client_secret`` and ``sfId``).
    """
    if client_id and client_secret:
        return client_id, client_secret, ""
    fixture = load_default_credentials()
    return fixture["client_id"], fixture["client_secret"], fixture["sfId"]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment