Skip to content

Instantly share code, notes, and snippets.

@erewok
Last active March 12, 2026 15:42
Show Gist options
  • Select an option

  • Save erewok/424080030dcc3621e68f34467b6903b0 to your computer and use it in GitHub Desktop.

Select an option

Save erewok/424080030dcc3621e68f34467b6903b0 to your computer and use it in GitHub Desktop.
Mulligan Funding iso-api
#!/bin/bash
# Example: request an OAuth2 token from the ISO API and print the access token.
# Requires `curl` and `jq` on PATH.
export CLIENT_ID="..."
export CLIENT_SECRET="..."
# NOTE: I am using our INTERNAL dev domain here but you would use sandbox
export DOMAIN="https://partner.dev.mulligancloud.com"
# Assign first and export afterwards so that curl's exit status is not masked
# by the exit status of `export`.
# NOTE(review): the credentials are sent as a raw "id:secret" pair after
# "Basic" (not base64-encoded) — confirm this is what the API expects.
TOKEN_TEST=$(curl -X POST \
  "${DOMAIN}/iso/api/auth/oauth2/token" \
  -H "Authorization: Basic ${CLIENT_ID}:${CLIENT_SECRET}")
export TOKEN_TEST
# Quote the expansion so the JSON reaches jq verbatim (no word splitting or
# pathname expansion on the response body).
echo "$TOKEN_TEST" | jq .accessToken
# -> j.w.t
"""
Shared HTTP client for ISO API operations.
This module provides a unified HTTP client with:
- Authentication handling
- Common CRUD operations for ISO API endpoints
- Rich console logging for requests/responses
- Error handling (failures are logged and reported to the caller; requests are not retried automatically)
"""
import datetime
import json
import os
import typing
from pathlib import Path
import requests
from .harness import log_error, log_request, log_success, log_warning
class APIClient:
"""HTTP client for ISO API operations"""
def __init__(
self, domain: str, client_id: str, client_secret: str, use_new_auth_endpoint: bool = False, sfid: str = ""
):
self.domain = domain
self.client_id = client_id
self.client_secret = client_secret
self.sfid = sfid
self.access_token = ""
self.refresh_token = ""
self.use_new_auth_endpoint = use_new_auth_endpoint
if use_new_auth_endpoint:
# Make sure user is aware of this choice
log_warning("This client is using the new authentication endpoints!", "Path prefix: /iso/api/auth/...")
else:
# Make sure user is aware of this choice
log_warning("This client is using legacy authentication endpoints!", "Path prefix : /auth/...")
self.session = requests.Session()
def authenticate(self) -> bool:
"""Authenticate and store access token"""
path = "/auth/oauth2/token"
if self.use_new_auth_endpoint:
path = "/iso/api/auth/oauth2/token"
auth_url = f"{self.domain}{path}"
log_request("POST", path)
response = self.session.post(
auth_url,
headers={"Authorization": f"Basic {self.client_id}:{self.client_secret}"},
)
log_request("POST", path, response.status_code)
if response.status_code != 200:
log_error("Authentication", response.status_code, response.text)
return False
token_data = response.json()
self.access_token = token_data["accessToken"]
self.refresh_token = token_data["refreshToken"]
# Set authorization header for future requests
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
log_success("Authorization", self.access_token)
log_success("Refresh Token", self.refresh_token)
return True
def get(self, endpoint: str) -> dict | None:
"""GET request with logging"""
url = f"{self.domain}{endpoint}"
log_request("GET", endpoint)
response = self.session.get(url)
log_request("GET", endpoint, response.status_code)
if response.status_code == 200:
try:
if response.text.strip(): # Check if response has content
return response.json()
else:
return {} # Return empty dict for empty responses
except json.JSONDecodeError:
log_error(f"GET {endpoint}", response.status_code, f"Invalid JSON: {response.text}")
return None
else:
log_error(f"GET {endpoint}", response.status_code, response.text)
return None
def post(self, endpoint: str, data: dict | None = None, files: dict | None = None) -> dict | None:
"""POST request with logging"""
url = f"{self.domain}{endpoint}"
log_request("POST", endpoint)
if files:
# For multipart uploads, don't set Content-Type (requests will handle it)
response = self.session.post(url, data=data, files=files)
else:
response = self.session.post(url, json=data)
log_request("POST", endpoint, response.status_code)
if 200 <= response.status_code < 300:
try:
return response.json()
except json.JSONDecodeError:
# Some endpoints return empty responses
return {}
else:
log_error(f"POST {endpoint}", response.status_code, response.text)
return None
def put(self, endpoint: str, data: dict) -> dict | None:
"""PUT request with logging"""
url = f"{self.domain}{endpoint}"
log_request("PUT", endpoint)
response = self.session.put(url, json=data)
log_request("PUT", endpoint, response.status_code)
if 200 <= response.status_code < 300:
try:
return response.json()
except json.JSONDecodeError:
return {}
else:
log_error(f"PUT {endpoint}", response.status_code, response.text)
return None
def create_application(self, app_data: dict) -> str | None:
"""Create application and return ID"""
# Update signed date to avoid "must be signed within 30 days" error
signed_date = datetime.date.today() - datetime.timedelta(days=7)
app_data["application"]["application_signed_date"] = signed_date.strftime("%Y-%m-%d")
response_data = self.post("/iso/api/v1/application", app_data)
if response_data:
app_id = response_data.get("application_id")
log_success("Application Creation", f"Created application with ID: {app_id}")
return app_id
return None
def get_application(self, app_id: str) -> dict | None:
"""Get application by ID"""
return self.get(f"/iso/api/v1/application/{app_id}")
def update_application(self, app_id: str, app_data: dict) -> dict | None:
"""Update application"""
return self.put(f"/iso/api/v1/application/{app_id}", app_data)
def upload_document(
self,
app_id: str,
document_path: str,
) -> str | None:
"""Upload document and return document ID"""
if not os.path.exists(document_path):
log_error("Document Upload", 404, f"File not found: {document_path}")
return None
# Prepare file for upload using original API format
filename = Path(document_path).name
with open(document_path, "rb") as f:
files = {"doc": (filename, f)}
data = {"type": "bank_statement", "application_id": app_id}
response_data = self.post("/iso/api/v1/document", data=data, files=files)
if response_data:
doc_id = response_data.get("document_id")
log_success("Document Upload", f"Uploaded document with ID: {doc_id}")
return doc_id
return None
def get_document(self, doc_id: str) -> dict | None:
"""Get document by ID"""
return self.get(f"/iso/api/v1/document/{doc_id}")
def update_document(self, doc_id: str, doc_data: dict) -> dict | None:
"""Update document"""
return self.put(f"/iso/api/v1/document/{doc_id}", doc_data)
def get_offer(self, offer_id: str) -> dict | None:
"""Get offer by ID"""
return self.get(f"/iso/api/v1/offers/{offer_id}")
def update_offer(self, offer_id: str, offer_data: dict) -> dict | None:
"""Update offer"""
return self.put(f"/iso/api/v1/offers/{offer_id}", offer_data)
def create_webhook(self, webhook_data: dict | None = None) -> str | None:
"""Create webhook and return ID"""
if webhook_data is None:
webhook_data = {"webhook_url": "https://httpbin.org/post", "event_type": "all"}
response_data = self.post("/iso/api/v1/webhooks", webhook_data)
if response_data:
webhook_id = response_data.get("uuid")
log_success("Webhook Creation", f"Created webhook with ID: {webhook_id}")
return webhook_id
return None
def get_webhook(self, webhook_id: str) -> typing.Optional[dict]:
"""Get webhook by ID"""
return self.get(f"/iso/api/v1/webhooks/{webhook_id}")
def update_webhook(self, webhook_id: str, webhook_data: dict) -> typing.Optional[dict]:
"""Update webhook"""
return self.put(f"/iso/api/v1/webhooks/{webhook_id}", webhook_data)
def delete_webhook(self, webhook_id: str) -> bool:
"""Delete webhook"""
return self.delete(f"/iso/api/v1/webhooks/{webhook_id}")
def test_webhook(self, webhook_id: str) -> bool:
"""Test webhook firing"""
# Test endpoint doesn't need webhook_id in URL
response_data = self.post("/iso/api/v1/webhooks/test", data={"webhook_uuid": webhook_id})
if response_data is not None: # Could be empty dict for successful 204
log_success("Webhook Test", "Test webhook fired successfully")
return True
return False
"""
Shared test harness for ISO API smoketests.
This module provides common functionality for running smoketests including:
- CLI argument parsing
- Environment setup
- Results tracking and rich display
- Authentication handling
"""
import argparse
import datetime
import json
import os
import sys
import typing
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
# Rich console setup: one shared Console instance that every logging helper
# and summary method in this module prints through.
console = Console()
# Constants
# Base URL selected by TestHarness.setup_environment for the "stg" environment.
SANDBOX_DOMAIN = "https://partner.sandbox.mulliganfunding.com"
# Default location of the JSON credentials fixture read by
# load_default_credentials (relative to the current working directory).
DEFAULT_CREDENTIALS_PATH = "./dev_client_id_secret.json"
class TestResult:
    """Individual test result.

    Attributes are plain instance fields:
    ``operation`` is the label of the operation that was exercised,
    ``success`` says whether it passed, and ``details`` is optional free text
    shown next to the status in summaries.
    """

    # The class name starts with "Test", so pytest would try to collect it as a
    # test class (and warn about its __init__) wherever it is imported into a
    # test module. Mark it explicitly as a non-test helper.
    __test__ = False

    def __init__(self, operation: str, success: bool, details: str = ""):
        self.operation = operation
        self.success = success
        self.details = details

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            f"{type(self).__name__}(operation={self.operation!r}, "
            f"success={self.success!r}, details={self.details!r})"
        )
class TestHarness:
    """Test harness for coordinating smoketests.

    Collects ``TestResult`` entries while a smoketest runs and renders a
    header, step headings and a final summary through the module-level rich
    ``console``.
    """

    # The class name starts with "Test"; tell pytest it is a helper, not a
    # test class, so importing it into a test module causes no collection warning.
    __test__ = False

    # Order in which categories appear in the categorized summary table.
    _CATEGORY_DISPLAY_ORDER = ("Authentication", "Application", "Document", "Offer", "Webhook", "Cleanup")
    # Order in which category names are searched for in an operation label;
    # the first match wins.
    _CATEGORY_MATCH_ORDER = ("Application", "Document", "Offer", "Webhook", "Authentication", "Cleanup")
    # Bucket for results whose operation label names none of the categories.
    _UNCATEGORIZED = "Other"

    def __init__(self, test_name: str, description: str):
        self.test_name = test_name
        self.description = description
        self.results: list[TestResult] = []
        self.start_time = datetime.datetime.now()
        self.domain = ""
        self.env_display = ""

    def setup_environment(self, env: str) -> str:
        """Set up the environment and return the selected domain.

        Only ``"stg"`` is recognised here; for any other value ``self.domain``
        and ``self.env_display`` are left untouched (initially empty).

        Returns:
            The value of ``self.domain`` after setup.
        """
        if env == "stg":
            self.domain = SANDBOX_DOMAIN
            self.env_display = "🏗️ SANDBOX"
        return self.domain

    def show_header(self, extra_info: str = ""):
        """Display the test header panel (name, environment, domain, optional note)."""
        header_text = f"[bold]{self.test_name}[/bold]\n"
        header_text += f"Environment: {self.env_display}\n"
        header_text += f"Domain: [blue]{self.domain}[/blue]"
        if extra_info:
            header_text += f"\n\n[dim]{extra_info}[/dim]"
        console.print(Panel(header_text, box=box.DOUBLE, style="cyan"))

    def add_result(self, operation: str, success: bool, details: str = ""):
        """Record the outcome of one operation."""
        self.results.append(TestResult(operation, success, details))

    def show_step(self, step_num: int, title: str, emoji: str = ""):
        """Display a numbered step header, optionally prefixed with an emoji."""
        if emoji:
            title = f"{emoji} {title}"
        console.print(f"\n[bold yellow]{step_num}. {title}[/bold yellow]")

    def has_failures(self) -> bool:
        """Return True when at least one recorded result failed."""
        return any(not result.success for result in self.results)

    def show_summary(self, categorized: bool = False):
        """Display the results summary, grouped by category when ``categorized`` is set."""
        duration = datetime.datetime.now() - self.start_time
        if categorized:
            self._show_categorized_summary(duration)
        else:
            self._show_simple_summary(duration)

    @staticmethod
    def _status_markup(success: bool) -> str:
        """Return the rich markup for a PASS/FAIL status cell."""
        return "[green]✅ PASS[/green]" if success else "[red]❌ FAIL[/red]"

    def _show_simple_summary(self, duration: datetime.timedelta):
        """Show one table row per result followed by the overall verdict."""
        table = Table(title=f"{self.test_name} Results Summary", box=box.ROUNDED)
        table.add_column("Operation", style="cyan", no_wrap=True)
        table.add_column("Status", justify="center")
        table.add_column("Details", style="dim")
        for result in self.results:
            table.add_row(result.operation, self._status_markup(result.success), result.details)
        console.print("\n")
        console.print(table)
        # Final status
        if self.has_failures():
            console.print(f"\n[bold red]❌ TEST FAILED[/bold red] - Duration: {duration.total_seconds():.1f}s")
            console.print("[red]One or more operations failed. Check the details above.[/red]")
        else:
            console.print(f"\n[bold green]✅ ALL TESTS PASSED[/bold green] - Duration: {duration.total_seconds():.1f}s")
            console.print("[green]All endpoints are working correctly![/green]")

    def _group_by_category(self) -> dict[str, list]:
        """Group results by the first category name found in their operation label."""
        categories: dict[str, list] = {name: [] for name in self._CATEGORY_DISPLAY_ORDER}
        uncategorized = []
        for result in self.results:
            for name in self._CATEGORY_MATCH_ORDER:
                if name in result.operation:
                    categories[name].append(result)
                    break
            else:
                # Keep results that match no category so the table agrees with
                # the totals printed below it.
                uncategorized.append(result)
        if uncategorized:
            categories[self._UNCATEGORIZED] = uncategorized
        return categories

    def _show_categorized_summary(self, duration: datetime.timedelta):
        """Show results grouped by category, then statistics and the overall verdict."""
        table = Table(title=f"{self.test_name} Results Summary", box=box.ROUNDED)
        table.add_column("Category", style="bold cyan", no_wrap=True)
        table.add_column("Operation", style="cyan")
        table.add_column("Status", justify="center")
        table.add_column("Details", style="dim")
        for category, results in self._group_by_category().items():
            for i, result in enumerate(results):
                # Print the category name only on the first row of each group.
                cat_display = category if i == 0 else ""
                table.add_row(cat_display, result.operation, self._status_markup(result.success), result.details)
        console.print("\n")
        console.print(table)
        # Statistics
        total_tests = len(self.results)
        passed_tests = sum(1 for result in self.results if result.success)
        failed_tests = total_tests - passed_tests
        stats_table = Table(box=box.SIMPLE)
        stats_table.add_column("Metric", style="bold")
        stats_table.add_column("Count", justify="right")
        stats_table.add_row("Total Tests", str(total_tests))
        stats_table.add_row("[green]Passed[/green]", f"[green]{passed_tests}[/green]")
        if failed_tests > 0:
            stats_table.add_row("[red]Failed[/red]", f"[red]{failed_tests}[/red]")
        stats_table.add_row("Duration", f"{duration.total_seconds():.1f}s")
        console.print(stats_table)
        # Final status
        if self.has_failures():
            console.print("\n[bold red]❌ COMPREHENSIVE TEST FAILED[/bold red]")
            console.print("[red]One or more endpoint tests failed. Check the details above.[/red]")
        else:
            console.print("\n[bold green]✅ ALL COMPREHENSIVE TESTS PASSED[/bold green]")
            console.print("[green]All endpoints are working correctly![/green]")
            console.print("[dim]Your API is ready for customer use! 🎆[/dim]")

    def exit_on_failure(self):
        """Exit the process with status 1 when any result failed; otherwise return."""
        if self.has_failures():
            sys.exit(1)
def log_request(method: str, endpoint: str, status_code: typing.Optional[int] = None):
    """Print one line describing an HTTP request or its outcome.

    Without ``status_code`` the line announces an outgoing request (dimmed).
    With a status code the line is green for 2xx and red for anything else.
    """
    if status_code is None:
        console.print(f"📤 [bold blue]{method}[/bold blue] {endpoint}", style="dim")
        return
    succeeded = 200 <= status_code < 300
    icon, colour = ("✅", "green") if succeeded else ("❌", "red")
    console.print(
        f"{icon} [bold {colour}]{method}[/bold {colour}] {endpoint} → [{colour}]{status_code}[/{colour}]"
    )
def log_error(operation: str, status_code: int, response_text: str):
    """Log an error as a red panel showing the status code and response body.

    Args:
        operation: Label of the failed operation, shown in the panel title.
        status_code: HTTP status code (or equivalent) to display.
        response_text: Raw response body or error description. It is escaped
            before rendering so that bracketed text in it is shown literally
            rather than being parsed as rich console markup.
    """
    # Imported locally so this function brings its own dependency into scope;
    # rich is already a dependency of this module.
    from rich.markup import escape

    error_panel = Panel(
        f"[red]Status Code:[/red] {status_code}\n[red]Response:[/red] {escape(str(response_text))}",
        title=f"[bold red]❌ {operation} Failed[/bold red]",
        border_style="red",
        expand=False,
    )
    console.print(error_panel)
def log_success(operation: str, details: str = ""):
    """Print a green success line, followed by dimmed details when given."""
    lines = [f"[bold green]✅ {operation} Successful[/bold green]"]
    if details:
        lines.append(f"[dim]{details}[/dim]")
    console.print("\n".join(lines))
def log_warning(message: str, details: str = ""):
    """Print a yellow warning line, with dimmed details on an indented second line when given."""
    headline = f"[bold yellow]⚠️ WARNING: {message}[/bold yellow]: "
    extra = f"\n\t[dim]{details}[/dim]" if details else ""
    console.print(headline + extra)
def load_default_credentials(credentials_path: str = DEFAULT_CREDENTIALS_PATH) -> dict[str, str]:
    """Load default credentials from a JSON fixture file.

    Args:
        credentials_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content of the file.

    Exits the process with status 1 (after printing a message) when the file
    does not exist. Malformed JSON raises ``json.JSONDecodeError``.
    """
    try:
        # Open directly instead of checking for existence first (no race
        # between check and open), and read as UTF-8 rather than the
        # platform-default encoding.
        with open(credentials_path, "r", encoding="utf-8") as fl:
            creds = json.load(fl)
    except FileNotFoundError:
        console.print(f"[bold red]❌ Default credentials file not found:[/bold red] {credentials_path}")
        sys.exit(1)
    console.print(f"🔑 Using default credentials from [cyan]{credentials_path}[/cyan]")
    return creds
def create_cli_parser(
    script_name: str,
    description: str,
    add_document_args: bool = False
) -> argparse.ArgumentParser:
    """Build the argument parser shared by the smoketest scripts.

    Always defines ``--client-id/-c``, ``--client-secret/-s``,
    ``--use-new-auth-endpoint`` and ``--env/-e``. When ``add_document_args``
    is true, ``--application/-a`` and ``--document/-d`` are added as well.
    """
    cli = argparse.ArgumentParser(prog=script_name, description=description)

    # Credential options: both are optional strings defaulting to "".
    credential_options = (
        ("--client-id", "-c", "Client ID (will use fixture if not provided)"),
        ("--client-secret", "-s", "Client Secret (will use fixture if not provided)"),
    )
    for long_flag, short_flag, help_text in credential_options:
        cli.add_argument(long_flag, short_flag, type=str, default="", required=False, help=help_text)

    cli.add_argument("--use-new-auth-endpoint", action="store_true", help="Use the new authentication endpoint")
    cli.add_argument(
        "--env",
        "-e",
        type=str,
        default="stg",
        choices=["dev", "stg"],
        help="Environment (dev or stg)",
    )

    if not add_document_args:
        return cli

    # Fixture file options used by the document-oriented smoketests.
    fixture_options = (
        ("--application", "-a", "tests/fixtures/app_payload.json", "Application JSON file path"),
        ("--document", "-d", "tests/fixtures/sample # bank; statement bad chars #~ .pdf", "Document file path"),
    )
    for long_flag, short_flag, default_path, help_text in fixture_options:
        cli.add_argument(long_flag, short_flag, type=str, default=default_path, help=help_text)
    return cli
def resolve_credentials(client_id: str, client_secret: str) -> tuple[str, str, str]:
    """Return ``(client_id, client_secret, sf_id)`` for the run.

    When both values are supplied they are returned unchanged with an empty
    third element. If either is missing, all three values are taken from the
    default credentials fixture (keys ``client_id``, ``client_secret``, ``sfId``).
    """
    if client_id and client_secret:
        return client_id, client_secret, ""
    fixture = load_default_credentials()
    return fixture["client_id"], fixture["client_secret"], fixture["sfId"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment