Skip to content

Instantly share code, notes, and snippets.

@uratmangun
Created January 4, 2026 01:16
Show Gist options
  • Select an option

  • Save uratmangun/f55337b0d77faba7d38241a1c6fe896b to your computer and use it in GitHub Desktop.

Select an option

Save uratmangun/f55337b0d77faba7d38241a1c6fe896b to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# Kiro OpenAI Gateway
# https://github.com/jwadow/kiro-openai-gateway
# Copyright (C) 2025 Jwadow
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import sqlite3
import sys
import uuid
from pathlib import Path
from enum import Enum
import requests
from dotenv import load_dotenv
from loguru import logger
# --- Load environment variables ---
# Reads a .env file, if one is found, into the process environment so the
# os.getenv() lookups in the configuration section below can see those values.
load_dotenv()
class AuthType(Enum):
    """Which token-refresh flow the script should use."""

    # Refresh through the Kiro Desktop auth endpoint (refresh token only).
    KIRO_DESKTOP = "kiro_desktop"
    # Refresh through AWS SSO OIDC (needs client id and client secret too).
    AWS_SSO_OIDC = "aws_sso_oidc"
# --- Configuration ---
# Region plus the three endpoints derived from it. The credential loaders
# further down rebuild these URLs when a credential store names a region.
KIRO_REGION = os.environ.get("KIRO_REGION", "us-east-1")
KIRO_API_HOST = f"https://q.{KIRO_REGION}.amazonaws.com"
KIRO_DESKTOP_TOKEN_URL = f"https://prod.{KIRO_REGION}.auth.desktop.kiro.dev/refreshToken"
AWS_SSO_OIDC_TOKEN_URL = f"https://oidc.{KIRO_REGION}.amazonaws.com/token"

# Values taken straight from the environment (REFRESH_TOKEN has no default).
REFRESH_TOKEN = os.environ.get("REFRESH_TOKEN")
PROFILE_ARN = os.environ.get(
    "PROFILE_ARN",
    "arn:aws:codewhisperer:us-east-1:699475941385:profile/EHGA3GRVQMUK",
)
KIRO_CREDS_FILE = os.environ.get("KIRO_CREDS_FILE", "")
KIRO_CLI_DB_FILE = os.environ.get("KIRO_CLI_DB_FILE", "")

# AWS SSO OIDC specific credentials and the cached access token; all start
# unset and are filled in by the credential loaders / refresh functions.
CLIENT_ID = CLIENT_SECRET = SCOPES = AUTH_TOKEN = None

# Default flow until a loader detects client credentials.
AUTH_TYPE = AuthType.KIRO_DESKTOP
def load_credentials_from_json(file_path: str) -> bool:
    """Populate the module-level credential globals from a JSON file.

    Recognised keys are ``refreshToken``, ``profileArn``, ``region``,
    ``clientId`` and ``clientSecret``. A key that is absent leaves the
    corresponding global untouched. A ``region`` entry also rebuilds the
    three region-dependent endpoint URLs. ``AUTH_TYPE`` is set to AWS SSO
    OIDC when both CLIENT_ID and CLIENT_SECRET end up truthy, and to Kiro
    Desktop otherwise.

    Args:
        file_path: Path of the JSON credentials file; ``~`` is expanded.

    Returns:
        True once the file has been read and applied. False when the file
        does not exist or any exception occurs (it is logged, not raised).
    """
    global REFRESH_TOKEN, PROFILE_ARN, CLIENT_ID, CLIENT_SECRET, AUTH_TYPE, KIRO_REGION
    global KIRO_API_HOST, KIRO_DESKTOP_TOKEN_URL, AWS_SSO_OIDC_TOKEN_URL

    try:
        resolved = Path(file_path).expanduser()
        if not resolved.exists():
            logger.warning(f"Credentials file not found: {file_path}")
            return False

        with resolved.open('r', encoding='utf-8') as handle:
            parsed = json.load(handle)

        # Fields shared by both auth flows.
        if 'refreshToken' in parsed:
            REFRESH_TOKEN = parsed['refreshToken']
        if 'profileArn' in parsed:
            PROFILE_ARN = parsed['profileArn']
        if 'region' in parsed:
            # Every endpoint embeds the region, so rebuild all of them together.
            KIRO_REGION = parsed['region']
            KIRO_API_HOST = f"https://q.{KIRO_REGION}.amazonaws.com"
            KIRO_DESKTOP_TOKEN_URL = f"https://prod.{KIRO_REGION}.auth.desktop.kiro.dev/refreshToken"
            AWS_SSO_OIDC_TOKEN_URL = f"https://oidc.{KIRO_REGION}.amazonaws.com/token"

        # Fields only the AWS SSO OIDC flow needs.
        if 'clientId' in parsed:
            CLIENT_ID = parsed['clientId']
        if 'clientSecret' in parsed:
            CLIENT_SECRET = parsed['clientSecret']

        # Client credentials present -> OIDC flow; otherwise Kiro Desktop.
        has_client_credentials = bool(CLIENT_ID and CLIENT_SECRET)
        AUTH_TYPE = AuthType.AWS_SSO_OIDC if has_client_credentials else AuthType.KIRO_DESKTOP
        if has_client_credentials:
            logger.info("Detected auth type: AWS SSO OIDC")
        else:
            logger.info("Detected auth type: Kiro Desktop")

        logger.info(f"Credentials loaded from {file_path}")
        return True
    except Exception as exc:
        logger.error(f"Error loading credentials from file: {exc}")
        return False
def _fetch_auth_kv_json(cursor, keys):
    """Return the decoded JSON value of the first ``auth_kv`` key that has a row, else None."""
    for key in keys:
        cursor.execute("SELECT value FROM auth_kv WHERE key = ?", (key,))
        row = cursor.fetchone()
        if row:
            return json.loads(row[0])
    return None


def load_credentials_from_sqlite(db_path: str) -> bool:
    """Load credentials from a kiro-cli SQLite database into module globals.

    Reads two JSON values from the ``auth_kv`` table, trying the ``kirocli:``
    key first and the ``codewhisperer:`` key second for each:

    * the token entry, which may provide ``refresh_token``, ``scopes``,
      ``region`` and a cached ``access_token`` with its ``expires_at``;
    * the device-registration entry, which may provide ``client_id`` and
      ``client_secret``.

    The cached access token is kept in AUTH_TOKEN only when ``expires_at``
    parses as a timezone-aware timestamp that lies in the future. A missing,
    malformed or timezone-less ``expires_at`` merely skips the cached token;
    the remaining credentials are still loaded so a refresh can take place.
    A ``region`` entry rebuilds the three region-dependent endpoint URLs.
    ``AUTH_TYPE`` becomes AWS SSO OIDC when both CLIENT_ID and CLIENT_SECRET
    end up truthy, Kiro Desktop otherwise.

    Args:
        db_path: Path of the SQLite database; ``~`` is expanded.

    Returns:
        True when the database was read. False when it does not exist or an
        error occurred (the error is logged, not raised).
    """
    global REFRESH_TOKEN, CLIENT_ID, CLIENT_SECRET, AUTH_TYPE, KIRO_REGION, SCOPES, AUTH_TOKEN
    global KIRO_API_HOST, KIRO_DESKTOP_TOKEN_URL, AWS_SSO_OIDC_TOKEN_URL
    from datetime import datetime, timezone

    try:
        path = Path(db_path).expanduser()
        if not path.exists():
            logger.warning(f"SQLite database not found: {db_path}")
            return False

        conn = sqlite3.connect(str(path))
        try:
            cursor = conn.cursor()

            # NOTE(review): "odic" (not "oidc") is the spelling SOURCE queries;
            # presumably it matches what kiro-cli writes -- verify before "fixing".
            token_data = _fetch_auth_kv_json(
                cursor, ("kirocli:odic:token", "codewhisperer:odic:token")
            )
            if token_data:
                if 'access_token' in token_data and 'expires_at' in token_data:
                    try:
                        # fromisoformat() on older Pythons rejects a trailing "Z".
                        expires_at = datetime.fromisoformat(
                            token_data['expires_at'].replace('Z', '+00:00')
                        )
                    except (AttributeError, TypeError, ValueError) as e:
                        # A bad timestamp must not discard the refresh token.
                        logger.warning(f"Ignoring cached access token: cannot parse expires_at ({e})")
                    else:
                        if expires_at.tzinfo is None:
                            # Naive and aware datetimes cannot be compared, and the
                            # intended zone is unknown, so do not trust the token.
                            logger.warning("Ignoring cached access token: expires_at has no timezone")
                        elif expires_at > datetime.now(timezone.utc):
                            AUTH_TOKEN = token_data['access_token']
                            logger.info("Found valid access token in database (will use after HEADERS init)")
                if 'refresh_token' in token_data:
                    REFRESH_TOKEN = token_data['refresh_token']
                if 'scopes' in token_data:
                    SCOPES = token_data['scopes']
                if 'region' in token_data:
                    # Every endpoint embeds the region, so rebuild them together.
                    KIRO_REGION = token_data['region']
                    KIRO_API_HOST = f"https://q.{KIRO_REGION}.amazonaws.com"
                    KIRO_DESKTOP_TOKEN_URL = f"https://prod.{KIRO_REGION}.auth.desktop.kiro.dev/refreshToken"
                    AWS_SSO_OIDC_TOKEN_URL = f"https://oidc.{KIRO_REGION}.amazonaws.com/token"

            # Device registration carries the OIDC client credentials.
            registration_data = _fetch_auth_kv_json(
                cursor,
                ("kirocli:odic:device-registration", "codewhisperer:odic:device-registration"),
            )
            if registration_data:
                if 'client_id' in registration_data:
                    CLIENT_ID = registration_data['client_id']
                if 'client_secret' in registration_data:
                    CLIENT_SECRET = registration_data['client_secret']
        finally:
            # Close the connection even when a query or JSON decode fails.
            conn.close()

        # Client credentials present -> OIDC flow; otherwise Kiro Desktop.
        if CLIENT_ID and CLIENT_SECRET:
            AUTH_TYPE = AuthType.AWS_SSO_OIDC
            logger.info("Detected auth type: AWS SSO OIDC (from SQLite)")
        else:
            AUTH_TYPE = AuthType.KIRO_DESKTOP
            logger.info("Detected auth type: Kiro Desktop (from SQLite)")

        logger.info(f"Credentials loaded from SQLite: {db_path}")
        return True
    except sqlite3.Error as e:
        logger.error(f"SQLite error: {e}")
        return False
    except Exception as e:
        logger.error(f"Error loading credentials from SQLite: {e}")
        return False
# --- Load credentials (priority: SQLite > JSON > env) ---
# cred_source is only a label for the log output; it stays "REFRESH_TOKEN"
# unless one of the file-based loaders reports success.
cred_source = "REFRESH_TOKEN"
if KIRO_CLI_DB_FILE:
    # A configured SQLite path wins outright: the JSON file is not consulted
    # even when loading from SQLite fails.
    if load_credentials_from_sqlite(KIRO_CLI_DB_FILE):
        cred_source = "KIRO_CLI_DB_FILE (SQLite)"
elif KIRO_CREDS_FILE and load_credentials_from_json(KIRO_CREDS_FILE):
    cred_source = "KIRO_CREDS_FILE (JSON)"

# --- Validate required credentials ---
# Every flow needs a refresh token, whichever source supplied it.
if not REFRESH_TOKEN:
    logger.error("No credentials configured. Set REFRESH_TOKEN, KIRO_CREDS_FILE, or KIRO_CLI_DB_FILE. Exiting.")
    sys.exit(1)

# Additional validation for AWS SSO OIDC: both client credentials are required.
if AUTH_TYPE == AuthType.AWS_SSO_OIDC and not (CLIENT_ID and CLIENT_SECRET):
    logger.error("AWS SSO OIDC requires clientId and clientSecret. Exiting.")
    sys.exit(1)

# Global variables
# Headers sent with every Kiro API call. "Authorization" is a placeholder
# here and is filled in once an access token is available.
HEADERS = {
    "Authorization": None,
    "Content-Type": "application/json",
    "User-Agent": "aws-sdk-js/1.0.27 ua/2.1 os/win32#10.0.19044 lang/js md/nodejs#22.21.1 api/codewhispererstreaming#1.0.27 m/E KiroIDE-0.7.45-31c325a0ff0a9c8dec5d13048f4257462d751fe5b8af4cb1088f1fca45856c64",
    "x-amz-user-agent": "aws-sdk-js/1.0.27 KiroIDE-0.7.45-31c325a0ff0a9c8dec5d13048f4257462d751fe5b8af4cb1088f1fca45856c64",
    "x-amzn-codewhisperer-optout": "true",
    "x-amzn-kiro-agent-mode": "vibe",
}
def refresh_auth_token():
    """Refresh AUTH_TOKEN through the endpoint that matches AUTH_TYPE.

    Returns:
        The result of the selected refresh function (AWS SSO OIDC when
        AUTH_TYPE says so, Kiro Desktop in every other case).
    """
    if AUTH_TYPE != AuthType.AWS_SSO_OIDC:
        return refresh_auth_token_kiro_desktop()
    return refresh_auth_token_aws_sso_oidc()
def refresh_auth_token_kiro_desktop():
    """Refresh AUTH_TOKEN via the Kiro Desktop Auth endpoint.

    POSTs ``{"refreshToken": REFRESH_TOKEN}`` as JSON to
    KIRO_DESKTOP_TOKEN_URL. On success the returned ``accessToken`` is stored
    in AUTH_TOKEN and in ``HEADERS['Authorization']`` as a Bearer token.

    Returns:
        True when a new access token was obtained; False when the request
        failed or the response carried no ``accessToken`` (errors are logged,
        not raised).
    """
    global AUTH_TOKEN
    logger.info("Refreshing Kiro token via Kiro Desktop Auth...")
    payload = {"refreshToken": REFRESH_TOKEN}
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "KiroIDE-0.7.45-31c325a0ff0a9c8dec5d13048f4257462d751fe5b8af4cb1088f1fca45856c64",
    }
    try:
        # requests applies no timeout by default; without one a stalled
        # connection would block this script forever.
        response = requests.post(KIRO_DESKTOP_TOKEN_URL, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
        new_token = data.get("accessToken")
        expires_in = data.get("expiresIn")
        if not new_token:
            logger.error("Failed to get accessToken from response")
            return False
        logger.success(f"Token refreshed via Kiro Desktop Auth. Expires in: {expires_in}s")
        AUTH_TOKEN = new_token
        HEADERS['Authorization'] = f"Bearer {AUTH_TOKEN}"
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"Error refreshing token via Kiro Desktop Auth: {e}")
        # Compare against None explicitly: a Response is falsy for 4xx/5xx
        # statuses, which are exactly the responses worth logging here.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logger.error(f"Server response: {failed_response.status_code} {failed_response.text}")
        return False
def refresh_auth_token_aws_sso_oidc():
    """Refresh AUTH_TOKEN via the AWS SSO OIDC token endpoint.

    POSTs a JSON ``refresh_token`` grant built from CLIENT_ID, CLIENT_SECRET
    and REFRESH_TOKEN to AWS_SSO_OIDC_TOKEN_URL. On success the returned
    ``accessToken`` is stored in AUTH_TOKEN and in
    ``HEADERS['Authorization']`` as a Bearer token.

    Returns:
        True when a new access token was obtained; False when the request
        failed or the response carried no ``accessToken`` (errors are logged,
        not raised).
    """
    global AUTH_TOKEN
    logger.info("Refreshing Kiro token via AWS SSO OIDC...")
    # The CreateToken operation is documented as a JSON request with camelCase
    # members (matching the camelCase response parsed below), not an OAuth
    # form-urlencoded body with snake_case fields.
    payload = {
        "grantType": "refresh_token",
        "clientId": CLIENT_ID,
        "clientSecret": CLIENT_SECRET,
        "refreshToken": REFRESH_TOKEN,
    }
    # NOTE(review): the previous form body sent SCOPES as a space-joined
    # "scope" string. In the JSON API "scope" is an optional array, and a
    # refresh without it keeps the originally granted scopes (RFC 6749
    # section 6), so it is left out here -- confirm if narrowing is needed.
    headers = {
        "Content-Type": "application/json",
    }
    try:
        # requests applies no timeout by default; without one a stalled
        # connection would block this script forever.
        response = requests.post(AWS_SSO_OIDC_TOKEN_URL, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        result = response.json()
        new_token = result.get("accessToken")
        expires_in = result.get("expiresIn", 3600)
        if not new_token:
            logger.error(f"Failed to get accessToken from AWS SSO OIDC response: {result}")
            return False
        logger.success(f"Token refreshed via AWS SSO OIDC. Expires in: {expires_in}s")
        AUTH_TOKEN = new_token
        HEADERS['Authorization'] = f"Bearer {AUTH_TOKEN}"
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"Error refreshing token via AWS SSO OIDC: {e}")
        # Explicit None check: a Response is falsy for 4xx/5xx statuses.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logger.error(f"Server response: {failed_response.status_code} {failed_response.text}")
        return False
def get_profile_arn():
    """Fetch the profile ARN from the ListAvailableProfiles endpoint.

    Sends a GET with HEADERS to ``{KIRO_API_HOST}/ListAvailableProfiles`` and
    stores the ``arn`` of the first entry in the response's ``profiles`` list
    in PROFILE_ARN. PROFILE_ARN is left unchanged when no usable ARN is found.

    Returns:
        True when PROFILE_ARN was updated; False when the request failed, no
        profiles were returned, or the first profile had no ``arn`` (problems
        are logged, not raised).
    """
    global PROFILE_ARN
    logger.info("Getting profile ARN from /ListAvailableProfiles...")
    url = f"{KIRO_API_HOST}/ListAvailableProfiles"
    try:
        # requests applies no timeout by default; bound the wait explicitly.
        response = requests.get(url, headers=HEADERS, timeout=30)
        response.raise_for_status()
        data = response.json()
        profiles = data.get("profiles", [])
        if not profiles:
            logger.warning("No profiles found")
            return False
        # Use the first available profile.
        arn = profiles[0].get("arn")
        if not arn:
            # Do not replace a configured ARN with None and report success.
            logger.warning("First profile has no ARN; keeping current PROFILE_ARN")
            return False
        PROFILE_ARN = arn
        logger.info(f"Found profile ARN: {PROFILE_ARN}")
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"ListAvailableProfiles failed: {e}")
        # Explicit None check: a Response is falsy for 4xx/5xx statuses.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logger.error(f"Server response: {failed_response.status_code} {failed_response.text}")
        return False
def test_get_models():
    """Smoke-test the ListAvailableModels endpoint.

    Sends a GET with HEADERS to ``{KIRO_API_HOST}/ListAvailableModels``,
    passing ``origin=AI_EDITOR`` and the current PROFILE_ARN as query
    parameters, and logs the JSON response at debug level.

    Returns:
        True when the request succeeded; False on any request error (logged,
        not raised).
    """
    logger.info("Testing /ListAvailableModels...")
    url = f"{KIRO_API_HOST}/ListAvailableModels"
    params = {
        "origin": "AI_EDITOR",
        "profileArn": PROFILE_ARN,
    }
    try:
        # requests applies no timeout by default; bound the wait explicitly.
        response = requests.get(url, headers=HEADERS, params=params, timeout=30)
        response.raise_for_status()
        logger.info(f"Response status: {response.status_code}")
        logger.debug(f"Response (JSON):\n{json.dumps(response.json(), indent=2, ensure_ascii=False)}")
        logger.success("ListAvailableModels test COMPLETED SUCCESSFULLY")
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"ListAvailableModels test failed: {e}")
        # Log the server's answer as the other request helpers in this file do;
        # explicit None check because a Response is falsy for 4xx/5xx statuses.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logger.error(f"Server response: {failed_response.status_code} {failed_response.text}")
        return False
def test_generate_content():
    """Smoke-test the generateAssistantResponse streaming endpoint.

    POSTs a minimal one-message conversation to
    ``{KIRO_API_HOST}/generateAssistantResponse`` with ``stream=True`` and
    logs the first 200 characters of every received chunk at debug level.

    Returns:
        True when the request succeeded and the stream was read to the end;
        False on an HTTP error status or any other request error (logged, not
        raised).
    """
    logger.info("Testing /generateAssistantResponse...")
    url = f"{KIRO_API_HOST}/generateAssistantResponse"
    payload = {
        "conversationState": {
            "agentContinuationId": str(uuid.uuid4()),
            "agentTaskType": "vibe",
            "chatTriggerType": "MANUAL",
            "conversationId": str(uuid.uuid4()),
            "currentMessage": {
                "userInputMessage": {
                    "content": "Hello! Say something short.",
                    "modelId": "claude-haiku-4.5",
                    "origin": "AI_EDITOR",
                    "userInputMessageContext": {
                        "tools": []
                    }
                }
            },
            "history": []
        }
    }
    # profileArn is sent only when one is set and the auth type is not
    # AWS SSO OIDC.
    if PROFILE_ARN and AUTH_TYPE != AuthType.AWS_SSO_OIDC:
        payload["profileArn"] = PROFILE_ARN
    try:
        # requests applies no timeout by default; for a streamed response the
        # value also bounds the wait between received chunks.
        with requests.post(url, headers=HEADERS, json=payload, stream=True, timeout=60) as response:
            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                # Read the error body here, while the streamed response is
                # still open; leaving the with-block closes it.
                logger.error(f"generateAssistantResponse test failed: {e}")
                logger.error(f"Server response: {response.status_code} {response.text}")
                return False
            logger.info(f"Response status: {response.status_code}")
            logger.info("Streaming response:")
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    # Best-effort decode for logging only; a multi-byte
                    # character split across chunks is simply dropped.
                    chunk_str = chunk.decode('utf-8', errors='ignore')
                    logger.debug(f"Chunk: {chunk_str[:200]}...")
        logger.success("generateAssistantResponse test COMPLETED")
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"generateAssistantResponse test failed: {e}")
        # Explicit None check: a Response is falsy for 4xx/5xx statuses.
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            logger.error(f"Server response: {failed_response.status_code} {failed_response.text}")
        return False
def _run_tests():
    """Obtain an access token, then run both endpoint smoke tests and log a summary."""
    logger.info("Starting Kiro API tests...")
    logger.info(f" Credentials source: {cred_source}")
    logger.info(f" Auth type: {AUTH_TYPE.value}")
    logger.info(f" Region: {KIRO_REGION}")
    logger.info(f" API Host: {KIRO_API_HOST}")

    if AUTH_TOKEN:
        # A still-valid token was found in the database; skip the refresh.
        HEADERS['Authorization'] = f"Bearer {AUTH_TOKEN}"
        logger.info("Using existing valid access token from database")
    elif not refresh_auth_token():
        if AUTH_TYPE == AuthType.AWS_SSO_OIDC:
            token_url = AWS_SSO_OIDC_TOKEN_URL
        else:
            token_url = KIRO_DESKTOP_TOKEN_URL
        logger.error("Failed to refresh token. Tests not started.")
        logger.error(f" Auth type: {AUTH_TYPE.value}")
        logger.error(f" Token URL: {token_url}")
        return

    # Get profile ARN dynamically for AWS SSO OIDC users (result is ignored).
    if AUTH_TYPE == AuthType.AWS_SSO_OIDC:
        get_profile_arn()

    # Both tests always run, even when the first one fails.
    outcomes = (test_get_models(), test_generate_content())
    if all(outcomes):
        logger.success("All tests passed successfully!")
        logger.success(f" Auth type: {AUTH_TYPE.value}")
        logger.success(f" Credentials: {cred_source}")
    else:
        logger.warning("One or more tests failed.")


if __name__ == "__main__":
    _run_tests()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment