Created
January 4, 2026 01:16
-
-
Save uratmangun/f55337b0d77faba7d38241a1c6fe896b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| # Kiro OpenAI Gateway | |
| # https://github.com/jwadow/kiro-openai-gateway | |
| # Copyright (C) 2025 Jwadow | |
| # | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU Affero General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU Affero General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU Affero General Public License | |
| # along with this program. If not, see <https://www.gnu.org/licenses/>. | |
| import json | |
| import os | |
| import sqlite3 | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
| from enum import Enum | |
| import requests | |
| from dotenv import load_dotenv | |
| from loguru import logger | |
| # --- Load environment variables --- | |
| load_dotenv() | |
| class AuthType(Enum): | |
| """Type of authentication mechanism.""" | |
| KIRO_DESKTOP = "kiro_desktop" | |
| AWS_SSO_OIDC = "aws_sso_oidc" | |
| # --- Configuration --- | |
| KIRO_REGION = os.getenv("KIRO_REGION", "us-east-1") | |
| KIRO_API_HOST = f"https://q.{KIRO_REGION}.amazonaws.com" | |
| KIRO_DESKTOP_TOKEN_URL = f"https://prod.{KIRO_REGION}.auth.desktop.kiro.dev/refreshToken" | |
| AWS_SSO_OIDC_TOKEN_URL = f"https://oidc.{KIRO_REGION}.amazonaws.com/token" | |
| REFRESH_TOKEN = os.getenv("REFRESH_TOKEN") | |
| PROFILE_ARN = os.getenv("PROFILE_ARN", "arn:aws:codewhisperer:us-east-1:699475941385:profile/EHGA3GRVQMUK") | |
| KIRO_CREDS_FILE = os.getenv("KIRO_CREDS_FILE", "") | |
| KIRO_CLI_DB_FILE = os.getenv("KIRO_CLI_DB_FILE", "") | |
| # AWS SSO OIDC specific credentials | |
| CLIENT_ID = None | |
| CLIENT_SECRET = None | |
| SCOPES = None | |
| AUTH_TOKEN = None | |
| AUTH_TYPE = AuthType.KIRO_DESKTOP | |
| def load_credentials_from_json(file_path: str) -> bool: | |
| """Load credentials from JSON file.""" | |
| global REFRESH_TOKEN, PROFILE_ARN, CLIENT_ID, CLIENT_SECRET, AUTH_TYPE, KIRO_REGION | |
| global KIRO_API_HOST, KIRO_DESKTOP_TOKEN_URL, AWS_SSO_OIDC_TOKEN_URL | |
| try: | |
| creds_path = Path(file_path).expanduser() | |
| if not creds_path.exists(): | |
| logger.warning(f"Credentials file not found: {file_path}") | |
| return False | |
| with open(creds_path, 'r', encoding='utf-8') as f: | |
| creds_data = json.load(f) | |
| # Load common fields | |
| if 'refreshToken' in creds_data: | |
| REFRESH_TOKEN = creds_data['refreshToken'] | |
| if 'profileArn' in creds_data: | |
| PROFILE_ARN = creds_data['profileArn'] | |
| if 'region' in creds_data: | |
| KIRO_REGION = creds_data['region'] | |
| KIRO_API_HOST = f"https://q.{KIRO_REGION}.amazonaws.com" | |
| KIRO_DESKTOP_TOKEN_URL = f"https://prod.{KIRO_REGION}.auth.desktop.kiro.dev/refreshToken" | |
| AWS_SSO_OIDC_TOKEN_URL = f"https://oidc.{KIRO_REGION}.amazonaws.com/token" | |
| # Load AWS SSO OIDC specific fields | |
| if 'clientId' in creds_data: | |
| CLIENT_ID = creds_data['clientId'] | |
| if 'clientSecret' in creds_data: | |
| CLIENT_SECRET = creds_data['clientSecret'] | |
| # Detect auth type | |
| if CLIENT_ID and CLIENT_SECRET: | |
| AUTH_TYPE = AuthType.AWS_SSO_OIDC | |
| logger.info(f"Detected auth type: AWS SSO OIDC") | |
| else: | |
| AUTH_TYPE = AuthType.KIRO_DESKTOP | |
| logger.info(f"Detected auth type: Kiro Desktop") | |
| logger.info(f"Credentials loaded from {file_path}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error loading credentials from file: {e}") | |
| return False | |
| def load_credentials_from_sqlite(db_path: str) -> bool: | |
| """Load credentials from kiro-cli SQLite database.""" | |
| global REFRESH_TOKEN, CLIENT_ID, CLIENT_SECRET, AUTH_TYPE, KIRO_REGION, SCOPES, AUTH_TOKEN | |
| global KIRO_API_HOST, KIRO_DESKTOP_TOKEN_URL, AWS_SSO_OIDC_TOKEN_URL | |
| try: | |
| path = Path(db_path).expanduser() | |
| if not path.exists(): | |
| logger.warning(f"SQLite database not found: {db_path}") | |
| return False | |
| conn = sqlite3.connect(str(path)) | |
| cursor = conn.cursor() | |
| # Load token data (try both kiro-cli and codewhisperer key formats) | |
| cursor.execute("SELECT value FROM auth_kv WHERE key = ?", ("kirocli:odic:token",)) | |
| token_row = cursor.fetchone() | |
| if not token_row: | |
| cursor.execute("SELECT value FROM auth_kv WHERE key = ?", ("codewhisperer:odic:token",)) | |
| token_row = cursor.fetchone() | |
| if token_row: | |
| token_data = json.loads(token_row[0]) | |
| if token_data: | |
| # Check if we have a valid access token | |
| if 'access_token' in token_data and 'expires_at' in token_data: | |
| from datetime import datetime | |
| expires_at = datetime.fromisoformat(token_data['expires_at'].replace('Z', '+00:00')) | |
| if expires_at > datetime.now().astimezone(): | |
| AUTH_TOKEN = token_data['access_token'] | |
| logger.info("Found valid access token in database (will use after HEADERS init)") | |
| if 'refresh_token' in token_data: | |
| REFRESH_TOKEN = token_data['refresh_token'] | |
| if 'scopes' in token_data: | |
| SCOPES = token_data['scopes'] | |
| if 'region' in token_data: | |
| KIRO_REGION = token_data['region'] | |
| KIRO_API_HOST = f"https://q.{KIRO_REGION}.amazonaws.com" | |
| KIRO_DESKTOP_TOKEN_URL = f"https://prod.{KIRO_REGION}.auth.desktop.kiro.dev/refreshToken" | |
| AWS_SSO_OIDC_TOKEN_URL = f"https://oidc.{KIRO_REGION}.amazonaws.com/token" | |
| # Load device registration (client_id, client_secret) - try both key formats | |
| cursor.execute("SELECT value FROM auth_kv WHERE key = ?", ("kirocli:odic:device-registration",)) | |
| registration_row = cursor.fetchone() | |
| if not registration_row: | |
| cursor.execute("SELECT value FROM auth_kv WHERE key = ?", ("codewhisperer:odic:device-registration",)) | |
| registration_row = cursor.fetchone() | |
| if registration_row: | |
| registration_data = json.loads(registration_row[0]) | |
| if registration_data: | |
| if 'client_id' in registration_data: | |
| CLIENT_ID = registration_data['client_id'] | |
| if 'client_secret' in registration_data: | |
| CLIENT_SECRET = registration_data['client_secret'] | |
| conn.close() | |
| # Detect auth type | |
| if CLIENT_ID and CLIENT_SECRET: | |
| AUTH_TYPE = AuthType.AWS_SSO_OIDC | |
| logger.info(f"Detected auth type: AWS SSO OIDC (from SQLite)") | |
| else: | |
| AUTH_TYPE = AuthType.KIRO_DESKTOP | |
| logger.info(f"Detected auth type: Kiro Desktop (from SQLite)") | |
| logger.info(f"Credentials loaded from SQLite: {db_path}") | |
| return True | |
| except sqlite3.Error as e: | |
| logger.error(f"SQLite error: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error loading credentials from SQLite: {e}") | |
| return False | |
| # --- Load credentials (priority: SQLite > JSON > env) --- | |
| cred_source = "REFRESH_TOKEN" | |
| if KIRO_CLI_DB_FILE: | |
| if load_credentials_from_sqlite(KIRO_CLI_DB_FILE): | |
| cred_source = "KIRO_CLI_DB_FILE (SQLite)" | |
| elif KIRO_CREDS_FILE: | |
| if load_credentials_from_json(KIRO_CREDS_FILE): | |
| cred_source = "KIRO_CREDS_FILE (JSON)" | |
| # --- Validate required credentials --- | |
| if not REFRESH_TOKEN: | |
| logger.error("No credentials configured. Set REFRESH_TOKEN, KIRO_CREDS_FILE, or KIRO_CLI_DB_FILE. Exiting.") | |
| sys.exit(1) | |
| # Additional validation for AWS SSO OIDC | |
| if AUTH_TYPE == AuthType.AWS_SSO_OIDC and (not CLIENT_ID or not CLIENT_SECRET): | |
| logger.error("AWS SSO OIDC requires clientId and clientSecret. Exiting.") | |
| sys.exit(1) | |
| # Global variables | |
| HEADERS = { | |
| "Authorization": None, | |
| "Content-Type": "application/json", | |
| "User-Agent": "aws-sdk-js/1.0.27 ua/2.1 os/win32#10.0.19044 lang/js md/nodejs#22.21.1 api/codewhispererstreaming#1.0.27 m/E KiroIDE-0.7.45-31c325a0ff0a9c8dec5d13048f4257462d751fe5b8af4cb1088f1fca45856c64", | |
| "x-amz-user-agent": "aws-sdk-js/1.0.27 KiroIDE-0.7.45-31c325a0ff0a9c8dec5d13048f4257462d751fe5b8af4cb1088f1fca45856c64", | |
| "x-amzn-codewhisperer-optout": "true", | |
| "x-amzn-kiro-agent-mode": "vibe", | |
| } | |
| def refresh_auth_token(): | |
| """Refreshes AUTH_TOKEN via appropriate endpoint based on auth type.""" | |
| global AUTH_TOKEN, HEADERS | |
| if AUTH_TYPE == AuthType.AWS_SSO_OIDC: | |
| return refresh_auth_token_aws_sso_oidc() | |
| else: | |
| return refresh_auth_token_kiro_desktop() | |
| def refresh_auth_token_kiro_desktop(): | |
| """Refreshes AUTH_TOKEN via Kiro Desktop Auth endpoint.""" | |
| global AUTH_TOKEN, HEADERS | |
| logger.info("Refreshing Kiro token via Kiro Desktop Auth...") | |
| payload = {"refreshToken": REFRESH_TOKEN} | |
| headers = { | |
| "Content-Type": "application/json", | |
| "User-Agent": "KiroIDE-0.7.45-31c325a0ff0a9c8dec5d13048f4257462d751fe5b8af4cb1088f1fca45856c64", | |
| } | |
| try: | |
| response = requests.post(KIRO_DESKTOP_TOKEN_URL, json=payload, headers=headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| new_token = data.get("accessToken") | |
| expires_in = data.get("expiresIn") | |
| if not new_token: | |
| logger.error("Failed to get accessToken from response") | |
| return False | |
| logger.success(f"Token refreshed via Kiro Desktop Auth. Expires in: {expires_in}s") | |
| AUTH_TOKEN = new_token | |
| HEADERS['Authorization'] = f"Bearer {AUTH_TOKEN}" | |
| return True | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"Error refreshing token via Kiro Desktop Auth: {e}") | |
| if hasattr(e, 'response') and e.response: | |
| logger.error(f"Server response: {e.response.status_code} {e.response.text}") | |
| return False | |
| def refresh_auth_token_aws_sso_oidc(): | |
| """Refreshes AUTH_TOKEN via AWS SSO OIDC endpoint.""" | |
| global AUTH_TOKEN, HEADERS | |
| logger.info("Refreshing Kiro token via AWS SSO OIDC...") | |
| # AWS SSO OIDC uses form-urlencoded data | |
| data = { | |
| "grant_type": "refresh_token", | |
| "client_id": CLIENT_ID, | |
| "client_secret": CLIENT_SECRET, | |
| "refresh_token": REFRESH_TOKEN, | |
| } | |
| # Add scopes if available | |
| if SCOPES: | |
| data["scope"] = " ".join(SCOPES) | |
| headers = { | |
| "Content-Type": "application/x-www-form-urlencoded", | |
| } | |
| try: | |
| response = requests.post(AWS_SSO_OIDC_TOKEN_URL, data=data, headers=headers) | |
| response.raise_for_status() | |
| result = response.json() | |
| new_token = result.get("accessToken") | |
| expires_in = result.get("expiresIn", 3600) | |
| if not new_token: | |
| logger.error(f"Failed to get accessToken from AWS SSO OIDC response: {result}") | |
| return False | |
| logger.success(f"Token refreshed via AWS SSO OIDC. Expires in: {expires_in}s") | |
| AUTH_TOKEN = new_token | |
| HEADERS['Authorization'] = f"Bearer {AUTH_TOKEN}" | |
| return True | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"Error refreshing token via AWS SSO OIDC: {e}") | |
| if hasattr(e, 'response') and e.response is not None: | |
| logger.error(f"Server response: {e.response.status_code} {e.response.text}") | |
| return False | |
| def get_profile_arn(): | |
| """Gets the profile ARN from ListAvailableProfiles endpoint.""" | |
| global PROFILE_ARN | |
| logger.info("Getting profile ARN from /ListAvailableProfiles...") | |
| url = f"{KIRO_API_HOST}/ListAvailableProfiles" | |
| try: | |
| response = requests.get(url, headers=HEADERS) | |
| response.raise_for_status() | |
| data = response.json() | |
| profiles = data.get("profiles", []) | |
| if profiles: | |
| # Use the first available profile | |
| PROFILE_ARN = profiles[0].get("arn") | |
| logger.info(f"Found profile ARN: {PROFILE_ARN}") | |
| return True | |
| else: | |
| logger.warning("No profiles found") | |
| return False | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"ListAvailableProfiles failed: {e}") | |
| if hasattr(e, 'response') and e.response is not None: | |
| logger.error(f"Server response: {e.response.status_code} {e.response.text}") | |
| return False | |
| def test_get_models(): | |
| """Tests the ListAvailableModels endpoint.""" | |
| logger.info("Testing /ListAvailableModels...") | |
| url = f"{KIRO_API_HOST}/ListAvailableModels" | |
| params = { | |
| "origin": "AI_EDITOR", | |
| "profileArn": PROFILE_ARN | |
| } | |
| try: | |
| response = requests.get(url, headers=HEADERS, params=params) | |
| response.raise_for_status() | |
| logger.info(f"Response status: {response.status_code}") | |
| logger.debug(f"Response (JSON):\n{json.dumps(response.json(), indent=2, ensure_ascii=False)}") | |
| logger.success("ListAvailableModels test COMPLETED SUCCESSFULLY") | |
| return True | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"ListAvailableModels test failed: {e}") | |
| return False | |
| def test_generate_content(): | |
| """Tests the generateAssistantResponse endpoint.""" | |
| logger.info("Testing /generateAssistantResponse...") | |
| url = f"{KIRO_API_HOST}/generateAssistantResponse" | |
| payload = { | |
| "conversationState": { | |
| "agentContinuationId": str(uuid.uuid4()), | |
| "agentTaskType": "vibe", | |
| "chatTriggerType": "MANUAL", | |
| "conversationId": str(uuid.uuid4()), | |
| "currentMessage": { | |
| "userInputMessage": { | |
| "content": "Hello! Say something short.", | |
| "modelId": "claude-haiku-4.5", | |
| "origin": "AI_EDITOR", | |
| "userInputMessageContext": { | |
| "tools": [] | |
| } | |
| } | |
| }, | |
| "history": [] | |
| } | |
| } | |
| # Only add profileArn if it's set and not the default | |
| if PROFILE_ARN and AUTH_TYPE != AuthType.AWS_SSO_OIDC: | |
| payload["profileArn"] = PROFILE_ARN | |
| try: | |
| with requests.post(url, headers=HEADERS, json=payload, stream=True) as response: | |
| response.raise_for_status() | |
| logger.info(f"Response status: {response.status_code}") | |
| logger.info("Streaming response:") | |
| for chunk in response.iter_content(chunk_size=1024): | |
| if chunk: | |
| # Try to decode and find JSON | |
| chunk_str = chunk.decode('utf-8', errors='ignore') | |
| logger.debug(f"Chunk: {chunk_str[:200]}...") | |
| logger.success("generateAssistantResponse test COMPLETED") | |
| return True | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"generateAssistantResponse test failed: {e}") | |
| if hasattr(e, 'response') and e.response is not None: | |
| logger.error(f"Server response: {e.response.status_code} {e.response.text}") | |
| return False | |
| if __name__ == "__main__": | |
| logger.info(f"Starting Kiro API tests...") | |
| logger.info(f" Credentials source: {cred_source}") | |
| logger.info(f" Auth type: {AUTH_TYPE.value}") | |
| logger.info(f" Region: {KIRO_REGION}") | |
| logger.info(f" API Host: {KIRO_API_HOST}") | |
| # Check if we already have a valid token from the database | |
| if AUTH_TOKEN: | |
| HEADERS['Authorization'] = f"Bearer {AUTH_TOKEN}" | |
| logger.info("Using existing valid access token from database") | |
| token_ok = True | |
| else: | |
| token_ok = refresh_auth_token() | |
| if token_ok: | |
| # Get profile ARN dynamically for AWS SSO OIDC users | |
| if AUTH_TYPE == AuthType.AWS_SSO_OIDC: | |
| get_profile_arn() | |
| models_ok = test_get_models() | |
| generate_ok = test_generate_content() | |
| if models_ok and generate_ok: | |
| logger.success(f"All tests passed successfully!") | |
| logger.success(f" Auth type: {AUTH_TYPE.value}") | |
| logger.success(f" Credentials: {cred_source}") | |
| else: | |
| logger.warning(f"One or more tests failed.") | |
| else: | |
| logger.error("Failed to refresh token. Tests not started.") | |
| logger.error(f" Auth type: {AUTH_TYPE.value}") | |
| logger.error(f" Token URL: {AWS_SSO_OIDC_TOKEN_URL if AUTH_TYPE == AuthType.AWS_SSO_OIDC else KIRO_DESKTOP_TOKEN_URL}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment