@etahamad
Created December 12, 2025 23:07
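"""RunPod serverless handler for ComfyUI (worker-comfyui).

Queues a workflow against a local ComfyUI instance (127.0.0.1:8188) over its
HTTP and WebSocket APIs, uploads input images (base64 or URL), patches
comfy_api_nodes authentication to use an API key from config/env/request,
retries Gemini 504 Gateway Timeout errors, and returns outputs either as
base64 strings or as URLs on S3-compatible storage (Cloudflare R2 / Supabase).
"""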
import runpod
import json
import urllib.request
import urllib.parse
import time
import os
import requests
import base64
from io import BytesIO
import websocket
import uuid
import tempfile
import socket
import traceback
import copy
import sys
import importlib
import importlib.abc
# ---------------------------------------------------------------------------
# ComfyUI Authentication Patch: Fix comfy_api_nodes authentication
# This patches the comfy_api_nodes module to use config-based auth instead
# of requiring a user session, fixing the "Unauthorized: Please login first" error
# ---------------------------------------------------------------------------
# Global variable to store API key from request payload
_request_api_key = None
def get_system_api_key():
"""Get API key from system sources (config file or environment variable)"""
config_file = "/comfyui/user/default/config.json"
try:
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
api_key = config.get("user", {}).get("api_key", "")
if api_key:
print(f"worker-comfyui - Found API key in config['user']['api_key']")
return api_key
# Also check root level
api_key = config.get("api_key", "")
if api_key:
print(f"worker-comfyui - Found API key in config['api_key']")
return api_key
# Also check environment variable
api_key = os.environ.get("API_KEY_COMFY_ORG", "")
if api_key:
print(f"worker-comfyui - Found API key in environment variable")
return api_key
return None
except Exception as e:
print(f"worker-comfyui - Warning: Error reading config: {e}")
return None
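# Illustrative shape of the config file read above (an assumption inferred
# from the lookups in get_system_api_key, not a documented format):
#   {"user": {"api_key": "<comfy.org API key>"}}   or   {"api_key": "<key>"}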
def _patch_comfy_api_nodes_auth():
"""Patch comfy_api_nodes to use API key from config file or request payload"""
def get_api_key_from_config():
"""Read API key from ComfyUI config file, environment variable, or request payload"""
# First check if API key was provided in request payload
global _request_api_key
if _request_api_key:
print(f"worker-comfyui - Found API key in request payload")
return _request_api_key
# Fall back to system API key
system_key = get_system_api_key()
if system_key:
return system_key
print("worker-comfyui - Warning: No API key found in config, environment, or request payload")
return None
def patch_client_module(module):
"""Patch the _request_base function in the client module"""
# Check if we have an API key (will check global, config, env)
api_key = get_api_key_from_config()
if not api_key:
return False
if not hasattr(module, '_request_base'):
return False
original_request = module._request_base
# Check if it's async
import inspect
is_async = inspect.iscoroutinefunction(original_request)
if is_async:
async def patched_request_base(cfg, *args, **kwargs):
# Get API key dynamically (may have been set in request payload)
current_api_key = get_api_key_from_config()
if current_api_key:
# Add API key to headers if not present
if hasattr(cfg, 'headers'):
if not isinstance(cfg.headers, dict):
cfg.headers = {}
if 'X-API-Key' not in cfg.headers:
cfg.headers['X-API-Key'] = current_api_key
if 'Authorization' not in cfg.headers:
cfg.headers['Authorization'] = f'Bearer {current_api_key}'
# Call original function
try:
return await original_request(cfg, *args, **kwargs)
except Exception as e:
# If we get "Please login first" error, ensure headers are set and retry
error_str = str(e)
if 'login' in error_str.lower() or 'unauthorized' in error_str.lower():
if current_api_key and hasattr(cfg, 'headers'):
if not isinstance(cfg.headers, dict):
cfg.headers = {}
cfg.headers['X-API-Key'] = current_api_key
cfg.headers['Authorization'] = f'Bearer {current_api_key}'
return await original_request(cfg, *args, **kwargs)
raise
else:
def patched_request_base(cfg, *args, **kwargs):
# Get API key dynamically (may have been set in request payload)
current_api_key = get_api_key_from_config()
if current_api_key:
# Add API key to headers if not present
if hasattr(cfg, 'headers'):
if not isinstance(cfg.headers, dict):
cfg.headers = {}
if 'X-API-Key' not in cfg.headers:
cfg.headers['X-API-Key'] = current_api_key
if 'Authorization' not in cfg.headers:
cfg.headers['Authorization'] = f'Bearer {current_api_key}'
# Call original function
try:
return original_request(cfg, *args, **kwargs)
except Exception as e:
# If we get "Please login first" error, ensure headers are set and retry
error_str = str(e)
if 'login' in error_str.lower() or 'unauthorized' in error_str.lower():
if current_api_key and hasattr(cfg, 'headers'):
if not isinstance(cfg.headers, dict):
cfg.headers = {}
cfg.headers['X-API-Key'] = current_api_key
cfg.headers['Authorization'] = f'Bearer {current_api_key}'
return original_request(cfg, *args, **kwargs)
raise
# Apply the patch
module._request_base = patched_request_base
print(f"worker-comfyui - ✓ Patched comfy_api_nodes authentication (async={is_async}, api_key={'***' + api_key[-4:] if api_key and len(api_key) > 4 else 'N/A'})")
return True
# Set up import hook to intercept comfy_api_nodes.util.client when imported
# Use importlib's machinery for Python 3 compatibility
class ComfyAPINodesPatcher(importlib.abc.MetaPathFinder):
def find_spec(self, name, path, target=None):
# Let normal import proceed
return None
def find_module(self, name, path=None):
# Legacy method for Python < 3.4
return None
# Add our finder to meta_path
patcher_finder = ComfyAPINodesPatcher()
if patcher_finder not in sys.meta_path:
sys.meta_path.insert(0, patcher_finder)
# Also set up a simpler hook using __import__
original_import = __import__
def patched_import(name, globals=None, locals=None, fromlist=(), level=0):
module = original_import(name, globals, locals, fromlist, level)
# Check if this is the client module we want to patch
if name == 'comfy_api_nodes.util.client' or (
'comfy_api_nodes' in name and 'client' in name and
hasattr(module, '__file__') and
module.__file__ and
'client.py' in module.__file__
):
patch_client_module(module)
return module
# Replace the built-in __import__ in builtins module
import builtins
builtins.__import__ = patched_import
# Also try to patch already-loaded modules
for module_name in list(sys.modules.keys()):
if 'comfy_api_nodes' in module_name and 'client' in module_name:
try:
module = sys.modules[module_name]
if hasattr(module, '_request_base'):
patch_client_module(module)
except Exception:
pass
# Set up a periodic check to patch modules that load later
def periodic_patch_check():
"""Periodically check for and patch comfy_api_nodes"""
import threading
def check_and_patch():
time.sleep(5) # Wait for ComfyUI to start loading custom nodes
for _ in range(10): # Check up to 10 times
try:
# Try to import and patch
if 'comfy_api_nodes.util.client' in sys.modules:
module = sys.modules['comfy_api_nodes.util.client']
if hasattr(module, '_request_base'):
patch_client_module(module)
break
except Exception:
pass
time.sleep(2)
thread = threading.Thread(target=check_and_patch, daemon=True)
thread.start()
periodic_patch_check()
# Apply the patch early
_patch_comfy_api_nodes_auth()
# Time to wait between API check attempts in milliseconds
COMFY_API_AVAILABLE_INTERVAL_MS = 50
# Maximum number of API check attempts
COMFY_API_AVAILABLE_MAX_RETRIES = 500
# Websocket reconnection behaviour (can be overridden through environment variables)
# NOTE: more attempts and diagnostics improve debuggability whenever ComfyUI crashes mid-job.
# • WEBSOCKET_RECONNECT_ATTEMPTS sets how many times we will try to reconnect.
# • WEBSOCKET_RECONNECT_DELAY_S sets the sleep in seconds between attempts.
#
# If the respective env-vars are not supplied we fall back to sensible defaults ("5" and "3").
WEBSOCKET_RECONNECT_ATTEMPTS = int(os.environ.get("WEBSOCKET_RECONNECT_ATTEMPTS", 5))
WEBSOCKET_RECONNECT_DELAY_S = int(os.environ.get("WEBSOCKET_RECONNECT_DELAY_S", 3))
# Retry configuration for 504 Gateway Timeout errors (e.g., from Gemini API with large images)
# • GEMINI_504_RETRY_ATTEMPTS sets how many times to retry on 504 errors (default: 3)
# • GEMINI_504_RETRY_DELAY_S sets the initial delay in seconds (default: 5, uses exponential backoff)
GEMINI_504_RETRY_ATTEMPTS = int(os.environ.get("GEMINI_504_RETRY_ATTEMPTS", 3))
GEMINI_504_RETRY_DELAY_S = int(os.environ.get("GEMINI_504_RETRY_DELAY_S", 5))
# Timeout configuration for large images and Gemini API calls
# • WEBSOCKET_RECV_TIMEOUT_S sets websocket receive timeout in seconds (default: 300 = 5 minutes, set higher for 4K images)
# • WORKFLOW_EXECUTION_TIMEOUT_S sets maximum time to wait for workflow completion (default: 600 = 10 minutes, set higher for large images)
# • For 4K images with Gemini, consider setting WEBSOCKET_RECV_TIMEOUT_S=600 and WORKFLOW_EXECUTION_TIMEOUT_S=1200
WEBSOCKET_RECV_TIMEOUT_S = int(os.environ.get("WEBSOCKET_RECV_TIMEOUT_S", 300)) # 5 minutes default
WORKFLOW_EXECUTION_TIMEOUT_S = int(os.environ.get("WORKFLOW_EXECUTION_TIMEOUT_S", 600)) # 10 minutes default
# Extra verbose websocket trace logs (set WEBSOCKET_TRACE=true to enable)
if os.environ.get("WEBSOCKET_TRACE", "false").lower() == "true":
# This prints low-level frame information to stdout which is invaluable for diagnosing
# protocol errors but can be noisy in production – therefore gated behind an env-var.
websocket.enableTrace(True)
# Host where ComfyUI is running
COMFY_HOST = "127.0.0.1:8188"
# Enforce a clean state after each job is done
# see https://docs.runpod.io/docs/handler-additional-controls#refresh-worker
REFRESH_WORKER = os.environ.get("REFRESH_WORKER", "false").lower() == "true"
# Maximum response size in bytes (RunPod typically has a limit around 1-6MB for JSON responses)
# Base64 encoding increases size by ~33%, so we check the JSON size
MAX_RESPONSE_SIZE_BYTES = int(os.environ.get("MAX_RESPONSE_SIZE_BYTES", 5242880)) # Default 5MB
# Threshold for requiring S3 upload (files larger than this should use S3 to avoid response size issues)
# 4K images/videos can easily exceed this, so we set it conservatively
REQUIRE_S3_SIZE_THRESHOLD = int(os.environ.get("REQUIRE_S3_SIZE_THRESHOLD", 1048576)) # Default 1MB (raw file size)
# ---------------------------------------------------------------------------
# S3 Storage Detection: Check if Cloudflare R2 storage is configured
# ---------------------------------------------------------------------------
def is_s3_configured():
"""Check if Cloudflare R2 storage is configured."""
return os.environ.get("BUCKET_ENDPOINT_URL") is not None and os.environ.get("BUCKET_NAME") is not None
def log_storage_configuration():
"""Log the current S3 storage configuration at startup."""
if is_s3_configured():
endpoint_url = os.environ.get("BUCKET_ENDPOINT_URL")
bucket_name = os.environ.get("BUCKET_NAME")
region = os.environ.get("S3_REGION", "auto")
public_url = os.environ.get("BUCKET_PUBLIC_URL")
# Detect storage provider
storage_provider = "S3-compatible storage"
if endpoint_url:
if 'storage.supabase.co' in endpoint_url:
storage_provider = "Supabase Storage"
elif 'r2.cloudflarestorage.com' in endpoint_url:
storage_provider = "Cloudflare R2"
print(f"worker-comfyui - S3 storage: {storage_provider} configured")
print(f"worker-comfyui - S3 storage: Endpoint: {endpoint_url}")
print(f"worker-comfyui - S3 storage: Bucket: {bucket_name}")
print(f"worker-comfyui - S3 storage: Region: {region}")
if public_url:
print(f"worker-comfyui - S3 storage: Public URL: {public_url}")
else:
print("worker-comfyui - S3 storage: Public URL: Not configured (will construct from endpoint)")
print(f"worker-comfyui - S3 storage: Using {storage_provider} S3-compatible API")
if 'r2.cloudflarestorage.com' in (endpoint_url or ""):
print("worker-comfyui - S3 storage: Tip: Configure lifecycle policy to auto-delete files after 1 day (see docs/s3_setup_guide.md)")
else:
print("worker-comfyui - S3 storage: Not configured (files will be returned as base64)")
print("worker-comfyui - S3 storage: Large files (>1MB) will fail. Configure BUCKET_ENDPOINT_URL, BUCKET_NAME, BUCKET_ACCESS_KEY_ID, and BUCKET_SECRET_ACCESS_KEY to enable S3 upload.")
# Log storage configuration at module load time
log_storage_configuration()
# ---------------------------------------------------------------------------
# Helper: quick reachability probe of ComfyUI HTTP endpoint (port 8188)
# ---------------------------------------------------------------------------
def _comfy_server_status():
"""Return a dictionary with basic reachability info for the ComfyUI HTTP server."""
try:
resp = requests.get(f"http://{COMFY_HOST}/", timeout=5)
return {
"reachable": resp.status_code == 200,
"status_code": resp.status_code,
}
except Exception as exc:
return {"reachable": False, "error": str(exc)}
def _attempt_websocket_reconnect(ws_url, max_attempts, delay_s, initial_error):
"""
Attempts to reconnect to the WebSocket server after a disconnect.
Args:
ws_url (str): The WebSocket URL (including client_id).
max_attempts (int): Maximum number of reconnection attempts.
delay_s (int): Delay in seconds between attempts.
initial_error (Exception): The error that triggered the reconnect attempt.
Returns:
websocket.WebSocket: The newly connected WebSocket object.
Raises:
websocket.WebSocketConnectionClosedException: If reconnection fails after all attempts.
"""
print(
f"worker-comfyui - Websocket connection closed unexpectedly: {initial_error}. Attempting to reconnect..."
)
last_reconnect_error = initial_error
for attempt in range(max_attempts):
# Log current server status before each reconnect attempt so that we can
# see whether ComfyUI is still alive (HTTP port 8188 responding) even if
# the websocket dropped. This is extremely useful to differentiate
# between a network glitch and an outright ComfyUI crash/OOM-kill.
srv_status = _comfy_server_status()
if not srv_status["reachable"]:
# If ComfyUI itself is down there is no point in retrying the websocket –
# bail out immediately so the caller gets a clear "ComfyUI crashed" error.
print(
f"worker-comfyui - ComfyUI HTTP unreachable – aborting websocket reconnect: {srv_status.get('error', 'status '+str(srv_status.get('status_code')))}"
)
raise websocket.WebSocketConnectionClosedException(
"ComfyUI HTTP unreachable during websocket reconnect"
)
# Otherwise we proceed with reconnect attempts while server is up
print(
f"worker-comfyui - Reconnect attempt {attempt + 1}/{max_attempts}... (ComfyUI HTTP reachable, status {srv_status.get('status_code')})"
)
try:
# Need to create a new socket object for reconnect
new_ws = websocket.WebSocket()
new_ws.connect(ws_url, timeout=10) # Use existing ws_url
print(f"worker-comfyui - Websocket reconnected successfully.")
return new_ws # Return the new connected socket
except (
websocket.WebSocketException,
ConnectionRefusedError,
socket.timeout,
OSError,
) as reconn_err:
last_reconnect_error = reconn_err
print(
f"worker-comfyui - Reconnect attempt {attempt + 1} failed: {reconn_err}"
)
if attempt < max_attempts - 1:
print(
f"worker-comfyui - Waiting {delay_s} seconds before next attempt..."
)
time.sleep(delay_s)
else:
print(f"worker-comfyui - Max reconnection attempts reached.")
# If loop completes without returning, raise an exception
print("worker-comfyui - Failed to reconnect websocket after connection closed.")
raise websocket.WebSocketConnectionClosedException(
f"Connection closed and failed to reconnect. Last error: {last_reconnect_error}"
)
def validate_input(job_input):
"""
Validates the input for the handler function.
Args:
job_input (dict): The input data to validate.
Returns:
tuple: A tuple containing the validated data and an error message, if any.
The structure is (validated_data, error_message).
"""
# Validate if job_input is provided
if job_input is None:
return None, "Please provide input"
# Check if input is a string and try to parse it as JSON
if isinstance(job_input, str):
try:
job_input = json.loads(job_input)
except json.JSONDecodeError:
return None, "Invalid JSON format in input"
# Validate 'workflow' in input
workflow = job_input.get("workflow")
if workflow is None:
return None, "Missing 'workflow' parameter"
# Validate 'images' in input, if provided
images = job_input.get("images")
if images is not None:
if not isinstance(images, list) or not all(
"name" in image and "image" in image for image in images
):
return (
None,
"'images' must be a list of objects with 'name' and 'image' keys",
)
# Return validated data and no error
return {"workflow": workflow, "images": images}, None
def check_server(url, retries=500, delay=50):
"""
Check if a server is reachable via HTTP GET request
Args:
- url (str): The URL to check
    - retries (int, optional): The number of times to attempt connecting to the server. Default is 500
    - delay (int, optional): The time in milliseconds to wait between retries. Default is 50
Returns:
bool: True if the server is reachable within the given number of retries, otherwise False
"""
print(f"worker-comfyui - Checking API server at {url}...")
for i in range(retries):
try:
response = requests.get(url, timeout=5)
# If the response status code is 200, the server is up and running
if response.status_code == 200:
print(f"worker-comfyui - API is reachable")
return True
except requests.Timeout:
pass
except requests.RequestException as e:
pass
# Wait for the specified delay before retrying
time.sleep(delay / 1000)
print(
f"worker-comfyui - Failed to connect to server at {url} after {retries} attempts."
)
return False
def download_and_upload_image_from_url(url, filename=None):
"""
Download an image from a URL and upload it to ComfyUI.
Args:
url (str): The URL of the image to download
filename (str, optional): The filename to use. If None, extracts from URL.
Returns:
tuple: (success: bool, filename: str or error_message: str)
"""
try:
# Extract filename from URL if not provided
if not filename:
# Get filename from URL path
parsed_url = urllib.parse.urlparse(url)
filename = os.path.basename(parsed_url.path)
# URL decode the filename
filename = urllib.parse.unquote(filename)
if not filename or '.' not in filename:
# Fallback to a default name
filename = f"image_{hash(url) % 10000}.png"
print(f"worker-comfyui - Downloading image from URL: {url}")
response = requests.get(url, timeout=60, stream=True)
response.raise_for_status()
# Get content type to determine file extension if needed
content_type = response.headers.get('content-type', '')
if not filename.endswith(('.png', '.jpg', '.jpeg', '.webp', '.gif')):
if 'image/png' in content_type:
filename = filename.rsplit('.', 1)[0] + '.png'
elif 'image/jpeg' in content_type:
filename = filename.rsplit('.', 1)[0] + '.jpg'
elif 'image/webp' in content_type:
filename = filename.rsplit('.', 1)[0] + '.webp'
# Read image data
image_data = response.content
# Upload to ComfyUI
files = {
"image": (filename, BytesIO(image_data), content_type or "image/png"),
"overwrite": (None, "true"),
}
upload_response = requests.post(
f"http://{COMFY_HOST}/upload/image", files=files, timeout=60
)
upload_response.raise_for_status()
print(f"worker-comfyui - Successfully downloaded and uploaded {filename}")
return True, filename
except requests.Timeout:
error_msg = f"Timeout downloading image from {url}"
print(f"worker-comfyui - {error_msg}")
return False, error_msg
except requests.RequestException as e:
error_msg = f"Error downloading/uploading image from {url}: {e}"
print(f"worker-comfyui - {error_msg}")
return False, error_msg
except Exception as e:
error_msg = f"Unexpected error downloading image from {url}: {e}"
print(f"worker-comfyui - {error_msg}")
return False, error_msg
def extract_and_process_urls_from_workflow(workflow):
"""
Extract image URLs from LoadImage nodes in the workflow, download them,
upload to ComfyUI, and replace URLs with local filenames.
Args:
workflow (dict): The workflow dictionary
Returns:
tuple: (modified_workflow: dict, errors: list)
"""
errors = []
url_to_filename = {} # Cache to avoid downloading same URL twice
# Deep copy the workflow to avoid modifying the original
modified_workflow = copy.deepcopy(workflow)
# Iterate through all nodes in the workflow
for node_id, node_data in modified_workflow.items():
if not isinstance(node_data, dict):
continue
inputs = node_data.get("inputs", {})
class_type = node_data.get("class_type", "")
# Check if this is a LoadImage node
if class_type == "LoadImage" and "image" in inputs:
image_value = inputs["image"]
# Check if it's a URL (starts with http:// or https://)
if isinstance(image_value, str) and (
image_value.startswith("http://") or image_value.startswith("https://")
):
url = image_value
# Check if we've already processed this URL
if url in url_to_filename:
inputs["image"] = url_to_filename[url]
print(
f"worker-comfyui - Reusing cached filename for URL: {url} -> {url_to_filename[url]}"
)
continue
# Download and upload the image
success, result = download_and_upload_image_from_url(url)
if success:
filename = result
url_to_filename[url] = filename
inputs["image"] = filename
print(
f"worker-comfyui - Replaced URL with filename in node {node_id}: {url} -> {filename}"
)
else:
error_msg = f"Failed to process image URL in node {node_id}: {result}"
errors.append(error_msg)
print(f"worker-comfyui - {error_msg}")
return modified_workflow, errors
def upload_images(images):
"""
Upload a list of base64 encoded images to the ComfyUI server using the /upload/image endpoint.
Args:
images (list): A list of dictionaries, each containing the 'name' of the image and the 'image' as a base64 encoded string.
Returns:
dict: A dictionary indicating success or error.
"""
if not images:
return {"status": "success", "message": "No images to upload", "details": []}
responses = []
upload_errors = []
print(f"worker-comfyui - Uploading {len(images)} image(s)...")
for image in images:
try:
name = image["name"]
image_data_uri = image["image"] # Get the full string (might have prefix)
# --- Strip Data URI prefix if present ---
if "," in image_data_uri:
# Find the comma and take everything after it
base64_data = image_data_uri.split(",", 1)[1]
else:
# Assume it's already pure base64
base64_data = image_data_uri
# --- End strip ---
blob = base64.b64decode(base64_data) # Decode the cleaned data
# Prepare the form data
files = {
"image": (name, BytesIO(blob), "image/png"),
"overwrite": (None, "true"),
}
# POST request to upload the image
response = requests.post(
f"http://{COMFY_HOST}/upload/image", files=files, timeout=30
)
response.raise_for_status()
responses.append(f"Successfully uploaded {name}")
print(f"worker-comfyui - Successfully uploaded {name}")
except base64.binascii.Error as e:
error_msg = f"Error decoding base64 for {image.get('name', 'unknown')}: {e}"
print(f"worker-comfyui - {error_msg}")
upload_errors.append(error_msg)
except requests.Timeout:
error_msg = f"Timeout uploading {image.get('name', 'unknown')}"
print(f"worker-comfyui - {error_msg}")
upload_errors.append(error_msg)
except requests.RequestException as e:
error_msg = f"Error uploading {image.get('name', 'unknown')}: {e}"
print(f"worker-comfyui - {error_msg}")
upload_errors.append(error_msg)
except Exception as e:
error_msg = (
f"Unexpected error uploading {image.get('name', 'unknown')}: {e}"
)
print(f"worker-comfyui - {error_msg}")
upload_errors.append(error_msg)
if upload_errors:
print(f"worker-comfyui - image(s) upload finished with errors")
return {
"status": "error",
"message": "Some images failed to upload",
"details": upload_errors,
}
print(f"worker-comfyui - image(s) upload complete")
return {
"status": "success",
"message": "All images uploaded successfully",
"details": responses,
}
def get_available_models():
"""
Get list of available models from ComfyUI
Returns:
dict: Dictionary containing available models by type
"""
try:
response = requests.get(f"http://{COMFY_HOST}/object_info", timeout=10)
response.raise_for_status()
object_info = response.json()
# Extract available checkpoints from CheckpointLoaderSimple
available_models = {}
if "CheckpointLoaderSimple" in object_info:
checkpoint_info = object_info["CheckpointLoaderSimple"]
if "input" in checkpoint_info and "required" in checkpoint_info["input"]:
ckpt_options = checkpoint_info["input"]["required"].get("ckpt_name")
if ckpt_options and len(ckpt_options) > 0:
available_models["checkpoints"] = (
ckpt_options[0] if isinstance(ckpt_options[0], list) else []
)
return available_models
except Exception as e:
print(f"worker-comfyui - Warning: Could not fetch available models: {e}")
return {}
def queue_workflow(workflow, client_id, extra_data=None):
"""
Queue a workflow to be processed by ComfyUI
Args:
workflow (dict): A dictionary containing the workflow to be processed
client_id (str): The client ID for the websocket connection
extra_data (dict, optional): Extra data to include in the payload (e.g., API keys)
Returns:
dict: The JSON response from ComfyUI after processing the workflow
Raises:
ValueError: If the workflow validation fails with detailed error information
"""
# Include client_id and extra_data in the prompt payload
payload = {"prompt": workflow, "client_id": client_id}
if extra_data:
payload["extra_data"] = extra_data
data = json.dumps(payload).encode("utf-8")
# Use requests for consistency and timeout
headers = {"Content-Type": "application/json"}
response = requests.post(
f"http://{COMFY_HOST}/prompt", data=data, headers=headers, timeout=30
)
# Handle validation errors with detailed information
if response.status_code == 400:
print(f"worker-comfyui - ComfyUI returned 400. Response body: {response.text}")
try:
error_data = response.json()
print(f"worker-comfyui - Parsed error data: {error_data}")
# Try to extract meaningful error information
error_message = "Workflow validation failed"
error_details = []
# ComfyUI seems to return different error formats, let's handle them all
if "error" in error_data:
error_info = error_data["error"]
if isinstance(error_info, dict):
error_message = error_info.get("message", error_message)
if error_info.get("type") == "prompt_outputs_failed_validation":
error_message = "Workflow validation failed"
else:
error_message = str(error_info)
# Check for node validation errors in the response
if "node_errors" in error_data:
for node_id, node_error in error_data["node_errors"].items():
if isinstance(node_error, dict):
for error_type, error_msg in node_error.items():
error_details.append(
f"Node {node_id} ({error_type}): {error_msg}"
)
else:
error_details.append(f"Node {node_id}: {node_error}")
# Check if the error data itself contains validation info
if error_data.get("type") == "prompt_outputs_failed_validation":
error_message = error_data.get("message", "Workflow validation failed")
# For this type of error, we need to parse the validation details from logs
# Since ComfyUI doesn't seem to include detailed validation errors in the response
# Let's provide a more helpful generic message
available_models = get_available_models()
if available_models.get("checkpoints"):
error_message += f"\n\nThis usually means a required model or parameter is not available."
error_message += f"\nAvailable checkpoint models: {', '.join(available_models['checkpoints'])}"
else:
error_message += "\n\nThis usually means a required model or parameter is not available."
error_message += "\nNo checkpoint models appear to be available. Please check your model installation."
raise ValueError(error_message)
# If we have specific validation errors, format them nicely
if error_details:
detailed_message = f"{error_message}:\n" + "\n".join(
f"• {detail}" for detail in error_details
)
# Try to provide helpful suggestions for common errors
if any(
"not in list" in detail and "ckpt_name" in detail
for detail in error_details
):
available_models = get_available_models()
if available_models.get("checkpoints"):
detailed_message += f"\n\nAvailable checkpoint models: {', '.join(available_models['checkpoints'])}"
else:
detailed_message += "\n\nNo checkpoint models appear to be available. Please check your model installation."
raise ValueError(detailed_message)
else:
# Fallback to the raw response if we can't parse specific errors
raise ValueError(f"{error_message}. Raw response: {response.text}")
except (json.JSONDecodeError, KeyError) as e:
# If we can't parse the error response, fall back to the raw text
raise ValueError(
f"ComfyUI validation failed (could not parse error response): {response.text}"
)
# For other HTTP errors, raise them normally
response.raise_for_status()
return response.json()
def get_history(prompt_id):
"""
Retrieve the history of a given prompt using its ID
Args:
prompt_id (str): The ID of the prompt whose history is to be retrieved
Returns:
dict: The history of the prompt, containing all the processing steps and results
"""
# Use requests for consistency and timeout
response = requests.get(f"http://{COMFY_HOST}/history/{prompt_id}", timeout=30)
response.raise_for_status()
return response.json()
def get_image_data(filename, subfolder, image_type):
"""
Fetch image bytes from the ComfyUI /view endpoint.
Args:
filename (str): The filename of the image.
subfolder (str): The subfolder where the image is stored.
image_type (str): The type of the image (e.g., 'output').
Returns:
bytes: The raw image data, or None if an error occurs.
"""
print(
f"worker-comfyui - Fetching image data: type={image_type}, subfolder={subfolder}, filename={filename}"
)
data = {"filename": filename, "subfolder": subfolder, "type": image_type}
url_values = urllib.parse.urlencode(data)
try:
# Use requests for consistency and timeout
response = requests.get(f"http://{COMFY_HOST}/view?{url_values}", timeout=60)
response.raise_for_status()
print(f"worker-comfyui - Successfully fetched image data for {filename}")
return response.content
except requests.Timeout:
print(f"worker-comfyui - Timeout fetching image data for {filename}")
return None
except requests.RequestException as e:
print(f"worker-comfyui - Error fetching image data for {filename}: {e}")
return None
except Exception as e:
print(
f"worker-comfyui - Unexpected error fetching image data for {filename}: {e}"
)
return None
def has_gemini_nodes(workflow):
"""
Check if workflow contains GeminiImage2Node or GeminiImageDirectNode nodes
(which may need longer timeouts for large images).
Args:
workflow (dict): The workflow dictionary
Returns:
bool: True if workflow contains Gemini nodes
"""
for node_id, node_data in workflow.items():
if isinstance(node_data, dict):
class_type = node_data.get("class_type", "")
if class_type in ("GeminiImage2Node", "GeminiImageDirectNode"):
return True
return False
def calculate_response_size(response_dict):
"""
Calculate the approximate size of a response dictionary when serialized to JSON.
Args:
response_dict (dict): The response dictionary to measure.
Returns:
int: The approximate size in bytes.
"""
try:
json_str = json.dumps(response_dict, ensure_ascii=False)
return len(json_str.encode('utf-8'))
except Exception as e:
print(f"worker-comfyui - Warning: Could not calculate response size: {e}")
return 0
def handler(job):
"""
Handles a job using ComfyUI via websockets for status and image retrieval.
Args:
job (dict): A dictionary containing job details and input parameters.
Returns:
dict: A dictionary containing either an error message or a success status with generated images.
"""
job_input = job["input"]
job_id = job["id"]
# Always ensure extra_data exists and has API key (from request or system)
global _request_api_key
extra_data = {}
s3_config = {}
# Extract extra_data and s3_config from request if provided
if isinstance(job_input, dict):
request_extra_data = job_input.get("extra_data")
if isinstance(request_extra_data, dict):
extra_data = request_extra_data.copy()
request_s3_config = job_input.get("s3_config")
if isinstance(request_s3_config, dict):
s3_config = request_s3_config.copy()
print(f"worker-comfyui - Extracted s3_config from request: {s3_config}")
else:
print(f"worker-comfyui - No s3_config found in request (type: {type(request_s3_config)})")
# Check if API key is provided in request
request_api_key = extra_data.get("api_key_comfy_org")
if request_api_key:
_request_api_key = request_api_key
print(f"worker-comfyui - Found API key in request extra_data")
# Also update the patch to use this API key
_patch_comfy_api_nodes_auth()
else:
# No API key in request, always use system API key
system_api_key = get_system_api_key()
if system_api_key:
extra_data["api_key_comfy_org"] = system_api_key
print(f"worker-comfyui - Using system API key (from config/env)")
# Set environment variable for nodes
os.environ["API_KEY_COMFY_ORG"] = system_api_key
# Update the patch to use this API key
_patch_comfy_api_nodes_auth()
else:
print(f"worker-comfyui - Warning: No API key found in request or system")
# Make sure that the input is valid
validated_data, error_message = validate_input(job_input)
if error_message:
return {"error": error_message}
# Extract validated data
workflow = validated_data["workflow"]
input_images = validated_data.get("images")
# Check if workflow contains Gemini nodes - if so, use longer timeouts for large images
contains_gemini = has_gemini_nodes(workflow)
# Use workflow-specific timeouts (can be overridden by environment variables)
ws_recv_timeout = WEBSOCKET_RECV_TIMEOUT_S
workflow_exec_timeout = WORKFLOW_EXECUTION_TIMEOUT_S
if contains_gemini:
# Use longer timeouts for Gemini workflows (especially with 4K images)
# Check for Gemini-specific timeout env vars first, then use 2x defaults if not set
gemini_ws_timeout = int(os.environ.get("GEMINI_WEBSOCKET_RECV_TIMEOUT_S", WEBSOCKET_RECV_TIMEOUT_S * 2))
gemini_workflow_timeout = int(os.environ.get("GEMINI_WORKFLOW_EXECUTION_TIMEOUT_S", WORKFLOW_EXECUTION_TIMEOUT_S * 2))
# Only use Gemini-specific timeouts if base timeouts weren't explicitly set
if os.environ.get("WEBSOCKET_RECV_TIMEOUT_S") is None:
ws_recv_timeout = gemini_ws_timeout
if os.environ.get("WORKFLOW_EXECUTION_TIMEOUT_S") is None:
workflow_exec_timeout = gemini_workflow_timeout
print(f"worker-comfyui - Detected Gemini node (GeminiImage2Node or GeminiImageDirectNode) in workflow - using extended timeouts:")
print(f"worker-comfyui - Websocket receive timeout: {ws_recv_timeout}s")
print(f"worker-comfyui - Workflow execution timeout: {workflow_exec_timeout}s")
# Make sure that the ComfyUI HTTP API is available before proceeding
if not check_server(
f"http://{COMFY_HOST}/",
COMFY_API_AVAILABLE_MAX_RETRIES,
COMFY_API_AVAILABLE_INTERVAL_MS,
):
return {
"error": f"ComfyUI server ({COMFY_HOST}) not reachable after multiple retries."
}
# Extract and process image URLs from workflow
print("worker-comfyui - Scanning workflow for image URLs...")
workflow, url_errors = extract_and_process_urls_from_workflow(workflow)
if url_errors:
print(f"worker-comfyui - Warning: {len(url_errors)} error(s) processing image URLs")
# Continue anyway, but log the errors
# Upload input images if they exist
if input_images:
upload_result = upload_images(input_images)
if upload_result["status"] == "error":
# Return upload errors
return {
"error": "Failed to upload one or more input images",
"details": upload_result["details"],
}
ws = None
client_id = str(uuid.uuid4())
prompt_id = None
output_data = []
errors = []
try:
# Establish WebSocket connection
ws_url = f"ws://{COMFY_HOST}/ws?clientId={client_id}"
print(f"worker-comfyui - Connecting to websocket: {ws_url}")
ws = websocket.WebSocket()
ws.connect(ws_url, timeout=10)
print(f"worker-comfyui - Websocket connected")
# Queue the workflow
try:
queued_workflow = queue_workflow(workflow, client_id, extra_data)
prompt_id = queued_workflow.get("prompt_id")
if not prompt_id:
raise ValueError(
f"Missing 'prompt_id' in queue response: {queued_workflow}"
)
print(f"worker-comfyui - Queued workflow with ID: {prompt_id}")
except requests.RequestException as e:
print(f"worker-comfyui - Error queuing workflow: {e}")
raise ValueError(f"Error queuing workflow: {e}")
except Exception as e:
print(f"worker-comfyui - Unexpected error queuing workflow: {e}")
# For ValueError exceptions from queue_workflow, pass through the original message
if isinstance(e, ValueError):
raise e
else:
raise ValueError(f"Unexpected error queuing workflow: {e}")
# Wait for execution completion via WebSocket
print(f"worker-comfyui - Waiting for workflow execution ({prompt_id})...")
print(f"worker-comfyui - Using websocket receive timeout: {ws_recv_timeout}s, workflow execution timeout: {workflow_exec_timeout}s")
# Set websocket receive timeout
ws.settimeout(ws_recv_timeout)
execution_done = False
start_time = time.time()
while True:
# Check if we've exceeded the workflow execution timeout
elapsed_time = time.time() - start_time
if elapsed_time > workflow_exec_timeout:
timeout_msg = f"Workflow execution exceeded timeout of {workflow_exec_timeout}s (elapsed: {elapsed_time:.1f}s)"
print(f"worker-comfyui - ERROR: {timeout_msg}")
errors.append(timeout_msg)
break
try:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message.get("type") == "status":
status_data = message.get("data", {}).get("status", {})
print(
f"worker-comfyui - Status update: {status_data.get('exec_info', {}).get('queue_remaining', 'N/A')} items remaining in queue"
)
elif message.get("type") == "executing":
data = message.get("data", {})
if (
data.get("node") is None
and data.get("prompt_id") == prompt_id
):
print(
f"worker-comfyui - Execution finished for prompt {prompt_id}"
)
execution_done = True
break
elif message.get("type") == "execution_error":
data = message.get("data", {})
if data.get("prompt_id") == prompt_id:
error_message = data.get('exception_message', '')
error_details = f"Node Type: {data.get('node_type')}, Node ID: {data.get('node_id')}, Message: {error_message}"
print(
f"worker-comfyui - Execution error received: {error_details}"
)
# Check if this is a 504 Gateway Timeout error (common with Gemini API and large images)
is_504_error = (
'504' in str(error_message) or
'gateway timeout' in str(error_message).lower() or
                                ('timeout' in str(error_message).lower() and 'gemini' in str(error_message).lower())
)
if is_504_error:
print(
f"worker-comfyui - Detected 504 Gateway Timeout error (likely from Gemini API with large images)"
)
# Store error info for retry logic
errors.append({
"type": "504_timeout",
"details": error_details,
"node_type": data.get('node_type'),
"node_id": data.get('node_id'),
"message": error_message
})
else:
errors.append(f"Workflow execution error: {error_details}")
break
else:
continue
except websocket.WebSocketTimeoutException:
elapsed_time = time.time() - start_time
if elapsed_time > workflow_exec_timeout:
timeout_msg = f"Workflow execution exceeded timeout of {workflow_exec_timeout}s (elapsed: {elapsed_time:.1f}s)"
print(f"worker-comfyui - ERROR: {timeout_msg}")
errors.append(timeout_msg)
break
print(f"worker-comfyui - Websocket receive timed out (elapsed: {elapsed_time:.1f}s / {workflow_exec_timeout}s). Still waiting...")
continue
except websocket.WebSocketConnectionClosedException as closed_err:
try:
# Attempt to reconnect
ws = _attempt_websocket_reconnect(
ws_url,
WEBSOCKET_RECONNECT_ATTEMPTS,
WEBSOCKET_RECONNECT_DELAY_S,
closed_err,
)
print(
"worker-comfyui - Resuming message listening after successful reconnect."
)
continue
except (
websocket.WebSocketConnectionClosedException
) as reconn_failed_err:
# If _attempt_websocket_reconnect fails, it raises this exception
# Let this exception propagate to the outer handler's except block
raise reconn_failed_err
except json.JSONDecodeError:
print(f"worker-comfyui - Received invalid JSON message via websocket.")
# Check if we have 504 timeout errors that should be retried
retry_504_errors = [e for e in errors if isinstance(e, dict) and e.get("type") == "504_timeout"]
if retry_504_errors and GEMINI_504_RETRY_ATTEMPTS > 0:
print(f"worker-comfyui - Found {len(retry_504_errors)} 504 timeout error(s), attempting retry...")
# Close current websocket before retry
if ws and ws.connected:
ws.close()
# Retry the workflow execution with exponential backoff
for retry_attempt in range(GEMINI_504_RETRY_ATTEMPTS):
delay = GEMINI_504_RETRY_DELAY_S * (2 ** retry_attempt) # Exponential backoff: 5s, 10s, 20s
print(f"worker-comfyui - Retry attempt {retry_attempt + 1}/{GEMINI_504_RETRY_ATTEMPTS} after {delay}s delay...")
time.sleep(delay)
# Reconnect websocket
try:
ws = websocket.WebSocket()
ws.connect(ws_url, timeout=10)
print(f"worker-comfyui - Websocket reconnected for retry")
# Re-queue the workflow
try:
queued_workflow = queue_workflow(workflow, client_id, extra_data)
retry_prompt_id = queued_workflow.get("prompt_id")
if not retry_prompt_id:
print(f"worker-comfyui - Retry failed: Missing prompt_id")
break
print(f"worker-comfyui - Retry workflow queued with ID: {retry_prompt_id}")
# Wait for execution completion
retry_execution_done = False
retry_errors = []
retry_start_time = time.time()
ws.settimeout(ws_recv_timeout) # Set timeout for retry
while True:
# Check timeout for retry
retry_elapsed = time.time() - retry_start_time
if retry_elapsed > workflow_exec_timeout:
timeout_msg = f"Retry workflow execution exceeded timeout of {workflow_exec_timeout}s"
print(f"worker-comfyui - ERROR: {timeout_msg}")
retry_errors.append(timeout_msg)
break
try:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message.get("type") == "executing":
data = message.get("data", {})
if (
data.get("node") is None
and data.get("prompt_id") == retry_prompt_id
):
print(f"worker-comfyui - Retry execution finished for prompt {retry_prompt_id}")
retry_execution_done = True
prompt_id = retry_prompt_id # Update prompt_id for history fetch
break
elif message.get("type") == "execution_error":
data = message.get("data", {})
if data.get("prompt_id") == retry_prompt_id:
error_message = data.get('exception_message', '')
error_details = f"Node Type: {data.get('node_type')}, Node ID: {data.get('node_id')}, Message: {error_message}"
retry_errors.append(error_details)
# Check if still a 504 error
is_504_error = (
'504' in str(error_message) or
'gateway timeout' in str(error_message).lower()
)
if not is_504_error:
# Different error, break retry loop
print(f"worker-comfyui - Retry failed with different error: {error_details}")
retry_execution_done = False
break
else:
print(f"worker-comfyui - Retry still got 504 error, will retry again if attempts remain")
retry_execution_done = False
break
except websocket.WebSocketTimeoutException:
retry_elapsed = time.time() - retry_start_time
if retry_elapsed > workflow_exec_timeout:
timeout_msg = f"Retry workflow execution exceeded timeout of {workflow_exec_timeout}s"
print(f"worker-comfyui - ERROR: {timeout_msg}")
retry_errors.append(timeout_msg)
break
continue
except websocket.WebSocketConnectionClosedException:
print(f"worker-comfyui - Websocket closed during retry, attempting reconnect...")
ws = _attempt_websocket_reconnect(
ws_url,
WEBSOCKET_RECONNECT_ATTEMPTS,
WEBSOCKET_RECONNECT_DELAY_S,
websocket.WebSocketConnectionClosedException("Connection closed during retry")
)
continue
except json.JSONDecodeError:
continue
if retry_execution_done:
print(f"worker-comfyui - Retry succeeded!")
errors = [] # Clear 504 errors since retry succeeded
execution_done = True
break
elif retry_attempt < GEMINI_504_RETRY_ATTEMPTS - 1:
print(f"worker-comfyui - Retry attempt {retry_attempt + 1} failed, will retry again...")
if ws and ws.connected:
ws.close()
continue
else:
print(f"worker-comfyui - All retry attempts exhausted")
errors = [f"Workflow execution error (504 Gateway Timeout): {e.get('details', 'Unknown error')}" for e in retry_504_errors]
if retry_errors:
errors.extend([f"Retry error: {e}" for e in retry_errors])
break
except Exception as e:
print(f"worker-comfyui - Error during retry attempt {retry_attempt + 1}: {e}")
if retry_attempt < GEMINI_504_RETRY_ATTEMPTS - 1:
if ws and ws.connected:
ws.close()
continue
else:
errors = [f"Workflow execution error (504 Gateway Timeout): {err.get('details', 'Unknown error')}" for err in retry_504_errors]
errors.append(f"Retry failed: {str(e)}")
break
except Exception as e:
print(f"worker-comfyui - Error reconnecting websocket for retry: {e}")
if retry_attempt < GEMINI_504_RETRY_ATTEMPTS - 1:
continue
else:
errors = [f"Workflow execution error (504 Gateway Timeout): {err.get('details', 'Unknown error')}" for err in retry_504_errors]
errors.append(f"Failed to reconnect for retry: {str(e)}")
break
# Convert dict errors back to strings for consistency
errors = [e if isinstance(e, str) else e.get('details', str(e)) for e in errors]
if not execution_done and not errors:
raise ValueError(
"Workflow monitoring loop exited without confirmation of completion or error."
)
# Fetch history even if there were execution errors, some outputs might exist
print(f"worker-comfyui - Fetching history for prompt {prompt_id}...")
history = get_history(prompt_id)
if prompt_id not in history:
error_msg = f"Prompt ID {prompt_id} not found in history after execution."
print(f"worker-comfyui - {error_msg}")
if not errors:
return {"error": error_msg}
else:
errors.append(error_msg)
return {
"error": "Job processing failed, prompt ID not found in history.",
"details": errors,
}
prompt_history = history.get(prompt_id, {})
outputs = prompt_history.get("outputs", {})
if not outputs:
warning_msg = f"No outputs found in history for prompt {prompt_id}."
print(f"worker-comfyui - {warning_msg}")
if not errors:
errors.append(warning_msg)
print(f"worker-comfyui - Processing {len(outputs)} output nodes...")
for node_id, node_output in outputs.items():
if "images" in node_output:
print(
f"worker-comfyui - Node {node_id} contains {len(node_output['images'])} image(s)"
)
for image_info in node_output["images"]:
filename = image_info.get("filename")
subfolder = image_info.get("subfolder", "")
img_type = image_info.get("type")
# skip temp images
if img_type == "temp":
print(
f"worker-comfyui - Skipping image {filename} because type is 'temp'"
)
continue
if not filename:
warn_msg = f"Skipping image in node {node_id} due to missing filename: {image_info}"
print(f"worker-comfyui - {warn_msg}")
errors.append(warn_msg)
continue
image_bytes = get_image_data(filename, subfolder, img_type)
if image_bytes:
# Determine file extension - prioritize custom path extension if available, otherwise use filename
custom_path = s3_config.get("image_output_path") if s3_config else None
if custom_path:
# Extract extension from custom path (e.g., /collections/test123/test_out.png -> .png)
file_extension = os.path.splitext(custom_path)[1] or os.path.splitext(filename)[1] or ".png"
else:
# Use extension from ComfyUI filename
file_extension = os.path.splitext(filename)[1] or ".png"
image_size = len(image_bytes)
image_size_mb = image_size / (1024 * 1024)
# Check if this is a video file (common extensions)
is_video = file_extension.lower() in ('.mp4', '.webm', '.avi', '.mov', '.mkv', '.flv', '.m4v')
# Check if S3 upload is configured
use_s3 = is_s3_configured()
# Estimate base64 size (base64 increases size by ~33%)
estimated_base64_size = int(image_size * 1.34)
# For large files (especially 4K content), require S3 or fail
# 4K images/videos can easily be 10-50MB+ which would exceed response limits
if image_size > REQUIRE_S3_SIZE_THRESHOLD:
if not use_s3:
error_msg = (
f"File {filename} is too large ({image_size_mb:.2f} MB raw, ~{estimated_base64_size / (1024*1024):.2f} MB base64) "
f"to return in response. S3 upload is required for files larger than {REQUIRE_S3_SIZE_THRESHOLD / (1024*1024):.1f} MB. "
f"Please configure BUCKET_ENDPOINT_URL, BUCKET_NAME, BUCKET_ACCESS_KEY_ID, and BUCKET_SECRET_ACCESS_KEY environment variables. "
f"See docs/s3_setup_guide.md for Cloudflare R2 configuration instructions."
)
print(f"worker-comfyui - ERROR: {error_msg}")
errors.append(error_msg)
continue # Skip this file
else:
print(
f"worker-comfyui - File {filename} is large ({image_size_mb:.2f} MB), using S3 upload"
)
if use_s3 or image_size > REQUIRE_S3_SIZE_THRESHOLD:
# Use S3 upload when configured or required
try:
with tempfile.NamedTemporaryFile(
suffix=file_extension, delete=False
) as temp_file:
temp_file.write(image_bytes)
temp_file_path = temp_file.name
print(
f"worker-comfyui - Wrote file bytes to temporary file: {temp_file_path}"
)
print(f"worker-comfyui - Uploading {filename} to S3...")
# Use boto3 to upload to Cloudflare R2
import boto3
from botocore.config import Config
endpoint_url = os.environ.get("BUCKET_ENDPOINT_URL")
bucket_name = os.environ.get("BUCKET_NAME")
access_key = os.environ.get("BUCKET_ACCESS_KEY_ID")
secret_key = os.environ.get("BUCKET_SECRET_ACCESS_KEY")
region = os.environ.get("S3_REGION", "auto")
print(f"worker-comfyui - DEBUG: bucket_name from env = {bucket_name}")
print(f"worker-comfyui - DEBUG: job_id = {job_id}")
# Create S3 client for S3-compatible storage
boto3_config = Config(signature_version='s3v4')
s3_client = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
config=boto3_config
)
# Determine S3 key: use custom path from request s3_config if provided, otherwise use default
# This s3_key will be used for both upload AND response URL to ensure consistency
print(f"worker-comfyui - DEBUG: s3_config = {s3_config}, type = {type(s3_config)}")
custom_path = s3_config.get("image_output_path") if s3_config else None
print(f"worker-comfyui - DEBUG: custom_path = {custom_path}")
if custom_path:
# Use custom path from request, ensuring it doesn't start with a slash (S3 keys shouldn't)
s3_key = custom_path.lstrip("/")
print(f"worker-comfyui - Using custom S3 path from request: {s3_key}")
else:
# Default: use job_id/filename when no custom path provided
s3_key = f"{job_id}/{filename}"
print(f"worker-comfyui - Using default S3 path: {s3_key}")
# Determine ContentType based on file extension (required for Supabase Storage)
# Use the extension we determined earlier (from custom path if available, otherwise from filename)
# Default to 'image/png' since PNG is the most commonly used format
mime_types = {
'.png': 'image/png', # Most common format - default fallback
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.webp': 'image/webp',
'.gif': 'image/gif',
'.mp4': 'video/mp4',
'.webm': 'video/webm',
'.avi': 'video/x-msvideo',
'.mov': 'video/quicktime',
'.mkv': 'video/x-matroska',
'.flv': 'video/x-flv',
'.m4v': 'video/x-m4v',
}
# Get ContentType from extension, defaulting to 'image/png' if extension not recognized
content_type = mime_types.get(file_extension.lower(), 'image/png')
print(f"worker-comfyui - ContentType set to: {content_type} (from extension: {file_extension})")
# Upload with ContentType specified (required for Supabase Storage)
# Use upload_file for large files - it automatically uses multipart upload for files > 8MB
# This handles files larger than the PutObject limit (typically 5GB, but some services have lower limits)
# Configure multipart threshold lower to ensure multipart is used for large files
                                # Multipart settings belong to boto3's managed transfer layer
                                # (TransferConfig); botocore's Config does not accept them.
                                from boto3.s3.transfer import TransferConfig
                                transfer_config = TransferConfig(
                                    multipart_threshold=5 * 1024 * 1024,  # use multipart for files > 5MB
                                    multipart_chunksize=64 * 1024 * 1024,  # 64MB per part
                                )
                                # upload_file automatically handles multipart uploads for large files;
                                # ExtraArgs sets ContentType and other metadata (required for Supabase Storage)
                                file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024)
                                print(f"worker-comfyui - Uploading {filename} ({file_size_mb:.2f} MB) using multipart upload...")
                                s3_client.upload_file(
                                    Filename=temp_file_path,
                                    Bucket=bucket_name,
                                    Key=s3_key,
                                    ExtraArgs={
                                        'ContentType': content_type
                                    },
                                    Config=transfer_config,
                                )
print(f"worker-comfyui - Successfully uploaded {filename} to S3")
# Construct the public URL using the same s3_key that was used for upload
# This ensures the response URL matches the actual uploaded file location
# Use BUCKET_PUBLIC_URL if configured, otherwise construct from endpoint URL
public_url = os.environ.get("BUCKET_PUBLIC_URL")
if public_url:
# Use configured public URL (e.g., https://pub-xxx.r2.dev or Supabase public URL)
# Ensure it doesn't end with a slash
public_url = public_url.rstrip("/")
# Include bucket name in the URL: publicendpoint/bucketname/path
s3_url = f"{public_url}/{bucket_name}/{s3_key}"
else:
# Fallback: construct from endpoint URL
parsed = urllib.parse.urlparse(endpoint_url)
hostname = parsed.hostname or ""
# Check if this is Supabase Storage
if 'storage.supabase.co' in hostname:
# Supabase public URL format: https://<project-ref>.supabase.co/storage/v1/object/public/<bucket-name>/<key>
# Extract project ref from hostname (e.g., cpelxqvcjnbpnphttzsn.storage.supabase.co -> cpelxqvcjnbpnphttzsn)
project_ref = hostname.split('.')[0]
s3_url = f"https://{project_ref}.supabase.co/storage/v1/object/public/{bucket_name}/{s3_key}"
elif 'r2.cloudflarestorage.com' in hostname:
# Cloudflare R2 public URL format: https://<account-id>.r2.cloudflarestorage.com/<bucket-name>/<key>
account_id = hostname.split('.')[0]
s3_url = f"https://{account_id}.r2.cloudflarestorage.com/{bucket_name}/{s3_key}"
else:
# Fallback: construct from endpoint URL
base_url = endpoint_url.rstrip("/")
s3_url = f"{base_url}/{bucket_name}/{s3_key}"
print(f"worker-comfyui - Uploaded {filename} to S3: {s3_url}")
os.remove(temp_file_path) # Clean up temp file
# Use custom path filename if provided, otherwise use original ComfyUI filename
response_filename = filename
if custom_path:
# Extract filename from custom path (e.g., /collections/test123/test_out.png -> test_out.png)
response_filename = os.path.basename(custom_path)
print(f"worker-comfyui - Using custom path filename in response: {response_filename}")
# Append dictionary with filename and URL
output_data.append(
{
"filename": response_filename,
"type": "s3_url",
"data": s3_url,
}
)
except Exception as e:
error_msg = f"Error uploading {filename} to S3: {e}"
print(f"worker-comfyui - {error_msg}")
error_msg += " Verify your S3 credentials (Access Key ID, Secret Access Key) and bucket name are correct."
errors.append(error_msg)
if "temp_file_path" in locals() and os.path.exists(
temp_file_path
):
try:
os.remove(temp_file_path)
except OSError as rm_err:
print(
f"worker-comfyui - Error removing temp file {temp_file_path}: {rm_err}"
)
else:
# Return as base64 string (small file, S3 not configured)
# Log size warning for approaching threshold
if image_size > REQUIRE_S3_SIZE_THRESHOLD * 0.5: # > 50% of threshold
print(
f"worker-comfyui - WARNING: File {filename} is moderately large ({image_size_mb:.2f} MB). "
f"Consider configuring S3 upload (BUCKET_ENDPOINT_URL, BUCKET_NAME) for better reliability with large files. "
f"See docs/s3_setup_guide.md for setup instructions."
)
try:
base64_image = base64.b64encode(image_bytes).decode("utf-8")
# Use custom path filename if provided, otherwise use original ComfyUI filename
response_filename = filename
if custom_path:
# Extract filename from custom path (e.g., /collections/test123/test_out.png -> test_out.png)
response_filename = os.path.basename(custom_path)
print(f"worker-comfyui - Using custom path filename in response: {response_filename}")
# Append dictionary with filename and base64 data
output_data.append(
{
"filename": response_filename,
"type": "base64",
"data": base64_image,
}
)
file_type_str = "video" if is_video else "image"
print(f"worker-comfyui - Encoded {response_filename} ({file_type_str}) as base64 ({image_size / 1024:.1f} KB)")
except Exception as e:
error_msg = f"Error encoding {filename} to base64: {e}"
print(f"worker-comfyui - {error_msg}")
errors.append(error_msg)
else:
error_msg = f"Failed to fetch image data for {filename} from /view endpoint."
errors.append(error_msg)
# Check for other output types
other_keys = [k for k in node_output.keys() if k != "images"]
if other_keys:
warn_msg = (
f"Node {node_id} produced unhandled output keys: {other_keys}."
)
print(f"worker-comfyui - WARNING: {warn_msg}")
print(
f"worker-comfyui - --> If this output is useful, please consider opening an issue on GitHub to discuss adding support."
)
except websocket.WebSocketException as e:
print(f"worker-comfyui - WebSocket Error: {e}")
print(traceback.format_exc())
return {"error": f"WebSocket communication error: {e}"}
except requests.RequestException as e:
print(f"worker-comfyui - HTTP Request Error: {e}")
print(traceback.format_exc())
return {"error": f"HTTP communication error with ComfyUI: {e}"}
except ValueError as e:
print(f"worker-comfyui - Value Error: {e}")
print(traceback.format_exc())
return {"error": str(e)}
except Exception as e:
print(f"worker-comfyui - Unexpected Handler Error: {e}")
print(traceback.format_exc())
return {"error": f"An unexpected error occurred: {e}"}
finally:
if ws and ws.connected:
print(f"worker-comfyui - Closing websocket connection.")
ws.close()
final_result = {}
if output_data:
final_result["images"] = output_data
if errors:
final_result["errors"] = errors
print(f"worker-comfyui - Job completed with errors/warnings: {errors}")
if not output_data and errors:
print(f"worker-comfyui - Job failed with no output images.")
# Check if any errors are 504-related and provide helpful suggestions
error_str = " ".join(errors).lower()
if "504" in error_str or "gateway timeout" in error_str or ("timeout" in error_str and "gemini" in error_str):
suggestion = (
"504 Gateway Timeout errors from Gemini API typically occur with large images (4K+). "
"Suggestions:\n"
"1. Increase timeouts for large images by setting environment variables:\n"
" - WEBSOCKET_RECV_TIMEOUT_S=600 (10 minutes for websocket receive)\n"
" - WORKFLOW_EXECUTION_TIMEOUT_S=1200 (20 minutes for workflow execution)\n"
"2. Reduce image resolution before sending to Gemini (e.g., resize to 2K or lower)\n"
"3. Compress images before sending (use prepare_request.py with --max-width and --max-height flags)\n"
"4. Reduce the number of images sent to Gemini in a single request\n"
"5. Use lower resolution settings in GeminiImage2Node (e.g., '2K' instead of '4K')\n"
"6. The handler will automatically retry 504 errors up to 3 times (configurable via GEMINI_504_RETRY_ATTEMPTS)"
)
return {
"error": "Job processing failed (504 Gateway Timeout from Gemini API)",
"details": errors,
"suggestion": suggestion
}
return {
"error": "Job processing failed",
"details": errors,
}
elif not output_data and not errors:
print(
f"worker-comfyui - Job completed successfully, but the workflow produced no images."
)
final_result["status"] = "success_no_images"
final_result["images"] = []
# Calculate and log response size before returning
response_size = calculate_response_size(final_result)
response_size_mb = response_size / (1024 * 1024)
print(f"worker-comfyui - Response size: {response_size:,} bytes ({response_size_mb:.2f} MB)")
# Check if response is too large
if response_size > MAX_RESPONSE_SIZE_BYTES:
size_limit_mb = MAX_RESPONSE_SIZE_BYTES / (1024 * 1024)
storage_help = (
"Configure S3-compatible storage (Cloudflare R2, Supabase Storage, AWS S3, etc.) by setting "
"BUCKET_ENDPOINT_URL, BUCKET_NAME, BUCKET_ACCESS_KEY_ID, and BUCKET_SECRET_ACCESS_KEY. "
"See docs/s3_setup_guide.md for instructions."
)
error_msg = (
f"Response payload too large ({response_size_mb:.2f} MB) exceeds limit ({size_limit_mb:.2f} MB). "
f"This typically happens when returning base64-encoded images. "
f"To fix this, {storage_help} "
f"Alternatively, reduce image size/quality in your workflow."
)
print(f"worker-comfyui - ERROR: {error_msg}")
return {
"error": "Response payload too large",
"details": error_msg,
"response_size_bytes": response_size,
"max_size_bytes": MAX_RESPONSE_SIZE_BYTES,
"suggestion": "Configure S3 upload (see docs/s3_setup_guide.md) or reduce image size/quality"
}
elif response_size > MAX_RESPONSE_SIZE_BYTES * 0.8:
# Warn if approaching the limit
print(f"worker-comfyui - WARNING: Response size ({response_size_mb:.2f} MB) is approaching the limit. Consider using S3 upload for large images.")
print(f"worker-comfyui - Job completed. Returning {len(output_data)} image(s).")
return final_result
if __name__ == "__main__":
print("worker-comfyui - Starting handler...")
runpod.serverless.start({"handler": handler})
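# ---------------------------------------------------------------------------
# Example job input (illustrative sketch only; field names are taken from the
# handler above, values are placeholders):
#
# {
#   "input": {
#     "workflow": { ...ComfyUI API-format workflow... },
#     "images": [{"name": "input.png", "image": "<base64 or data URI>"}],
#     "extra_data": {"api_key_comfy_org": "<optional comfy.org API key>"},
#     "s3_config": {"image_output_path": "/collections/test123/test_out.png"}
#   }
# }
# ---------------------------------------------------------------------------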